00001
00006 #include "Client_IO.H"
00007
00008
00009
00010 const int ConnectRetries = 3;
00011 const int RetryDelay = 60;
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 string CClientPolynomFetcher::buffer;
00024
00025 void * CClientPolynomFetcher::THREAD_fetch_polynom(void *)
00026 {
00027 static unsigned int PolyInterval = 1000;
00028 static time_t prev_time = 0;
00029 if (PolyInterval>500 && time(NULL)>prev_time+300) PolyInterval=100*(PolyInterval/200);
00030 else if (time(NULL)<prev_time+120) PolyInterval+=100*(PolyInterval/200);
00031 prev_time=time(NULL);
00032
00033 int retries_on_err = 0;
00034 try_again:
00035 try
00036 {
00037 unix_io_stream tcp_ServerConnection(communication_name, server_port);
00038 cout_network << "Fetching mpqs polynomial interval (" << PolyInterval << ")..." << endl;
00039 tcp_ServerConnection << "Polynom? " << PolyInterval << endl;
00040 buffer.clear();
00041 while (!tcp_ServerConnection.eof())
00042 {
00043 char c;
00044 tcp_ServerConnection.get(c);
00045 buffer+=c;
00046 }
00047 cout_network << "background thread: polynomial fetched." << endl;
00048 return NULL;
00049 }
00050 catch (unix_buffer_exception &e)
00051 {
00052 cerr << "caught exception in background thread while fetching polynomial:" << endl
00053 << e.what() << endl;
00054 if (prev_time!=0 && ++retries_on_err<=ConnectRetries) { sleep(RetryDelay); goto try_again; }
00055 else throw;
00056 }
00057 catch (...)
00058 {
00059 cerr << "caught unknown exception in background thread while fetching polynomial." << endl;
00060 throw;
00061 }
00062 }
00063
00064 void CClientPolynomFetcher::fetch(mpz_t UpperBound_D)
00065 {
00066 static pthread_t thread_fetch_polynom;
00067 int retcode;
00068
00069 static bool first_time_first_love = true;
00070 if (first_time_first_love)
00071 {
00072 first_time_first_love=false;
00073 retcode = pthread_create(&thread_fetch_polynom, NULL, THREAD_fetch_polynom, NULL);
00074 if (retcode != 0)
00075 {
00076 cerr << "CClientPolynomFetcher: pthread_create failed!" << endl;
00077 exit(1);
00078 }
00079 }
00080
00081
00082 cout_network << "Polynomfetch join..." << endl;
00083 retcode = pthread_join(thread_fetch_polynom, NULL);
00084 if (retcode != 0)
00085 {
00086 cerr << "CClientPolynomFetcher: Joining thread failed!" << endl;
00087 exit(1);
00088 }
00089
00090 cout << "Setting mpqs polynomial interval..." << endl;
00091
00092
00093 istringstream is(buffer);
00094 Polynom.load(is);
00095 cout << "Old UpperBound_D = " << UpperBound_D << endl;
00096 is >> UpperBound_D;
00097 cout << "New UpperBound_D = " << UpperBound_D << endl;
00098
00099
00100 retcode = pthread_create(&thread_fetch_polynom, NULL, THREAD_fetch_polynom, NULL);
00101 if (retcode != 0)
00102 {
00103 cerr << "CClientPolynomFetcher::pthread_create failed!" << endl;
00104 exit(1);
00105 }
00106 }
00107
00108
00109
00110
00111
00112 CMutex CClientDynamicFactorFetcher::Mutex;
00113 queue<int> CClientDynamicFactorFetcher::buffer;
00114
00115 void * CClientDynamicFactorFetcher::THREAD_fetch_DynamicFactors(void *)
00116 {
00117 int sleep_secs = 100;
00118 while(true)
00119 {
00120 try
00121 {
00122 unix_io_stream tcp_ServerConnection(communication_name, server_port);
00123 cout << "Fetching dynamic factors from server..." << endl;
00124 static unsigned int dynfac_pos = 0;
00125 tcp_ServerConnection << "DynamicFactors?_ab_index " << dynfac_pos << endl;
00126
00127 int factor = 0;
00128 int counter = 0;
00129 while (true)
00130 {
00131
00132
00133 char c[4];
00134 if (tcp_ServerConnection.peek()==EOF)
00135 {
00136
00137
00138
00139
00140
00141 if ( Cpoll(tcp_ServerConnection).readable_chars_within(128,2000) < 4)
00142 cerr << "possibly failing..." << endl;
00143 tcp_ServerConnection.clear();
00144 }
00145 tcp_ServerConnection.read(c,4);
00146 factor=static_cast<unsigned int>(static_cast<unsigned char>(c[0]));
00147 factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[1]))<<8;
00148 factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[2]))<<16;
00149 factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[3]))<<24;
00150 if (factor<=0 || tcp_ServerConnection.fail())
00151 {
00152 MARK; break;
00153 }
00154 Mutex.lock(); buffer.push(factor); Mutex.unlock();
00155 ++counter;
00156 }
00157 dynfac_pos+=counter;
00158 cout_network << "background thread: " << counter << " dynamic factors fetched." << endl;
00159 if (counter>200) sleep_secs-=sleep_secs/4;
00160 else if (counter<20 && sleep_secs<3600) sleep_secs+=(sleep_secs+3)/4;
00161 }
00162 catch (unix_buffer_exception &e)
00163 {
00164 cerr << "caught exception in background thread while fetching dynamic factors:" << endl
00165 << e.what() << endl;
00166 sleep_secs=90;
00167 }
00168 catch (...)
00169 {
00170 cerr << "caught unknown exception in background thread while fetching dynamic factors." << endl;
00171 throw;
00172 }
00173
00174 cout_network << "next dynamic factor fetch request in " << sleep_secs << " seconds." << endl;
00175 sleep(sleep_secs);
00176 }
00177 return NULL;
00178 }
00179
00180
00181 void CClientDynamicFactorFetcher::fetch()
00182 {
00183 static pthread_t thread_fetch_DynamicFactors;
00184
00185 int retcode;
00186 static bool first_time_first_love = true;
00187
00188 if (first_time_first_love)
00189 {
00190 first_time_first_love=false;
00191 retcode = pthread_create(&thread_fetch_DynamicFactors, NULL, THREAD_fetch_DynamicFactors, NULL);
00192 if (retcode != 0)
00193 {
00194 cerr << "CClientDynamicFactorFetcher: pthread_create failed!" << endl;
00195 exit(1);
00196 }
00197 }
00198
00199 CUnlockMutexAtDestruction Unlocker(Mutex); Mutex.lock();
00200
00201
00202 if (buffer.empty()) return;
00203
00204 #ifdef VERBOSE
00205 cout << "Inserting " << buffer.size() << " dynamic factors..." << endl;
00206 #endif
00207
00208
00209 TDynamicFactorRelation relation;
00210 relation.fpos=0; relation.factor=0;
00211 while (!buffer.empty())
00212 {
00213 relation.factor=buffer.front(); buffer.pop();
00214
00215 relation.append_for_sieving();
00216 DynamicFactorRelations.insert(relation);
00217 }
00218 }
00219
00220
00221
00222
00223
00224 void CClientRelation_Delivery::init(void)
00225 {
00226
00227
00228
00229 #ifdef CYGWIN_COMPAT
00230 struct hostent *hp;
00231 hp = gethostbyname(communication_name.c_str());
00232 if (hp == NULL)
00233 {
00234 cerr << "Unknown host " << communication_name << endl;
00235 exit (1);
00236 }
00237 #else
00238
00239 struct addrinfo hints;
00240 memset(&hints,0,sizeof(hints));
00241 hints.ai_family=PF_INET;
00242 hints.ai_socktype=SOCK_STREAM;
00243 struct addrinfo *addrinfo_res = NULL;
00244 const int retval = getaddrinfo(communication_name.c_str(),NULL,&hints,&addrinfo_res);
00245 if ( retval || addrinfo_res==NULL )
00246 {
00247 cerr << "can't reach " << "\"" << communication_name << "\"" << endl;
00248 cerr << "Error given by getaddrinfo: " << endl;
00249 cerr << gai_strerror(retval) << endl;
00250 exit(1);
00251 }
00252 if (addrinfo_res->ai_socktype!=SOCK_STREAM)
00253 {
00254 cerr << "provided protocol doesn't support SOCK_STREAM" << endl;
00255 exit(1);
00256 }
00257
00258 freeaddrinfo(addrinfo_res);
00259 #endif
00260
00261
00262
00263 pthread_attr_t detached_thread;
00264 pthread_t thread_transmit_relations;
00265
00266 pthread_attr_init(&detached_thread);
00267 pthread_attr_setdetachstate(&detached_thread, PTHREAD_CREATE_DETACHED);
00268
00269 const int retcode = pthread_create(&thread_transmit_relations, &detached_thread,
00270 CClientRelation_Delivery::THREAD_transmit_Relations, NULL);
00271
00272 if (retcode != 0)
00273 {
00274 cerr << "pthread_create failed for THREAD_transmit_Relations!" << endl;
00275 exit(1);
00276 }
00277
00278 }
00279
00280
00281
00282 void * CClientRelation_Delivery::THREAD_transmit_Relations(void *)
00283 {
00284
00285 string s;
00286 char line[1024];
00287
00288
00289
00290
00291
00292
00293 unix_io_stream *connection_to_server = NULL;
00294 int err_count = 0;
00295 static bool connection_reachable = false;
00296 bool retried_transmission = false;
00297 ostringstream *temp_relations = NULL;
00298
00299 reconnect:
00300 try
00301 {
00302 if (connection_to_server!=NULL) delete connection_to_server;
00303
00304
00305 PipeInput.get(); PipeInput.unget();
00306
00307 connection_to_server = new unix_io_stream(communication_name, server_port);
00308 connection_reachable=true;
00309 }
00310 catch (unix_buffer_exception &e)
00311 {
00312 connection_to_server=NULL;
00313 cerr << "caught an exception: " << e.what() << endl;
00314 if (connection_reachable && ++err_count<=ConnectRetries)
00315 {
00316 cout << "sleeping " << RetryDelay << " seconds before retrying..." << endl;
00317 sleep(RetryDelay);
00318 goto reconnect;
00319 }
00320 else
00321 {
00322 goto done;
00323 }
00324 }
00325
00326
00327 *connection_to_server << "NewRelations! " << kN << endl;
00328
00329 *connection_to_server >> s;
00330 if (s!="proceed")
00331 {
00332 cerr << "Oops! Server does not accept my relations: " << endl;
00333 cerr << s << endl;
00334 goto done;
00335 }
00336
00337
00338 if (ClientAccountName!="") *connection_to_server << "Account: " << ClientAccountName << endl;
00339
00340 if (retried_transmission)
00341 {
00342
00343 MARK;
00344 retried_transmission=false;
00345 cout << "retry sending relations." << endl;
00346 if (temp_relations==NULL || temp_relations->str().empty())
00347 {
00348 cout << "nothing to transmit..." << endl;
00349 }
00350 else
00351 {
00352 *connection_to_server << temp_relations->str() << flush;
00353 if (connection_to_server->fail())
00354 {
00355 MARK;
00356 cerr << "stream state indicates that transmission has failed!" << endl;
00357 retried_transmission=true;
00358 goto reconnect;
00359 }
00360
00361
00362
00363 static int counter = 0;
00364 *connection_to_server << "Relationsblock_Sync " << flush;
00365 s="(empty)"; *connection_to_server >> s;
00366 if (s!="synced.")
00367 {
00368 cerr << "sync with server failed: " << endl; cerr << s << endl;
00369 retried_transmission=true;
00370 if (++counter > 3)
00371 {
00372 cerr << "too many retries. giving up. sorry." << endl;
00373 goto done;
00374 }
00375 goto reconnect;
00376 }
00377 counter = 0;
00378 cout << "retry sending relations succeeded." << endl;
00379 }
00380 }
00381
00382 transmit:
00383 while (PipeInput.peek()!=EOF)
00384 {
00385 delete temp_relations; temp_relations = new ostringstream();
00386
00387 int count=1000+1;
00388 const int max_duration = 8;
00389 const time_t starttime = time(NULL);
00390 while (PipeInput.peek()!=EOF && --count && time(NULL)-starttime<=max_duration)
00391 {
00392 *connection_to_server << "RL ";
00393 *temp_relations << "RL ";
00394 while (PipeInput.peek()!='\n')
00395 {
00396 PipeInput.get(line,sizeof(line),'\n');
00397 *connection_to_server << line;
00398 *temp_relations << line;
00399 }
00400 char c; PipeInput.get(c);
00401 *connection_to_server << '\n';
00402 *temp_relations << '\n';
00403 }
00404
00405 *connection_to_server << flush;
00406
00407 if (connection_to_server->fail())
00408 {
00409 MARK;
00410 cerr << "stream state indicates that transmission has failed!" << endl;
00411 retried_transmission=true;
00412 goto reconnect;
00413 }
00414
00415 if (PipeInput.peek()!=EOF)
00416 {
00417
00418 *connection_to_server << "Relationsblock_Sync " << flush;
00419 s="(empty)"; *connection_to_server >> s;
00420 if (s!="synced.")
00421 {
00422 cerr << "sync with server failed: " << endl; cerr << s << endl;
00423 retried_transmission=true;
00424 goto reconnect;
00425 }
00426 }
00427 }
00428 PipeInput.clear();
00429
00430
00431 cout << "syncing with server..." << endl;
00432 *connection_to_server << "Relationsblock_Sync " << flush;
00433 s="(empty)"; *connection_to_server >> s;
00434 if (s!="synced.")
00435 {
00436 cerr << "sync with server failed: " << endl; cerr << s << endl;
00437 goto done;
00438 }
00439
00440 sleep(5);
00441 if (PipeInput.peek()!=EOF) goto transmit;
00442
00443 *connection_to_server << "Relationsblock_Ende " << flush;
00444 *connection_to_server >> s;
00445 if (s=="ignoriert!")
00446 {
00447 cerr << "Oops! - Transmitted relations were rejected by the server!" << endl;
00448 goto done;
00449 }
00450 if (s!="empfangen.")
00451 {
00452 cerr << "Transmission of relations not acknowledged!" << endl;
00453 goto done;
00454 }
00455
00456 goto reconnect;
00457
00458 done:
00459
00460 cout << "ending transmissions... bye, bye..." << endl;
00461 delete temp_relations;
00462 delete connection_to_server;
00463 exit(1);
00464 }