//this file is part of eMule
//Copyright (C)2002 Merkur ( merkur-@users.sourceforge.net / http://www.emule-project.net )
//
//This program is free software; you can redistribute it and/or
//modify it under the terms of the GNU General Public License
//as published by the Free Software Foundation; either
//version 2 of the License, or (at your option) any later version.
//
//This program is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//GNU General Public License for more details.
//
//You should have received a copy of the GNU General Public License
//along with this program; if not, write to the Free Software
//Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#include "StdAfx.h"
#include "uploadqueue.h"
#include "packets.h"
#include "emule.h"
#include "knownfile.h"
#include "listensocket.h"
#include "ini2.h"
#include "math.h"
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
VOID CALLBACK TimerProc(HWND hwnd, UINT uMsg,UINT_PTR idEvent,DWORD dwTime);
//TODO rewrite the whole networkcode, use overlapped sockets
CUploadQueue::CUploadQueue(CPreferences* in_prefs){
app_prefs = in_prefs;
h_timer = SetTimer(0,141,TIMER_PERIOD,TimerProc); // Maella => -Small latency-
if (!h_timer)
theApp.emuledlg->AddDebugLogLine(false,GetResString(IDS_ERR_TIMERCREATEFAILED));
bannedcount = 0;
successfullupcount = 0;
failedupcount = 0;
totaluploadtime = 0;
m_nLastStartUpload = 0;
// Maella => -Accurate measure of bandwidth: IP, TCP or UDP, eDonkey protocol, etc-
m_nUpDataOverheadSourceExchange = 0;
m_nUpDataOverheadFileRequest = 0;
m_nUpDataOverheadOther = 0;
m_nUpDataOverheadServer = 0;
m_nUpDataOverheadSourceExchangePackets = 0;
m_nUpDataOverheadFileRequestPackets = 0;
m_nUpDataOverheadOtherPackets = 0;
m_nUpDataOverheadServerPackets = 0;
m_nUpDatarate = 0;
m_nUpDatarate10s = 0;
// Maella end
// Maella => -Pseudo overhead datarate control-!
// "
m_lastProcessTime = ::GetTickCount();
m_lastSentBytes = theApp.stat_sentBytes;
m_lastOverallSentBytes = theApp.stat_overallSentBytes;
m_nUploadSlopeControl = 0;
// Maella end
}
void CUploadQueue::AddUpNextClient(CUpDownClient* directadd){
POSITION toadd = 0;
uint32 bestscore = 0;
CUpDownClient* newclient;
// select next client or use given client
if (!directadd){
POSITION pos1, pos2;
for (pos1 = waitinglist.GetHeadPosition();( pos2 = pos1 ) != NULL;){
waitinglist.GetNext(pos1);
CUpDownClient* cur_client = waitinglist.GetAt(pos2);
// clear dead clients
ASSERT ( cur_client->GetLastUpRequest() );
if ((::GetTickCount() - cur_client->GetLastUpRequest() > MAX_PURGEQUEUETIME) || !theApp.sharedfiles->GetFileByID(cur_client->reqfileid) ){
RemoveFromWaitingQueue(pos2,true);
if (!cur_client->socket)
cur_client->Disconnected();
}
// finished clearing
else if ( (cur_client->GetScore(true) > bestscore) && (!cur_client->IsBanned())
&& (!cur_client->HasLowID() || (cur_client->socket && cur_client->socket->IsConnected()) ) ){
bestscore = cur_client->GetScore(true);
toadd = pos2;
}
}
if (!toadd)
return;
newclient = waitinglist.GetAt(toadd);
RemoveFromWaitingQueue(toadd, true);
theApp.emuledlg->transferwnd.ShowQueueCount(waitinglist.GetCount());
}
else
newclient = directadd;
if (IsDownloading(newclient)){
return;
}
// tell the client that we are now ready to upload
if (!newclient->socket || !newclient->socket->IsConnected()){
newclient->SetUploadState(US_CONNECTING);
newclient->TryToConnect(true);
}
else{
Packet* packet = new Packet(OP_ACCEPTUPLOADREQ,0);
theApp.uploadqueue->AddUpDataOverheadFileRequest(packet->size);
newclient->socket->SendPacket(packet,true);
newclient->SetUploadState(US_UPLOADING);
}
newclient->SetUpStartTime();
newclient->ResetSessionUp();
uploadinglist.AddTail(newclient);
// statistic
CKnownFile* reqfile = theApp.sharedfiles->GetFileByID((uchar*)newclient->reqfileid);
if (reqfile)
reqfile->statistic.AddAccepted();
theApp.emuledlg->transferwnd.uploadlistctrl.AddClient(newclient);
}
// Maella => -New bandwidth control-
void CUploadQueue::Process(){
// Check if new client can be added to upload queue
if(AcceptNewClient() && waitinglist.GetCount())
AddUpNextClient();
// => m_lastProcessTime
//
// Check if client(s) uploading
// if(uploadinglist.GetCount() == 0)
// return;
// Count number of client ready to send block
uint16 clientsrdy = 0;
for(POSITION pos = uploadinglist.GetHeadPosition(); pos != NULL; ){
CUpDownClient* cur_client = uploadinglist.GetNext(pos);
if(cur_client->socket != NULL &&
cur_client->socket->IsBusy() == false &&
cur_client->HasBlocks() == true){
++clientsrdy;
}
}
bool isLimited = (app_prefs->GetMaxUpload() < UNLIMITED);
// Elapsed time (TIMER_PERIOD not accurate)
uint32 deltaTime = ::GetTickCount() - m_lastProcessTime;
m_lastProcessTime += deltaTime;
if(clientsrdy == 0){
m_nUploadSlopeControl = 0;
}
else if(isLimited == false){
// => to improve
// Every client may receive max 1 block
m_nUploadSlopeControl = clientsrdy * (MAXFRAGSIZE + 100);
}
else {
m_nUploadSlopeControl += (app_prefs->GetMaxUpload() * 1.024f * (float)deltaTime); // [bytes/period]
// Correct the slope with the bytes sent since to last processing
if(theApp.glob_prefs->GetCompensateOverhead() == true){
m_nUploadSlopeControl -= (uint32)(theApp.stat_overallSentBytes - m_lastOverallSentBytes);
}
else {
m_nUploadSlopeControl -= (uint32)(theApp.stat_sentBytes - m_lastSentBytes);
}
// Trunk negative value => possible when Overhead compensation activated
if(m_nUploadSlopeControl < -1 * MAXFRAGSIZE){
m_nUploadSlopeControl = -1 * MAXFRAGSIZE;
}
}
// Keep current value for next processing
m_lastOverallSentBytes = theApp.stat_overallSentBytes;
m_lastSentBytes = theApp.stat_sentBytes;
// Try to send blocks.
sint32 nUploadSlopeControl = m_nUploadSlopeControl;
POSITION next_pos = uploadinglist.GetHeadPosition();
for(int i=0; i<uploadinglist.GetCount() && nUploadSlopeControl > 6 /* header size */; i++){
POSITION cur_pos = next_pos;
CUpDownClient* cur_client = uploadinglist.GetNext(next_pos); // Already point to the next element
// The method returns the real size of the sent block. It should be zero most of the time.
// Remark: with the modified code it is not necessary to call every time this method
uint32 sentBlock = cur_client->SendBlockData((uint32)nUploadSlopeControl);
if(sentBlock > 0){
ASSERT(nUploadSlopeControl >= sentBlock);
nUploadSlopeControl -= sentBlock;
// Try to 'balance' the upload between clients.
// Move the 'sender' at the end of the list.
uploadinglist.AddTail(cur_client);
uploadinglist.RemoveAt(cur_pos);
}
}
};
// Maella end
// Maella => -Accurate speed measurement-
void CUploadQueue::CompUpDataRate(){
// Process and retrieve m_nUpDatarate form clients
m_nUpDatarate = 0;
for(POSITION pos = uploadinglist.GetHeadPosition(); pos != NULL; ){
CUpDownClient* cur_client = uploadinglist.GetNext(pos);
cur_client->CompUpDataRate();
m_nUpDatarate += cur_client->GetUpDatarate(); // [bytes/s]
}
// Compute the average value of the last 10 seconds
m_nUpDatarateHistory.AddTail(m_nUpDatarate);
while(m_nUpDatarateHistory.GetCount() > 10){
m_nUpDatarateHistory.RemoveHead();
}
for(POSITION pos = m_nUpDatarateHistory.GetHeadPosition(); pos != NULL; ){
// remark: don't keep the sum. An optimalization is not justify here
m_nUpDatarate10s += m_nUpDatarateHistory.GetNext(pos);
}
m_nUpDatarate10s /= m_nUpDatarateHistory.GetCount();
}
// Maella end
// Maella => -Accurate speed measurement-
bool CUploadQueue::AcceptNewClient(){
// check if we can allow a new client to start downloading form us
if (::GetTickCount() - m_nLastStartUpload < 1000 )
return false;
if (uploadinglist.GetCount() < MIN_UP_CLIENTS_ALLOWED)
return true;
else if (uploadinglist.GetCount() >= MAX_UP_CLIENTS_ALLOWED)
return false;
uint32 upPerClient = UPLOAD_CLIENT_DATARATE + m_nUpDatarate10s/50;
if( upPerClient > 10000 )
upPerClient = 11000;
//now the final check
if (theApp.glob_prefs->GetMaxUpload() == UNLIMITED){
if ((uint32)uploadinglist.GetCount() < ((m_nUpDatarate10s/upPerClient)+2))
return true;
}
else{
uint16 nMaxSlots = 0;
if (theApp.glob_prefs->GetMaxUpload() > 10){
nMaxSlots += 2;
nMaxSlots += (uint16)ceil((float)((theApp.glob_prefs->GetMaxUpload() - 10)*1024) / upPerClient);
}
else
nMaxSlots = MIN_UP_CLIENTS_ALLOWED;
if ((uint32)uploadinglist.GetCount() < (m_nUpDatarate10s/UPLOAD_CHECK_CLIENT_DR) && uploadinglist.GetCount() <= nMaxSlots )
return true;
}
//nope
return false;
}
// Maella end
CUploadQueue::~CUploadQueue(){
KillTimer(0,141);
}
CUpDownClient* CUploadQueue::GetWaitingClientByIP(uint32 dwIP){
for (POSITION pos = waitinglist.GetHeadPosition();pos != 0;waitinglist.GetNext(pos)){
if (dwIP == waitinglist.GetAt(pos)->GetIP())
return waitinglist.GetAt(pos);
}
return 0;
}
void CUploadQueue::UpdateBanCount(){
int count=0;
for (POSITION pos = waitinglist.GetHeadPosition();pos != 0;waitinglist.GetNext(pos)){
CUpDownClient* cur_client= waitinglist.GetAt(pos);
if(cur_client->IsBanned())
count++;
}
SetBanCount(count);
}
void CUploadQueue::AddClientToQueue(CUpDownClient* client, bool bIgnoreTimelimit){
if (theApp.serverconnect->IsConnected() && theApp.serverconnect->IsLowID()
&& !theApp.serverconnect->IsLocalServer(client->GetServerIP(),client->GetServerPort())
&& client->GetDownloadState() == DS_NONE && !client->IsFriend()
&& GetWaitingUserCount() > 50)
return;
client->AddAskedCount();
client->SetLastUpRequest();
if (!bIgnoreTimelimit){
if (client->IsBanned()){
// Maella => -AntiCrash/AntiFake handling- (Vorlost/Mortillo)
if(client->IsBannedForTriedCrash() == false &&
::GetTickCount() - client->GetBanTime() > 18000000){
client->UnBan();
}
else
return;
// Maella end
}
client->AddRequestCount(client->reqfileid);
}
// check for double
for (POSITION pos = waitinglist.GetHeadPosition();pos != 0;waitinglist.GetNext(pos)){
CUpDownClient* cur_client= waitinglist.GetAt(pos);
if (cur_client == client){ //already on queue
client->SendRankingInfo();
theApp.emuledlg->transferwnd.queuelistctrl.RefreshClient(client);
return;
}
else if ( client->Compare(cur_client) ) {
// another client with same ip or hash
theApp.emuledlg->AddDebugLogLine(false,CString(GetResString(IDS_SAMEUSERHASH)),client->GetUserName(),cur_client->GetUserName(),cur_client->GetUserName() );
RemoveFromWaitingQueue(pos,true);
if (!cur_client->socket)
cur_client->Disconnected();
return;
}
}
// done
// Add clients server to list.
if (theApp.glob_prefs->AddServersFromClient()){
in_addr host;
host.S_un.S_addr = client->GetServerIP();
CServer* srv = new CServer(client->GetServerPort(), inet_ntoa(host));
srv->SetListName(srv->GetAddress());
if (!theApp.emuledlg->serverwnd.serverlistctrl.AddServer(srv, true))
delete srv;
/*else
theApp.emuledlg->AddLogLine(false,"Added new server: %s:%d", srv->GetFullIP(), srv->GetPort());*/
}
// statistic values
CKnownFile* reqfile = theApp.sharedfiles->GetFileByID((uchar*)client->reqfileid);
if (reqfile)
reqfile->statistic.AddRequest();
// TODO find better ways to cap the list
if ((uint32)waitinglist.GetCount() > (theApp.glob_prefs->GetQueueSize()+bannedcount))
return;
if (client->IsDownloading()){
// he's already downloading and wants probably only another file
Packet* packet = new Packet(OP_ACCEPTUPLOADREQ,0);
theApp.uploadqueue->AddUpDataOverheadFileRequest(packet->size);
client->socket->SendPacket(packet,true);
return;
}
if (waitinglist.IsEmpty() && AcceptNewClient()){
AddUpNextClient(client);
m_nLastStartUpload = ::GetTickCount();
}
else{
waitinglist.AddTail(client);
client->SetUploadState(US_ONUPLOADQUEUE);
client->SendRankingInfo();
theApp.emuledlg->transferwnd.queuelistctrl.AddClient(client);
theApp.emuledlg->transferwnd.ShowQueueCount(waitinglist.GetCount());
}
}
bool CUploadQueue::RemoveFromUploadQueue(CUpDownClient* client, bool updatewindow){
for (POSITION pos = uploadinglist.GetHeadPosition();pos != 0;uploadinglist.GetNext(pos)){
if (client == uploadinglist.GetAt(pos)){
if (updatewindow)
theApp.emuledlg->transferwnd.uploadlistctrl.RemoveClient(uploadinglist.GetAt(pos));
uploadinglist.RemoveAt(pos);
if( client->GetTransferedUp() ){
successfullupcount++;
totaluploadtime += client->GetUpStartTimeDelay()/1000;
}
else
failedupcount++;
client->SetUploadState(US_NONE);
client->ClearUploadBlockRequests();
return true;
}
}
return false;
}
uint32 CUploadQueue::GetAverageUpTime(){
if( successfullupcount ){
return totaluploadtime/successfullupcount;
}
return 0;
}
bool CUploadQueue::RemoveFromWaitingQueue(CUpDownClient* client, bool updatewindow){
POSITION pos = waitinglist.Find(client);
if (pos){
RemoveFromWaitingQueue(pos,updatewindow);
if (updatewindow)
theApp.emuledlg->transferwnd.ShowQueueCount(waitinglist.GetCount());
return true;
}
else
return false;
}
void CUploadQueue::RemoveFromWaitingQueue(POSITION pos, bool updatewindow){
CUpDownClient* todelete = waitinglist.GetAt(pos);
waitinglist.RemoveAt(pos);
if( todelete->IsBanned() )
todelete->UnBan();
if (updatewindow)
theApp.emuledlg->transferwnd.queuelistctrl.RemoveClient(todelete);
todelete->SetUploadState(US_NONE);
}
// Maella => -General Code Improvement-
bool CUploadQueue::CheckForTimeOver(CUpDownClient* client){
if(theApp.glob_prefs->TransferFullChunks()) {
if(client->GetUpStartTimeDelay() > 3600000) // Try to keep the clients from downloading for ever.
return true;
// For some reason, some clients can continue to download after a chunk size.
// Are they redownloading the same chunk over and over????
if(client->GetSessionUp() > 10485760){
return true;
}
}
else {
// Cache current client score
const uint32 score = client->GetScore(true, true);
// Check if another client has a bigger score
for(POSITION pos = waitinglist.GetHeadPosition(); pos != 0; ){
if(score < waitinglist.GetNext(pos)->GetScore(true, false)){
return true;
}
}
}
return false;
}
// Maella end
void CUploadQueue::DeleteAll(){
waitinglist.RemoveAll();
uploadinglist.RemoveAll();
}
// Maella => -General Code Improvement-
uint16 CUploadQueue::GetWaitingPosition(CUpDownClient* client){
if(IsOnUploadQueue(client) == false){
return 0;
}
// Current client score
const uint32 score = client->GetScore(false);
// Check if another client has a bigger score
uint16 rank = 1;
for(POSITION pos = waitinglist.GetHeadPosition(); pos != 0; ){
if(waitinglist.GetNext(pos)->GetScore(false) > score){
rank++;
}
}
return rank;
}
// Maella end
// Maella => [patch] -CPU load balancing-
VOID CALLBACK TimerProc(HWND hwnd, UINT uMsg, UINT_PTR idEvent, DWORD dwTime){
// Barry - Don't do anything if the app is shutting down - can cause unhandled exceptions
if (!theApp.emuledlg->IsRunning())
return;
theApp.uploadqueue->Process();
theApp.downloadqueue->Process();
// 1 second clock (With CPU load balancing)
static uint16 counter; counter++;
if(counter == (200/TIMER_PERIOD)){
theApp.downloadqueue->CompDownDataRate(); // Calcule and refresh GUI
}
else if(counter == (400/TIMER_PERIOD)){
theApp.uploadqueue->CompUpDataRate(); // Calcule and refresh GUI
}
else if(counter == (600/TIMER_PERIOD)){
theApp.emuledlg->CompDataRate(); // Calcule and refresh GUI status bar
}
else if(counter == (800/TIMER_PERIOD)){
theApp.emuledlg->statisticswnd.CompDataRate(); // Calcule and refresh GUI
}
else if (counter >= (1000/TIMER_PERIOD)){
counter=0;
theApp.clientcredits->Process();
theApp.serverlist->Process();
theApp.friendlist->Process();
if(theApp.serverconnect->IsConnecting() && !theApp.serverconnect->IsSingleConnect()){
theApp.serverconnect->TryAnotherConnectionrequest();
}
if(theApp.serverconnect->IsConnecting()){
theApp.serverconnect->CheckForTimeout();
}
// 5 seconds
static uint16 sec; sec++;
if (sec>=5) {
sec = 0;
#ifdef _DEBUG
if (!AfxCheckMemory()) AfxDebugBreak();
#endif
theApp.listensocket->Process();
theApp.OnlineSig(); // Added By Bouc7
}
static uint16 statsave; statsave++;
if (statsave>=60) {
statsave=0;
CString buffer;
char* fullpath = new char[strlen(theApp.glob_prefs->GetAppDir())+16];
sprintf(fullpath,"%spreferences.ini",theApp.glob_prefs->GetAppDir());
CIni ini( fullpath, "eMule" );
delete[] fullpath;
fullpath=NULL;
buffer.Format("%I64Lu",theApp.stat_receivedBytes+theApp.glob_prefs->GetTotalDownloaded());
ini.WriteString("TotalDownloadedBytes",buffer ,"Statistics");
buffer.Format("%I64Lu",theApp.stat_sentBytes+theApp.glob_prefs->GetTotalUploaded());
ini.WriteString("TotalUploadedBytes",buffer ,"Statistics");
}
}
}
// Maella end
CUpDownClient* CUploadQueue::GetNextClient(CUpDownClient* lastclient){
if (waitinglist.IsEmpty())
return 0;
if (!lastclient)
return waitinglist.GetHead();
POSITION pos = waitinglist.Find(lastclient);
if (!pos){
TRACE("Error: CServerList::GetNextClient");
return waitinglist.GetHead();
}
waitinglist.GetNext(pos);
if (!pos)
return NULL;
else
return waitinglist.GetAt(pos);
}
void CUploadQueue::FindSourcesForFileById(CTypedPtrList<CPtrList, CUpDownClient*>* srclist, uchar* filehash) {
POSITION pos;
pos = uploadinglist.GetHeadPosition();
while(pos) {
CUpDownClient *potential = uploadinglist.GetNext(pos);
if(memcmp(potential->reqfileid, filehash, 16) == 0)
srclist->AddTail(potential);
}
pos = waitinglist.GetHeadPosition();
while(pos) {
CUpDownClient *potential = waitinglist.GetNext(pos);
if(memcmp(potential->reqfileid, filehash, 16) == 0)
srclist->AddTail(potential);
}
}