Base file: eMule0.26d.Maella.v2.0.beta6\src\UploadQueue.cpp
Modified file: vampirev1esrc\src\UploadQueue.cpp
//this file is part of eMule //Copyright (C)2002 Merkur ( merkur-@users.sourceforge.net / http://www.emule-project.net ) // //This program is free software; you can redistribute it and/or //modify it under the terms of the GNU General Public License //as published by the Free Software Foundation; either //version 2 of the License, or (at your option) any later version. // //This program is distributed in the hope that it will be useful, //but WITHOUT ANY WARRANTY; without even the implied warranty of //MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //GNU General Public License for more details. // //You should have received a copy of the GNU General Public License //along with this program; if not, write to the Free Software //Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. #include "StdAfx.h" #include "uploadqueue.h" #include "packets.h" #include "emule.h" #include "knownfile.h" #include "listensocket.h" #include "ini2.h" #include "math.h" #ifdef _DEBUG #undef THIS_FILE static char THIS_FILE[]=__FILE__; #define new DEBUG_NEW #endif VOID CALLBACK TimerProc(HWND hwnd, UINT uMsg,UINT_PTR idEvent,DWORD dwTime); //TODO rewrite the whole networkcode, use overlapped sockets CUploadQueue::CUploadQueue(CPreferences* in_prefs){ app_prefs = in_prefs; h_timer = SetTimer(0,141,TIMER_PERIOD,TimerProc); // Maella => -Small latency- if (!h_timer) theApp.emuledlg->AddDebugLogLine(false,GetResString(IDS_ERR_TIMERCREATEFAILED)); bannedcount = 0; successfullupcount = 0; failedupcount = 0; totaluploadtime = 0; m_nLastStartUpload = 0; // Maella => -Accurate measure of bandwidth: IP, TCP or UDP, eDonkey protocol, etc- m_nUpDataOverheadSourceExchange = 0; m_nUpDataOverheadFileRequest = 0; m_nUpDataOverheadOther = 0; m_nUpDataOverheadServer = 0; m_nUpDataOverheadSourceExchangePackets = 0; m_nUpDataOverheadFileRequestPackets = 0; m_nUpDataOverheadOtherPackets = 0; m_nUpDataOverheadServerPackets = 0; m_nUpDatarate = 0; m_nUpDatarate10s = 0; // Maella end // Maella => -Pseudo overhead datarate control-! // " m_lastProcessTime = ::GetTickCount(); m_lastSentBytes = theApp.stat_sentBytes; m_lastOverallSentBytes = theApp.stat_overallSentBytes; m_nUploadSlopeControl = 0; // Maella end } void CUploadQueue::AddUpNextClient(CUpDownClient* directadd){ POSITION toadd = 0; uint32 bestscore = 0; CUpDownClient* newclient; // select next client or use given client if (!directadd){ POSITION pos1, pos2; for (pos1 = waitinglist.GetHeadPosition();( pos2 = pos1 ) != NULL;){ waitinglist.GetNext(pos1); CUpDownClient* cur_client = waitinglist.GetAt(pos2); // clear dead clients ASSERT ( cur_client->GetLastUpRequest() ); if ((::GetTickCount() - cur_client->GetLastUpRequest() > MAX_PURGEQUEUETIME) || !theApp.sharedfiles->GetFileByID(cur_client->reqfileid) ){ RemoveFromWaitingQueue(pos2,true); if (!cur_client->socket) cur_client->Disconnected(); } // finished clearing else if ( (cur_client->GetScore(true) > bestscore) && (!cur_client->IsBanned()) && (!cur_client->HasLowID() || (cur_client->socket && cur_client->socket->IsConnected()) ) ){ bestscore = cur_client->GetScore(true); toadd = pos2; } } if (!toadd) return; newclient = waitinglist.GetAt(toadd); RemoveFromWaitingQueue(toadd, true); theApp.emuledlg->transferwnd.ShowQueueCount(waitinglist.GetCount()); } else newclient = directadd; if (IsDownloading(newclient)){ return; } // tell the client that we are now ready to upload if (!newclient->socket || !newclient->socket->IsConnected()){ newclient->SetUploadState(US_CONNECTING); newclient->TryToConnect(true); } else{ Packet* packet = new Packet(OP_ACCEPTUPLOADREQ,0); theApp.uploadqueue->AddUpDataOverheadFileRequest(packet->size); newclient->socket->SendPacket(packet,true); newclient->SetUploadState(US_UPLOADING); } newclient->SetUpStartTime(); newclient->ResetSessionUp(); uploadinglist.AddTail(newclient); // statistic CKnownFile* reqfile = theApp.sharedfiles->GetFileByID((uchar*)newclient->reqfileid); if (reqfile) reqfile->statistic.AddAccepted(); theApp.emuledlg->transferwnd.uploadlistctrl.AddClient(newclient); } // Maella => -New bandwidth control- void CUploadQueue::Process(){ // Check if new client can be added to upload queue if(AcceptNewClient() && waitinglist.GetCount()) AddUpNextClient(); // => m_lastProcessTime // // Check if client(s) uploading // if(uploadinglist.GetCount() == 0) // return; // Count number of client ready to send block uint16 clientsrdy = 0; for(POSITION pos = uploadinglist.GetHeadPosition(); pos != NULL; ){ CUpDownClient* cur_client = uploadinglist.GetNext(pos); if(cur_client->socket != NULL && cur_client->socket->IsBusy() == false && cur_client->HasBlocks() == true){ ++clientsrdy; } } bool isLimited = (app_prefs->GetMaxUpload() < UNLIMITED); // Elapsed time (TIMER_PERIOD not accurate) uint32 deltaTime = ::GetTickCount() - m_lastProcessTime; m_lastProcessTime += deltaTime; if(clientsrdy == 0){ m_nUploadSlopeControl = 0; } else if(isLimited == false){ // => to improve // Every client may receive max 1 block m_nUploadSlopeControl = clientsrdy * (MAXFRAGSIZE + 100); } else { m_nUploadSlopeControl += (app_prefs->GetMaxUpload() * 1.024f * (float)deltaTime); // [bytes/period] // Correct the slope with the bytes sent since to last processing if(theApp.glob_prefs->GetCompensateOverhead() == true){ m_nUploadSlopeControl -= (uint32)(theApp.stat_overallSentBytes - m_lastOverallSentBytes); } else { m_nUploadSlopeControl -= (uint32)(theApp.stat_sentBytes - m_lastSentBytes); } // Trunk negative value => possible when Overhead compensation activated if(m_nUploadSlopeControl < -1 * MAXFRAGSIZE){ m_nUploadSlopeControl = -1 * MAXFRAGSIZE; } } // Keep current value for next processing m_lastOverallSentBytes = theApp.stat_overallSentBytes; m_lastSentBytes = theApp.stat_sentBytes; // Try to send blocks. sint32 nUploadSlopeControl = m_nUploadSlopeControl; POSITION next_pos = uploadinglist.GetHeadPosition(); for(int i=0; i<uploadinglist.GetCount() && nUploadSlopeControl > 6 /* header size */; i++){ POSITION cur_pos = next_pos; CUpDownClient* cur_client = uploadinglist.GetNext(next_pos); // Already point to the next element // The method returns the real size of the sent block. It should be zero most of the time. // Remark: with the modified code it is not necessary to call every time this method uint32 sentBlock = cur_client->SendBlockData((uint32)nUploadSlopeControl); if(sentBlock > 0){ ASSERT(nUploadSlopeControl >= sentBlock); nUploadSlopeControl -= sentBlock; // Try to 'balance' the upload between clients. // Move the 'sender' at the end of the list. uploadinglist.AddTail(cur_client); uploadinglist.RemoveAt(cur_pos); } } }; // Maella end // Maella => -Accurate speed measurement- void CUploadQueue::CompUpDataRate(){ // Process and retrieve m_nUpDatarate form clients m_nUpDatarate = 0; for(POSITION pos = uploadinglist.GetHeadPosition(); pos != NULL; ){ CUpDownClient* cur_client = uploadinglist.GetNext(pos); cur_client->CompUpDataRate(); m_nUpDatarate += cur_client->GetUpDatarate(); // [bytes/s] } // Compute the average value of the last 10 seconds m_nUpDatarateHistory.AddTail(m_nUpDatarate); while(m_nUpDatarateHistory.GetCount() > 10){ m_nUpDatarateHistory.RemoveHead(); } for(POSITION pos = m_nUpDatarateHistory.GetHeadPosition(); pos != NULL; ){ // remark: don't keep the sum. An optimalization is not justify here m_nUpDatarate10s += m_nUpDatarateHistory.GetNext(pos); } m_nUpDatarate10s /= m_nUpDatarateHistory.GetCount(); } // Maella end // Maella => -Accurate speed measurement- bool CUploadQueue::AcceptNewClient(){ // check if we can allow a new client to start downloading form us if (::GetTickCount() - m_nLastStartUpload < 1000 ) return false; if (uploadinglist.GetCount() < MIN_UP_CLIENTS_ALLOWED) return true; else if (uploadinglist.GetCount() >= MAX_UP_CLIENTS_ALLOWED) return false; uint32 upPerClient = UPLOAD_CLIENT_DATARATE + m_nUpDatarate10s/50; if( upPerClient > 10000 ) upPerClient = 11000; //now the final check if (theApp.glob_prefs->GetMaxUpload() == UNLIMITED){ if ((uint32)uploadinglist.GetCount() < ((m_nUpDatarate10s/upPerClient)+2)) return true; } else{ uint16 nMaxSlots = 0; if (theApp.glob_prefs->GetMaxUpload() > 10){ nMaxSlots += 2; nMaxSlots += (uint16)ceil((float)((theApp.glob_prefs->GetMaxUpload() - 10)*1024) / upPerClient); } else nMaxSlots = MIN_UP_CLIENTS_ALLOWED; if ((uint32)uploadinglist.GetCount() < (m_nUpDatarate10s/UPLOAD_CHECK_CLIENT_DR) && uploadinglist.GetCount() <= nMaxSlots ) return true; } //nope return false; } // Maella end CUploadQueue::~CUploadQueue(){ KillTimer(0,141); } CUpDownClient* CUploadQueue::GetWaitingClientByIP(uint32 dwIP){ for (POSITION pos = waitinglist.GetHeadPosition();pos != 0;waitinglist.GetNext(pos)){ if (dwIP == waitinglist.GetAt(pos)->GetIP()) return waitinglist.GetAt(pos); } return 0; } void CUploadQueue::UpdateBanCount(){ int count=0; for (POSITION pos = waitinglist.GetHeadPosition();pos != 0;waitinglist.GetNext(pos)){ CUpDownClient* cur_client= waitinglist.GetAt(pos); if(cur_client->IsBanned()) count++; } SetBanCount(count); } void CUploadQueue::AddClientToQueue(CUpDownClient* client, bool bIgnoreTimelimit){ if (theApp.serverconnect->IsConnected() && theApp.serverconnect->IsLowID() && !theApp.serverconnect->IsLocalServer(client->GetServerIP(),client->GetServerPort()) && client->GetDownloadState() == DS_NONE && !client->IsFriend() && GetWaitingUserCount() > 50) return; client->AddAskedCount(); client->SetLastUpRequest(); if (!bIgnoreTimelimit){ if (client->IsBanned()){ // Maella => -AntiCrash/AntiFake handling- (Vorlost/Mortillo) if(client->IsBannedForTriedCrash() == false && ::GetTickCount() - client->GetBanTime() > 18000000){ client->UnBan(); } else return; // Maella end } client->AddRequestCount(client->reqfileid); } // check for double for (POSITION pos = waitinglist.GetHeadPosition();pos != 0;waitinglist.GetNext(pos)){ CUpDownClient* cur_client= waitinglist.GetAt(pos); if (cur_client == client){ //already on queue client->SendRankingInfo(); theApp.emuledlg->transferwnd.queuelistctrl.RefreshClient(client); return; } else if ( client->Compare(cur_client) ) { // another client with same ip or hash theApp.emuledlg->AddDebugLogLine(false,CString(GetResString(IDS_SAMEUSERHASH)),client->GetUserName(),cur_client->GetUserName(),cur_client->GetUserName() ); RemoveFromWaitingQueue(pos,true); if (!cur_client->socket) cur_client->Disconnected(); return; } } // done // Add clients server to list. if (theApp.glob_prefs->AddServersFromClient()){ in_addr host; host.S_un.S_addr = client->GetServerIP(); CServer* srv = new CServer(client->GetServerPort(), inet_ntoa(host)); srv->SetListName(srv->GetAddress()); if (!theApp.emuledlg->serverwnd.serverlistctrl.AddServer(srv, true)) delete srv; /*else theApp.emuledlg->AddLogLine(false,"Added new server: %s:%d", srv->GetFullIP(), srv->GetPort());*/ } // statistic values CKnownFile* reqfile = theApp.sharedfiles->GetFileByID((uchar*)client->reqfileid); if (reqfile) reqfile->statistic.AddRequest(); // TODO find better ways to cap the list if ((uint32)waitinglist.GetCount() > (theApp.glob_prefs->GetQueueSize()+bannedcount)) return; if (client->IsDownloading()){ // he's already downloading and wants probably only another file Packet* packet = new Packet(OP_ACCEPTUPLOADREQ,0); theApp.uploadqueue->AddUpDataOverheadFileRequest(packet->size); client->socket->SendPacket(packet,true); return; } if (waitinglist.IsEmpty() && AcceptNewClient()){ AddUpNextClient(client); m_nLastStartUpload = ::GetTickCount(); } else{ waitinglist.AddTail(client); client->SetUploadState(US_ONUPLOADQUEUE); client->SendRankingInfo(); theApp.emuledlg->transferwnd.queuelistctrl.AddClient(client); theApp.emuledlg->transferwnd.ShowQueueCount(waitinglist.GetCount()); } } bool CUploadQueue::RemoveFromUploadQueue(CUpDownClient* client, bool updatewindow){ for (POSITION pos = uploadinglist.GetHeadPosition();pos != 0;uploadinglist.GetNext(pos)){ if (client == uploadinglist.GetAt(pos)){ if (updatewindow) theApp.emuledlg->transferwnd.uploadlistctrl.RemoveClient(uploadinglist.GetAt(pos)); uploadinglist.RemoveAt(pos); if( client->GetTransferedUp() ){ successfullupcount++; totaluploadtime += client->GetUpStartTimeDelay()/1000; } else failedupcount++; client->SetUploadState(US_NONE); client->ClearUploadBlockRequests(); return true; } } return false; } uint32 CUploadQueue::GetAverageUpTime(){ if( successfullupcount ){ return totaluploadtime/successfullupcount; } return 0; } bool CUploadQueue::RemoveFromWaitingQueue(CUpDownClient* client, bool updatewindow){ POSITION pos = waitinglist.Find(client); if (pos){ RemoveFromWaitingQueue(pos,updatewindow); if (updatewindow) theApp.emuledlg->transferwnd.ShowQueueCount(waitinglist.GetCount()); return true; } else return false; } void CUploadQueue::RemoveFromWaitingQueue(POSITION pos, bool updatewindow){ CUpDownClient* todelete = waitinglist.GetAt(pos); waitinglist.RemoveAt(pos); if( todelete->IsBanned() ) todelete->UnBan(); if (updatewindow) theApp.emuledlg->transferwnd.queuelistctrl.RemoveClient(todelete); todelete->SetUploadState(US_NONE); } // Maella => -General Code Improvement- bool CUploadQueue::CheckForTimeOver(CUpDownClient* client){ if(theApp.glob_prefs->TransferFullChunks()) { if(client->GetUpStartTimeDelay() > 3600000) // Try to keep the clients from downloading for ever. return true; // For some reason, some clients can continue to download after a chunk size. // Are they redownloading the same chunk over and over???? if(client->GetSessionUp() > 10485760){ return true; } } else { // Cache current client score const uint32 score = client->GetScore(true, true); // Check if another client has a bigger score for(POSITION pos = waitinglist.GetHeadPosition(); pos != 0; ){ if(score < waitinglist.GetNext(pos)->GetScore(true, false)){ return true; } } } return false; } // Maella end void CUploadQueue::DeleteAll(){ waitinglist.RemoveAll(); uploadinglist.RemoveAll(); } // Maella => -General Code Improvement- uint16 CUploadQueue::GetWaitingPosition(CUpDownClient* client){ if(IsOnUploadQueue(client) == false){ return 0; } // Current client score const uint32 score = client->GetScore(false); // Check if another client has a bigger score uint16 rank = 1; for(POSITION pos = waitinglist.GetHeadPosition(); pos != 0; ){ if(waitinglist.GetNext(pos)->GetScore(false) > score){ rank++; } } return rank; } // Maella end // Maella => [patch] -CPU load balancing- VOID CALLBACK TimerProc(HWND hwnd, UINT uMsg, UINT_PTR idEvent, DWORD dwTime){ // Barry - Don't do anything if the app is shutting down - can cause unhandled exceptions if (!theApp.emuledlg->IsRunning()) return; theApp.uploadqueue->Process(); theApp.downloadqueue->Process(); // 1 second clock (With CPU load balancing) static uint16 counter; counter++; if(counter == (200/TIMER_PERIOD)){ theApp.downloadqueue->CompDownDataRate(); // Calcule and refresh GUI } else if(counter == (400/TIMER_PERIOD)){ theApp.uploadqueue->CompUpDataRate(); // Calcule and refresh GUI } else if(counter == (600/TIMER_PERIOD)){ theApp.emuledlg->CompDataRate(); // Calcule and refresh GUI status bar } else if(counter == (800/TIMER_PERIOD)){ theApp.emuledlg->statisticswnd.CompDataRate(); // Calcule and refresh GUI } else if (counter >= (1000/TIMER_PERIOD)){ counter=0; theApp.clientcredits->Process(); theApp.serverlist->Process(); theApp.friendlist->Process(); if(theApp.serverconnect->IsConnecting() && !theApp.serverconnect->IsSingleConnect()){ theApp.serverconnect->TryAnotherConnectionrequest(); } if(theApp.serverconnect->IsConnecting()){ theApp.serverconnect->CheckForTimeout(); } // 5 seconds static uint16 sec; sec++; if (sec>=5) { sec = 0; #ifdef _DEBUG if (!AfxCheckMemory()) AfxDebugBreak(); #endif theApp.listensocket->Process(); theApp.OnlineSig(); // Added By Bouc7 } static uint16 statsave; statsave++; if (statsave>=60) { statsave=0; CString buffer; char* fullpath = new char[strlen(theApp.glob_prefs->GetAppDir())+16]; sprintf(fullpath,"%spreferences.ini",theApp.glob_prefs->GetAppDir()); CIni ini( fullpath, "eMule" ); delete[] fullpath; fullpath=NULL; buffer.Format("%I64Lu",theApp.stat_receivedBytes+theApp.glob_prefs->GetTotalDownloaded()); ini.WriteString("TotalDownloadedBytes",buffer ,"Statistics"); buffer.Format("%I64Lu",theApp.stat_sentBytes+theApp.glob_prefs->GetTotalUploaded()); ini.WriteString("TotalUploadedBytes",buffer ,"Statistics"); } } } // Maella end CUpDownClient* CUploadQueue::GetNextClient(CUpDownClient* lastclient){ if (waitinglist.IsEmpty()) return 0; if (!lastclient) return waitinglist.GetHead(); POSITION pos = waitinglist.Find(lastclient); if (!pos){ TRACE("Error: CServerList::GetNextClient"); return waitinglist.GetHead(); } waitinglist.GetNext(pos); if (!pos) return NULL; else return waitinglist.GetAt(pos); } void CUploadQueue::FindSourcesForFileById(CTypedPtrList<CPtrList, CUpDownClient*>* srclist, uchar* filehash) { POSITION pos; pos = uploadinglist.GetHeadPosition(); while(pos) { CUpDownClient *potential = uploadinglist.GetNext(pos); if(memcmp(potential->reqfileid, filehash, 16) == 0) srclist->AddTail(potential); } pos = waitinglist.GetHeadPosition(); while(pos) { CUpDownClient *potential = waitinglist.GetNext(pos); if(memcmp(potential->reqfileid, filehash, 16) == 0) srclist->AddTail(potential); } }