unix_buffer.H

Go to the documentation of this file.
00001 #ifndef UNIX_BUFFER_
00002 #define UNIX_BUFFER_
00003 
00004 // some abstraction layers for using UNIX-network services in C++ (mainly TCP)
00005 // written by Thorsten Reinecke (reinecke@thorstenreinecke.de)
00006 // last change: 2007-06-06
00007 
00014 #ifdef CYGWIN_COMPAT
00015  #warning cygwin compatibility hacks enabled
00016 #endif
00017 
00018 extern "C"
00019 {
00020   #include <sys/socket.h>
00021   #include <sys/poll.h>
00022   #include <netinet/in.h>
00023   #include <netinet/tcp.h>
00024   #include <netdb.h>
00025   #include <unistd.h>
00026 
00027  #if defined(_REENTRANT) || defined(_THREAD_SAFE)
00028   // POSIX specifies, that (blocking) system calls are cancellation points,
00029   // but LinuxThreads may not be posix-conform to this issue,
00030   // (refer manpage PTHREAD_CANCEL(3))
00031   // so PTHREAD_TESTCANCEL is called explicitly at some important points
00032   #include <pthread.h>
00033   #define PTHREAD_TESTCANCEL pthread_testcancel(); /* explicit thread cancellation point */
00034  #else
00035   #define PTHREAD_TESTCANCEL /* this would be an explicit thread cancellation point */
00036  #endif 
00037 
00038 }
00039 
00040 
00041 // I had some problems while debugging under valgrind; this one
00042 // hacks the problems away (but maybe not the bugs...)
00043 //#define WEIRD_VALGRIND_HACK
00044 
00045 #ifdef WEIRD_VALGRIND_HACK
00046  #if !( defined(_REENTRANT) || defined(_THREAD_SAFE) )
00047   // for single-threaded versions the hack needs to be disabled
00048   #warning "weird valgrind hack disabled"
00049   #undef WEIRD_VALGRIND_HACK
00050  #endif
00051 #endif 
00052 
00053 #include <cerrno>
00054 #include <cstring>
00055 #include <string>
00056 #include <iostream>
00057 #include <sstream>
00058 #include <exception>
00059 
00060 #ifdef WEIRD_VALGRIND_HACK
00061  #warning "weird valgrind hack enabled"
00062  #include "mutex.H"
00063 #endif
00064 
00065 #if defined(CYGWIN_COMPAT)
00066  #if defined(_REENTRANT) || defined(_THREAD_SAFE)
00067   #include "mutex.H"
00068   static CMutex cygwin_mutex;
00069   #define CYGW_LOCK cygwin_mutex.lock();
00070   #define CYGW_UNLOCK cygwin_mutex.unlock();
00071  #else
00072   #define CYGW_LOCK /* cygwin_mutex.lock(); */
00073   #define CYGW_UNLOCK /* cygwin_mutex.unlock(); */
00074  #endif
00075 #endif   
00076 
00077 
00078 using std::cerr;
00079 using std::endl;
00080 using std::flush;
00081 using std::iostream;
00082 using std::streambuf;
00083 
00084 
00086 class unix_buffer_exception : public std::exception
00087 {
00088  private:
00089   const std::string Message;
00090 
00091  public:
00092   explicit unix_buffer_exception(const std::string& Msg) throw()
00093    : Message(Msg)
00094    {
00095 #if defined(VERBOSE) || defined(DEBUG)
00096      cerr << "throwing exception: " << Msg << endl;
00097 #endif
00098    }
00099  
00100   virtual ~unix_buffer_exception() throw() { }
00101   
00102   virtual const char* what() const throw()
00103    {
00104      return Message.c_str();
00105    }  
00106 };
00107 
00109 inline std::string my_strerror(const int errnum)
00110 {
00111   char buffer[1024];
00112 
00113   // please note: not the local array "buffer", but an implicitly constructed
00114   // std::string(buffer) is returned by this function!
00115 
00116 #if 0
00117   // this is POSIX conform, but does not compile on my system:
00118   // POSIX -> int strerror_r(int errnum, char *buf, size_t n);
00119   if (strerror_r(errnum,buffer,sizeof(buffer))==0) return buffer;
00120   else return "";
00121 #else
00122   // this is using the GNU extension and compiles on my system:
00123   // (see "man 3 strerror")
00124   char *ret = strerror_r(errnum,buffer,sizeof(buffer));
00125   if (ret) return ret; else return "";
00126 #endif
00127 }
00128 
00129 
00130 
00139 int open_internet_port(const int port, const int qsize = 5)
00140 {
00141   int sd;
00142   int val = 1;
00143   struct sockaddr_in server;
00144   struct linger lingval;
00145 
00146   if (port <= 0) return -1;
00147 
00148   sd = socket(AF_INET, SOCK_STREAM, 0);
00149   if (sd < 0) return sd;
00150   if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) <0)
00151     return -1;
00152 
00153   // activate linger option
00154   lingval.l_onoff=1; // if applicable, keep socket unavailable after closing
00155   lingval.l_linger=10*100; // timeout = 10 seconds
00156   if (setsockopt(sd, SOL_SOCKET, SO_LINGER, &lingval, sizeof(lingval)) <0) return -1;
00157 
00158   server.sin_family = AF_INET;
00159   server.sin_addr.s_addr = INADDR_ANY;
00160   server.sin_port = htons(port);
00161   if (bind(sd, reinterpret_cast<struct sockaddr *>(&server), sizeof(server)) < 0) return -1;
00162   listen (sd, qsize);
00163   return sd;
00164 }
00165 
00166 
00167 
00168 // utils for verbose output of meta-data to streams
00169 
00171 inline std::ostream& operator<< (std::ostream& ostr, const sockaddr &sa)
00172 {
00173 #ifdef CYGWIN_COMPAT
00174   // implement the obsolete stuff using gethostbyaddr
00175   CYGW_LOCK;
00176   u_int32_t ip = reinterpret_cast<const sockaddr_in& >(sa).sin_addr.s_addr;
00177   struct hostent *p_hostent = gethostbyaddr(reinterpret_cast<const char*>(&ip),sizeof(ip),AF_INET);
00178    // p_hostent is a static resource and therefore not threadsafe!
00179   if (p_hostent)  ostr << p_hostent->h_name;
00180   else ostr << "(unknown)";
00181   CYGW_UNLOCK;
00182 #else
00183   // outputs sockaddr info to given stream
00184   char host[1024];
00185   char service[1024];
00186   const int flags = NI_NOFQDN;
00187 // FIXME! sometimes hostname lookup segfaults on my system (when dynamically linked)
00188 // this works better: const int flags = NI_NOFQDN | NI_NUMERICHOST | NI_NUMERICSERV;
00189 // is this not threadsafe?
00190   const int retval = getnameinfo(&sa,sizeof(sa),host,sizeof(host),service,sizeof(service),flags);
00191 
00192   switch (retval)
00193   {
00194     case 0:
00195      ostr << host << " " << service;
00196      break;
00197     case EAI_AGAIN:
00198      ostr << "(The name could not be resolved at this time.)"; break;
00199     case EAI_BADFLAGS:
00200      ostr << "(The flags parameter has an invalid value.)"; break;
00201     case EAI_FAIL:
00202      ostr << "(A non-recoverable error occurred.)"; break;
00203     case EAI_FAMILY: 
00204      ostr << "(The address family was not recognized.)"; break;
00205     case EAI_MEMORY:
00206      ostr << "(Out of memory.)"; break;
00207     case EAI_NONAME:
00208      ostr << "(The  name  does  not  resolve.)"; break;
00209     case EAI_SYSTEM:
00210      ostr << "(A system error occurred.)"; break;
00211     default:
00212      ostr << "(unspecified error in getnameinfo.)"; 
00213    }
00214 #endif
00215   return ostr;
00216 }
00217 
00223 inline std::string peer_info(const int socket_descriptor)
00224 {
00225  // outputs meta-data for socket_descriptor
00226  std::ostringstream ostr;
00227 
00228  struct sockaddr sa;
00229  socklen_t sl = sizeof(sa);
00230 
00231  const int retval = getpeername(socket_descriptor, &sa, &sl);
00232  if (retval==0) ostr << sa;
00233  return ostr.str();
00234 }
00235 
00241 inline std::string socket_info(const int socket_descriptor)
00242 {
00243  // outputs meta-data for socket_descriptor
00244  std::ostringstream ostr;
00245 
00246  struct sockaddr sa;
00247  socklen_t sl = sizeof(sa);
00248 
00249  const int retval = getsockname(socket_descriptor, &sa, &sl);
00250  if (retval==0) ostr << sa;
00251  return ostr.str();
00252 }
00253 
00260 inline std::string connection_info(const int socket_descriptor)
00261 {
00262  // outputs meta-data for socket_descriptor
00263  std::ostringstream ostr;
00264 
00265  struct sockaddr sa;
00266  socklen_t sl = sizeof(sa);
00267 
00268  const int retval1 = getpeername(socket_descriptor, &sa, &sl);
00269  ostr << "connection from ";
00270  switch (retval1)
00271   {
00272     case 0       : ostr << sa; break;
00273     case EBADF   : ostr << "(invalid descriptor)"; break;
00274     case ENOTSOCK: ostr << "(file, not a socket)"; break;
00275     default : ostr << "(unable to perform getpeername)";
00276   }
00277 
00278  const int retval2 = getsockname(socket_descriptor, &sa, &sl);
00279  ostr << " to ";
00280  switch (retval2)
00281   {
00282     case 0       : ostr << sa; break;
00283     case EBADF   : ostr << "(invalid descriptor)"; break;
00284     case ENOTSOCK: ostr << "(file, not a socket)"; break;
00285     default : ostr << "(unable to perform getsockname)";
00286   }
00287  return ostr.str();
00288 }
00289 
00296 template<class T> inline std::string peer_info(const T& obj)
00297 {
00298  // outputs connection meta-data for class object to string
00299  return peer_info(obj.get_descriptor());
00300 }
00301 
00308 template<class T> inline std::string socket_info(const T& obj)
00309 {
00310  // outputs connection meta-data for class object to string
00311  return socket_info(obj.get_descriptor());
00312 }
00313 
00321 template<class T> inline std::string connection_info(const T& obj)
00322 {
00323  // outputs connection meta-data for class object to string
00324  return connection_info(obj.get_descriptor());
00325 }
00326 
00327 
00329 class Cprovide_descriptor_access
00330 {
00331  protected:
00332   virtual int _get_descriptor() const = 0;
00333  public:
00334   virtual ~Cprovide_descriptor_access() { }
00335 };
00336 
00337 
00348 class Cpoll_methods : public Cprovide_descriptor_access
00349 {
00350 public:
00351   virtual ~Cpoll_methods() {}
00352 
00358   int poll_events(short requested_events, const int timeout_ms=1) const
00359    {
00360      pollfd pfd;
00361      int res=0;
00362      do
00363       {
00364         pfd.fd=_get_descriptor(); pfd.events=requested_events; pfd.revents=0;
00365         res=poll(&pfd,1,timeout_ms);
00366         if (res==-1) if (errno!=EINTR) return -1;
00367       } while (res==-1);
00368      return pfd.revents;
00369    }
00370 
00372  inline bool bad() const
00373   {
00374     const int revents=poll_events(POLLIN);
00375     return (revents==-1) | (revents&(POLLERR|POLLHUP|POLLNVAL));
00376   }
00377 
00379  inline bool good() const
00380   {
00381     return !bad();
00382   }
00383 
00385  int readable_chars_within(const int size, const int timeout_ms=1000) const
00386   {
00387     const int revents=poll_events(POLLIN,timeout_ms);
00388     if (revents==-1) return -1;
00389     if (revents&(POLLERR|POLLHUP|POLLNVAL)) return -1;
00390     if (revents&POLLIN)
00391      {
00392        char *buf = new char[size];
00393        int count = recv(_get_descriptor(), buf, size, MSG_PEEK|MSG_NOSIGNAL);
00394        delete [] buf;
00395        return count;
00396      }
00397     return -1;
00398   }
00399 
00401  bool writable_now() const
00402   {
00403     const int revents=poll_events(POLLOUT);
00404     if (revents==-1) return false;
00405     if (revents&(POLLERR|POLLHUP|POLLNVAL)) return false;
00406     return (revents&POLLOUT);
00407   }
00408 };
00409 
00410 
00411 
00422 class Cpoll : public Cpoll_methods
00423 {
00424 private:
00425   int fd; // descriptor to poll
00426 
00427 protected:
00428   virtual int _get_descriptor() const { return fd; }  
00429 
00430 public:
00432   explicit Cpoll(const int _fd=0) : fd(_fd) {}
00433 
00435   template<class T> explicit Cpoll(const T &obj) : fd(obj.get_descriptor()) {}
00436 
00437   virtual ~Cpoll() {}
00438 
00440   void attach(const int _fd) { fd=_fd; }
00441 
00447   template<class T> void attach(const T &obj) { attach(obj.get_descriptor()); }
00448 };
00449 
00450 
00451 
00453 class Cnetwork_connection_methods : public Cprovide_descriptor_access
00454 {
00455  public:
00456   virtual ~Cnetwork_connection_methods() { }
00457 
00467   inline void set_RCVBUF(const int size)
00468    {
00469      const int fd = _get_descriptor();
00470      int val = size; // new buffersize
00471      if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val))==-1)
00472        throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_RCVBUF(): "+my_strerror(errno)));
00473    }
00474 
00482   inline void set_SNDBUF(const int size)
00483    {
00484      const int fd = _get_descriptor();
00485      int val;
00486      val=size; // new buffersize
00487      if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val))==-1)
00488        throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_SNDBUF(): "+my_strerror(errno)));
00489    }
00490 
00492   inline int get_RCVBUF()
00493    {
00494      const int fd = _get_descriptor();
00495      int val = -1;
00496      socklen_t optlen = sizeof(val);
00497      if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, &optlen)==-1)
00498       {
00499         perror("Cnetwork_connection_methods::get_RCVBUF()");
00500         return -1;
00501       }
00502      else return val;
00503    }
00504 
00506   inline int get_SNDBUF()
00507    {
00508      const int fd = _get_descriptor();
00509      int val = -1;
00510      socklen_t optlen = sizeof(val);
00511      if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, &optlen)==-1)
00512       {
00513         perror("Cnetwork_connection_methods::get_SNDBUF()");
00514         return -1;
00515       }
00516      else return val;
00517    }
00518 
00520   inline void set_RCV_timeout(const int secs, const int u_secs = 0)
00521    {
00522      const int fd = _get_descriptor();
00523      timeval tv;
00524      tv.tv_sec = secs; // secs timeout
00525      tv.tv_usec = u_secs; // micro-secs timeout
00526      if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))==-1)
00527        throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_RCV_timeout(): "+my_strerror(errno)));
00528    }
00529 
00531   inline void set_SND_timeout(const int secs, const int u_secs = 0)
00532    {
00533      const int fd = _get_descriptor();
00534      timeval tv;
00535      tv.tv_sec = secs; // secs timeout
00536      tv.tv_usec = u_secs; // micro-secs timeout
00537      if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))==-1)
00538        throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_SND_timeout(): "+my_strerror(errno)));
00539    }
00540 
00542   inline int get_RCV_timeout()
00543    {
00544      const int fd = _get_descriptor();
00545      timeval tv;
00546      socklen_t optlen = sizeof(tv);
00547      if (getsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, &optlen)==-1)
00548       {
00549         perror("Cnetwork_connection_methods::get_RCV_timeout()");
00550         return -1;
00551       }
00552      else return tv.tv_sec;
00553    }
00554 
00556   inline int get_SND_timeout()
00557    {
00558      const int fd = _get_descriptor();
00559      timeval tv;
00560      socklen_t optlen = sizeof(tv);
00561      if (getsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, &optlen)==-1)
00562       {
00563         perror("Cnetwork_connection_methods::get_SND_timeout()");
00564         return -1;
00565       }
00566      else return tv.tv_sec;
00567    }
00568 
00570   inline void set_timeout(const int secs, const int u_secs = 0)
00571    {
00572      const int fd = _get_descriptor();
00573      timeval tv;
00574      tv.tv_sec = secs; // secs timeout
00575      tv.tv_usec = u_secs; // micro-secs timeout
00576      if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))==-1)
00577        throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_timeout(), RCVTIMEO: "+my_strerror(errno)));
00578      if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))==-1)
00579        throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_timeout(), SNDTIMEO: "+my_strerror(errno)));
00580    }
00581 
00583   inline void set_TCP_NODELAY(const bool flag = true)
00584    {
00585      const int fd = _get_descriptor();
00586      int optval=flag;
00587      if (setsockopt(fd, SOL_TCP, TCP_NODELAY, &optval, sizeof(optval))==-1)
00588       throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_TCP_NODELAY(): "+my_strerror(errno)));
00589    }
00590 
00592   inline bool get_TCP_NODELAY()
00593    {
00594      const int fd = _get_descriptor();
00595      int optval;
00596      socklen_t optlen = sizeof(optval);
00597      if (getsockopt(fd, SOL_TCP, TCP_NODELAY, &optval, &optlen)==-1)
00598       perror("Cnetwork_connection_methods::get_TCP_NODELAY");
00599      return optval;
00600    }
00601 
00603   inline void set_LINGER(const signed int seconds)
00604    {
00605      // seconds<0  : turn linger off
00606      // seconds>=0 : turn linger on with specified value seconds
00607      const int fd = _get_descriptor();
00608      struct linger lingval;
00609      if (seconds<0) { lingval.l_onoff=0; lingval.l_linger=0; }
00610      else { lingval.l_onoff=1; lingval.l_linger=seconds; }
00611      if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &lingval, sizeof(lingval))==-1)
00612       throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_LINGER(): "+my_strerror(errno)));
00613    }
00614 
00616   inline signed int get_LINGER()
00617    {
00618      // returns linger seconds or -1 for linger off or -2 for error
00619      const int fd = _get_descriptor();
00620      struct linger lingval;
00621      socklen_t optlen = sizeof(lingval);
00622      if (getsockopt(fd, SOL_SOCKET, SO_LINGER, &lingval, &optlen)==-1)
00623       {
00624         perror("Cnetwork_connection_methods::get_LINGER()");
00625         return -2;
00626       }
00627      if (lingval.l_onoff==0) return -1;
00628      else return lingval.l_linger;
00629    }
00630 
00635   inline const std::string peer_info()
00636   {
00637    // outputs connection meta-data for class object to string
00638    return ::peer_info(_get_descriptor());
00639   }
00640 
00645   inline const std::string socket_info()
00646   {
00647    // outputs connection meta-data for class object to string
00648    return ::socket_info(_get_descriptor());
00649   }
00650 
00656   inline const std::string connection_info()
00657   {
00658    // outputs connection meta-data for class object to string
00659    return ::connection_info(_get_descriptor());
00660   }
00661 
00662 };
00663 
00664 
00665 
00666 
00671 class connection_waiter : public Cnetwork_connection_methods
00672 {
00673  protected:
00674   int server_socket;
00675   const int client_default_timeout;
00676 
00677   virtual int _get_descriptor() const { return server_socket; } 
00678  public:
00679 
00687   explicit connection_waiter(const int server_port, int server_port_tries = 10, const int _client_default_timeout = 75)
00688    : server_socket(-1), client_default_timeout(_client_default_timeout)
00689    {
00690      while ((server_socket = open_internet_port(server_port,10))<0
00691             && --server_port_tries>=0) sleep(1);
00692      if (server_socket<0)
00693       {
00694         cerr << "connection_waiter::constructor: cannot open serverport " << server_port << endl;
00695         throw unix_buffer_exception("connection_waiter::constructor: cannot open serverport");
00696       }
00697    }
00698 
00700   ~connection_waiter()
00701    {
00702      if (shutdown(server_socket, SHUT_RDWR)==-1) perror("~connection_waiter():shutdown");
00703      close(server_socket);
00704    }
00705 
00710   int get_new_client_socket_descriptor() const
00711    {
00712      int sd;
00713      do
00714       {
00715         sd=accept(server_socket, NULL, NULL);
00716         PTHREAD_TESTCANCEL;
00717       } while (sd==0);
00718      return sd;
00719    }
00720 
00725   int get_client_default_timeout() const
00726    {
00727      return client_default_timeout;
00728    }
00729 };
00730 
00731 
00733 class unix_fd_buffer : public streambuf, public Cprovide_descriptor_access
00734 {
00735 protected:
00736   static const int buffersize = 4096;
00737   static const int pushbacksize = 10;
00738   int fd; // -1 will explicitly denote an invalid descriptor
00739   bool fd_autoclose; // whether a valid descriptor shall be closed at destructor call
00740   char obuffer[buffersize];
00741   char ibuffer[buffersize+pushbacksize];
00742 
00743   virtual int _get_descriptor() const { return fd; } 
00744 
00745   virtual int empty_buffer()
00746    {
00747      if (fd<0) // invalid file descriptor?
00748       {
00749         cerr << "unix_fd_buffer::empty buffer(): invalid descriptor! fd=" << fd << endl;
00750         throw unix_buffer_exception("unix_fd_buffer::empty buffer(): invalid descriptor!");
00751         return EOF;
00752       }
00753      const int total_to_write = pptr()-pbase();
00754      if (!total_to_write) return 0;
00755      if (total_to_write<0 || total_to_write>buffersize)
00756       {
00757         throw unix_buffer_exception("unix_fd_buffer::empty_buffer() sanity check failed!");
00758       }
00759      int total_written = 0;
00760      signed int retries = 2; // maximum of retries in case of temporary errors
00761      do
00762      {
00763        PTHREAD_TESTCANCEL;
00764        const int count = total_to_write-total_written;
00765        const int written=write(fd, obuffer, count);
00766        PTHREAD_TESTCANCEL;
00767        if (written!=count)
00768         {
00769           cerr << "unix_fd_buffer::empty_buffer(): "
00770                << written << " bytes of " << count << " sent." << endl;
00771           if (written<0)
00772            {
00773              --retries;
00774              const int err=errno;
00775              perror("unix_fd_buffer::empty_buffer()");
00776              if (written==-1 && err == EAGAIN && retries>=0) continue; // this may fix some problems...
00777              if (written==-1 && err == EINTR && retries>=0) continue; // this may fix some problems...
00778              throw unix_buffer_exception(std::string("unix_fd_buffer::empty_buffer() send failed: ")+my_strerror(err));
00779              return EOF;
00780            }
00781         }
00782        pbump(-written);
00783        total_written+=written;
00784      } while (total_written<total_to_write);
00785      return total_written;
00786    }
00787 
00788   virtual int overflow (int c) 
00789    {
00790      if (c != EOF) { *pptr() = c; pbump(1); }
00791      if (empty_buffer() == EOF) return EOF;
00792      return c;
00793    }
00794 
00795   virtual int sync ()
00796    {
00797      return (empty_buffer() == EOF) ? -1 : 0;
00798    }
00799 
00800   virtual int underflow ()
00801    {
00802      if (gptr() < egptr()) return *gptr();
00803      int pbcount = gptr() - eback();
00804      if (pbcount > pushbacksize) pbcount = pushbacksize;
00805      if (pbcount > 0) memmove (ibuffer+(pushbacksize-pbcount), gptr()-pbcount, pbcount);
00806      int count;
00807      signed int retries = 2; // maximum of retries in case of temporary errors
00808      do
00809      {
00810        PTHREAD_TESTCANCEL;
00811        count = read(fd, ibuffer+pushbacksize, buffersize);
00812        PTHREAD_TESTCANCEL;
00813        if (count <= 0) // any unexpected behaviour?
00814         {
00815          if (count==0) return EOF;
00816          --retries;
00817          const int err=errno;
00818          perror("unix_fd_buffer::underflow()");
00819          if (count==-1 && err == EAGAIN && retries>=0) continue; // this should fix some problems...
00820          if (count==-1 && err == EINTR && retries>=0) continue; // this should fix some problems...
00821          throw unix_buffer_exception(std::string("unix_fd_buffer::underflow() recv failed: ")+my_strerror(err));
00822          return EOF;
00823         }
00824      } while (count<=0);
00825      //cerr << "---> count: " << count << endl;
00826      setg(ibuffer+(pushbacksize-pbcount), ibuffer+pushbacksize,
00827           ibuffer+pushbacksize+count);
00828      return *gptr();
00829    }
00830   
00831 public:
00832 
00834   inline int get_descriptor() const { return fd; }
00835 
00836 
00851   void attach(const int _fd, const bool autoclose = true)
00852   {
00853     if (fd>0)
00854      {
00855        throw unix_buffer_exception("unix_fd_buffer::open(const int _fd): descriptor is already in use!");
00856      }
00857     if (_fd<0)
00858      {
00859        throw unix_buffer_exception("unix_fd_buffer::open(const int _fd): descriptor is invalid!");
00860      }
00861     fd = _fd; fd_autoclose=autoclose;
00862     setp(obuffer, obuffer+(buffersize-1));
00863     setg(ibuffer+pushbacksize, ibuffer+pushbacksize, ibuffer+pushbacksize);
00864   }
00865 
00873   virtual int detach()
00874   {
00875     if (fd<0) return -1; // no valid descriptor attached
00876     sync();
00877     if (fd_autoclose)
00878      {
00879        if (shutdown(fd, SHUT_RDWR)==-1) perror("~unix_fd_buffer():shutdown");
00880        close(fd);
00881        fd=-1; // fd now invalid
00882      }
00883     const int h=fd;
00884     fd=-1; // no descriptor attached to this object now
00885     return h;
00886   }
00887 
00889   unix_fd_buffer(void) // default constructor
00890    : fd(-1), // -1 denotes no descriptor defined yet
00891      fd_autoclose(true)
00892   {
00893     setp(obuffer, obuffer+(buffersize-1));
00894     setg(ibuffer+pushbacksize, ibuffer+pushbacksize, ibuffer+pushbacksize);
00895   }
00896 
00904   explicit unix_fd_buffer(const int _fd)
00905    : fd(_fd), fd_autoclose(true)
00906   {
00907     setp(obuffer, obuffer+(buffersize-1));
00908     setg(ibuffer+pushbacksize, ibuffer+pushbacksize, ibuffer+pushbacksize);
00909   }
00910 
00911 
00913   virtual ~unix_fd_buffer()
00914    {
00915      if (fd>=0)
00916       {
00917         // valid descriptor -> free resource
00918         sync();
00919         if (fd_autoclose) close(fd);
00920         fd=-1;
00921       }
00922    }
00923 
00924 };
00925 
00926 
00927 
00929 class unix_buffer : public unix_fd_buffer, public Cnetwork_connection_methods
00930 {
00931 protected:
00932   virtual int _get_descriptor() const { return fd; } 
00933 
00934   virtual int empty_buffer()
00935    {
00936      if (fd<0) // invalid file descriptor?
00937       {
00938         cerr << "unix_buffer::empty buffer(): invalid descriptor! fd=" << fd << endl;
00939         throw unix_buffer_exception("unix_buffer::empty buffer(): invalid descriptor!");
00940         return EOF;
00941       }
00942      const int total_to_write = pptr()-pbase();
00943      if (!total_to_write) return 0;
00944      if (total_to_write<0 || total_to_write>buffersize)
00945       {
00946         throw unix_buffer_exception("unix_buffer::empty_buffer() sanity check failed!");
00947       }
00948      int total_written = 0;
00949      signed int retries = 2; // maximum of retries in case of temporary errors
00950      do
00951      {
00952        PTHREAD_TESTCANCEL;
00953        const int count = total_to_write-total_written;
00954        const int written=send(fd, obuffer, count, MSG_NOSIGNAL);
00955        PTHREAD_TESTCANCEL;
00956        if (written!=count)
00957         {
00958           cerr << "unix_buffer::empty_buffer(): "
00959                << written << " bytes of " << count << " sent." << endl;
00960           if (written<0)
00961            {
00962              --retries;
00963              const int err=errno;
00964              perror("unix_buffer::empty_buffer()");
00965              if (written==-1 && err == EAGAIN && retries>=0) continue; // this may fix some problems...
00966              if (written==-1 && err == EINTR && retries>=0) continue; // this may fix some problems...
00967              throw unix_buffer_exception(std::string("unix_buffer::empty_buffer() send failed: ")+my_strerror(err));
00968              return EOF;
00969            }
00970         }
00971        pbump(-written);
00972        total_written+=written;
00973      } while (total_written<total_to_write);
00974      return total_written;
00975    }
00976 
00977   virtual int underflow ()
00978    {
00979      if (gptr() < egptr()) return *gptr();
00980      int pbcount = gptr() - eback();
00981      if (pbcount > pushbacksize) pbcount = pushbacksize;
00982      if (pbcount > 0) memmove (ibuffer+(pushbacksize-pbcount), gptr()-pbcount, pbcount);
00983      int count;
00984      signed int retries = 2; // maximum of retries in case of temporary errors
00985      do
00986      {
00987        PTHREAD_TESTCANCEL;
00988        count = recv(fd, ibuffer+pushbacksize, buffersize, MSG_NOSIGNAL);
00989        PTHREAD_TESTCANCEL;
00990        if (count <= 0) // any unexpected behaviour?
00991         {
00992          if (count==0) return EOF;
00993          --retries;
00994          const int err=errno;
00995          perror("unix_buffer::underflow()");
00996          if (count==-1 && err == EAGAIN && retries>=0) continue; // this should fix some problems...
00997          if (count==-1 && err == EINTR && retries>=0) continue; // this should fix some problems...
00998          throw unix_buffer_exception(std::string("unix_buffer::underflow() recv failed: ")+my_strerror(err));
00999          return EOF;
01000         }
01001      } while (count<=0);
01002      //cerr << "---> count: " << count << endl;
01003      setg(ibuffer+(pushbacksize-pbcount), ibuffer+pushbacksize,
01004           ibuffer+pushbacksize+count);
01005      return *gptr();
01006    }
01007   
01008 public:
01009 
01017   virtual int detach()
01018   {
01019     if (fd<0) return -1; // no valid descriptor attached
01020     sync();
01021     if (fd_autoclose)
01022      {
01023        if (shutdown(fd, SHUT_RDWR)==-1) perror("~unix_buffer():shutdown");
01024        close(fd);
01025        fd=-1; // fd now invalid
01026      }
01027     const int h=fd;
01028     fd=-1; // no descriptor attached to this object now
01029     return h;
01030   }
01031 
01033   unix_buffer(void) // Default constructor
01034    : unix_fd_buffer() // use defaults from unix_fd_buffer
01035   {
01036     set_timeout(75); // default: 75 seconds
01037   }
01038 
01047   explicit unix_buffer(const int _fd, const int secs_timeout = 75)
01048    : unix_fd_buffer(_fd)
01049   {
01050     set_timeout(secs_timeout);
01051   }
01052 
01058   explicit unix_buffer(const connection_waiter& cw)
01059    : unix_fd_buffer(cw.get_new_client_socket_descriptor())
01060   {
01061     set_timeout(cw.get_client_default_timeout());
01062   }
01063 
01070  unix_buffer(const std::string &host, const int port)
01071   : unix_fd_buffer()
01072   {
01073 
01074 #ifdef CYGWIN_COMPAT
01075     // cygwin compatibility mode
01076 
01077     struct sockaddr_in server;
01078     struct hostent *hp;
01079     int ret;
01080     int err_number;
01081     fd = socket(AF_INET, SOCK_STREAM, 0);
01082     if (fd < 0)
01083      {
01084        throw unix_buffer_exception("unix_buffer::constructor: Can't open socket");
01085      }
01086     server.sin_family = AF_INET;
01087 
01088     CYGW_LOCK;
01089     hp = gethostbyname(host.c_str());
01090     if (hp == NULL)
01091      {
01092        CYGW_UNLOCK;
01093        throw unix_buffer_exception("unix_buffer::constructor: Unknown host: "+host);
01094      }
01095     memcpy((char*)&server.sin_addr, (char*)hp->h_addr, hp->h_length);
01096     CYGW_UNLOCK;
01097 
01098     server.sin_port = htons(port);
01099     {
01100       int retries_on_EAGAIN = 3;
01101       while ((ret = connect(fd, reinterpret_cast<struct sockaddr *>(&server), sizeof(server))) < 0)
01102        {
01103         switch (err_number=errno)
01104          {
01105            case ETIMEDOUT:
01106             cerr << "." << flush; sleep(10); break;
01107            case ENETUNREACH:
01108             cerr << "%" << flush; sleep(10); break;
01109            case EAGAIN:
01110             if (retries_on_EAGAIN--)
01111              {
01112                cerr << "Can't connect socket (" << my_strerror(err_number) << ")" << endl;
01113                if (err_number==EAGAIN)
01114                 {
01115                   cerr << "Trying it again in 10 secs..." << endl;
01116                   sleep(10);
01117                 }
01118                break;
01119              }
01120            default:
01121             throw unix_buffer_exception(std::string("unix_buffer::constructor: Can't connect socket: ")+my_strerror(err_number));
01122          }
01123        }
01124     }
01125 
01126 #else
01127     // POSIX mode
01128 
01129     // refer "man getaddrinfo" and "man socket" 
01130     struct addrinfo hints; // our wishes are placed here
01131     memset(&hints,0,sizeof(hints)); // must be zeroed for default options
01132     hints.ai_family=PF_INET; // we want IPv4 as protocol
01133     hints.ai_socktype=SOCK_STREAM; // and we need a stream, not datagram!
01134     struct addrinfo *addrinfo_res = NULL; // here the result will be delivered
01135 #ifdef WEIRD_VALGRIND_HACK
01136     static CMutex serial;
01137     serial.lock();
01138     // normally (by specification) getaddrinfo is thread-safe!
01139     // however: under valgrind, multithreaded sessions will hang exactly at this
01140     // point; probably there are other errors as well; but how to track them, when
01141     // every session hangs here (without any useful information)?
01142     // -> with locked access: no error occurs!
01143     const int retval = getaddrinfo(host.c_str(),NULL,&hints,&addrinfo_res);
01144     serial.unlock();
01145 #else
01146     const int retval = getaddrinfo(host.c_str(),NULL,&hints,&addrinfo_res);
01147 #endif
01148     if ( retval || addrinfo_res==NULL ) // any error?
01149      {
01150        cerr << "can't reach " << "\"" <<  host << "\"" << endl;
01151        cerr << "Error given by getaddrinfo: " << endl;
01152        cerr << gai_strerror(retval) << endl;
01153        throw unix_buffer_exception("unix_buffer::constructor: Can't reach "+host+": "+gai_strerror(retval));
01154      }
01155     if (addrinfo_res->ai_socktype!=SOCK_STREAM) // we got a "stream"-protocol?
01156      {
01157        throw unix_buffer_exception("unix_buffer::constructor: provided protocol doesn't support SOCK_STREAM");
01158      }
01159     switch (addrinfo_res->ai_family)
01160      {
01161       case PF_INET:
01162        //cerr << "PF_INET: using IPv4!" << endl;
01163        fd = socket(AF_INET, SOCK_STREAM, 0);
01164        reinterpret_cast<sockaddr_in*>(addrinfo_res->ai_addr)->sin_port=htons(port);
01165        break;
01166       case PF_INET6:
01167        cerr << "PF_INET6: try using IPv6 (untested feature!)" << endl;
01168        fd = socket(AF_INET6, SOCK_STREAM, 0);
01169        reinterpret_cast<sockaddr_in6*>(addrinfo_res->ai_addr)->sin6_port=htons(port);
01170        break;
01171       default:
01172        throw unix_buffer_exception("unix_buffer::constructor: too bad! ai_family isn't supported by unix_buffer!");
01173      }
01174 
01175     if (fd < 0) // is the descriptor valid?
01176      {
01177        throw unix_buffer_exception("unix_buffer::constructor: Can't open socket");
01178      }
01179 
01180     {
01181       int retries_on_EAGAIN = 3;
01182       while ( connect(fd, addrinfo_res->ai_addr, addrinfo_res->ai_addrlen) <0 )
01183        {
01184         PTHREAD_TESTCANCEL;
01185         switch (int err_number=errno)
01186          {
01187            case ETIMEDOUT:
01188             cerr << "." << flush; sleep(10); break;
01189            case ENETUNREACH:
01190             cerr << "%" << flush; sleep(10); break;
01191            case EAGAIN:
01192             if (retries_on_EAGAIN--)
01193              {
01194                cerr << "Can't connect socket (" << my_strerror(err_number) << ")" << endl;
01195                if (err_number==EAGAIN)
01196                 {
01197                   cerr << "Trying it again in 10 secs..." << endl;
01198                   sleep(10);
01199                 }
01200                break;
01201              }
01202            default:
01203             throw unix_buffer_exception(std::string("unix_buffer::constructor: Can't connect socket: ")+my_strerror(err_number));
01204          }
01205        }
01206     }
01207     freeaddrinfo(addrinfo_res);
01208 #endif
01209 
01210     setp(obuffer, obuffer+(buffersize-1));
01211     setg(ibuffer+pushbacksize, ibuffer+pushbacksize, ibuffer+pushbacksize);
01212   }
01213 
01215   virtual ~unix_buffer()
01216    {
01217      if (fd>=0)
01218       {
01219         // valid descriptor -> close the socket
01220         // but be careful: the socket may be already dead, so we nee to catch exceptions!
01221         try { sync(); }
01222         catch (unix_buffer_exception &e)
01223          {
01224            cerr << "unix_buffer::destructor caught exception: " << e.what() << endl;
01225          }
01226         if (fd_autoclose)
01227          {
01228            if (shutdown(fd, SHUT_RDWR)==-1) perror("~unix_buffer():shutdown");
01229            close(fd);
01230          }
01231         fd=-1; // very important! tell the ancestor not to destruct fd twice!
01232       }
01233    }
01234 
01235 };
01236 
01237 
01238 
01239 
01241 class unix_io_stream : public iostream, public Cnetwork_connection_methods
01242 {
01243 protected:
01244   unix_buffer buf;
01245   virtual int _get_descriptor() const { return buf.get_descriptor(); } 
01246 
01247 public:
01248 
01250   inline int get_descriptor() const { return buf.get_descriptor(); }
01251 
01252 
01259   unix_io_stream(const std::string &host, const int port) : iostream(&buf), buf(host, port)
01260    {
01261    }
01262 
01263 
01269   explicit unix_io_stream(const connection_waiter& cw)
01270    : iostream(&buf), buf(cw)
01271    {
01272    }
01273 
01274 
01279   explicit unix_io_stream(const int _fd, const int secs_timeout = 75) : iostream(&buf), buf(_fd, secs_timeout)
01280    {
01281    }
01282 
01283 
01284   virtual ~unix_io_stream()
01285    {
01286    }
01287 };
01288 
01289 
01291 class fd_iostream : public std::iostream, public Cprovide_descriptor_access
01292  {
01293    protected:
01294     unix_fd_buffer buf;
01295     virtual int _get_descriptor() const { return buf.get_descriptor(); } 
01296    public:
01297 
01299     inline int get_descriptor() const { return buf.get_descriptor(); }
01300 
01301     explicit fd_iostream(const int _fd) : std::iostream(&buf), buf(_fd) { }
01302     virtual ~fd_iostream() { }
01303  };
01304 
01306 class sd_iostream : public fd_iostream, public Cnetwork_connection_methods
01307  {
01308    protected:
01309     virtual int _get_descriptor() const { return buf.get_descriptor(); }
01310    public:
01311     explicit sd_iostream(const int _fd) : fd_iostream(_fd) { }
01312     virtual ~sd_iostream() { }
01313  };
01314 
01315 
01317 class socket_piper
01318 {
01319  private:
01320   int socketdes[2];
01321 
01322  public:
01323 
01325   socket_piper()
01326   {
01327     socketdes[0]=socketdes[1]=-1;
01328     int retval=socketpair(AF_UNIX, SOCK_STREAM, 0, socketdes); // create a pipe (refer manpage SOCKETPAIR(2))
01329     if (retval) throw unix_buffer_exception(std::string("socket_piper::constructor: "+my_strerror(errno)));
01330   }
01331 
01333   ~socket_piper()
01334   {
01335     // delete undetached descriptors
01336     if (socketdes[0]>=0) close(socketdes[0]);
01337     if (socketdes[1]>=0) close(socketdes[1]);
01338   }
01339 
01345   std::istream* detach_InPipe(const int bufsize=-1)
01346   {
01347     if (socketdes[0]<0) throw unix_buffer_exception("socket_piper::detach_InPipe: no descriptor available!"); 
01348     sd_iostream *pInPipe = new sd_iostream(socketdes[0]);
01349     //std::cout << "InPipe bufsize (prev): " << pInPipe->get_RCVBUF() << endl;
01350     if (bufsize>=0) pInPipe->set_RCVBUF(bufsize);
01351     //std::cout << "InPipe bufsize (post): " << pInPipe->get_RCVBUF() << endl;
01352     socketdes[0]=-1; // detach
01353     return pInPipe;
01354   }
01355 
01361   std::ostream* detach_OutPipe(const int bufsize=-1)
01362   {
01363     if (socketdes[1]<0) throw unix_buffer_exception("socket_piper::detach_OutPipe: no descriptor available!"); 
01364     sd_iostream *pOutPipe = new sd_iostream(socketdes[1]);
01365     //std::cout << "OutPipe bufsize (prev): " << pOutPipe->get_SNDBUF() << endl;
01366     if (bufsize>=0) pOutPipe->set_SNDBUF(bufsize);
01367     //std::cout << "OutPipe bufsize (post): " << pOutPipe->get_SNDBUF() << endl;
01368     socketdes[1]=-1; // detach
01369     return pOutPipe;
01370   }  
01371 };
01372 
01373 
01374 #endif

Generated on Wed Nov 7 23:29:26 2007 for Qsieve by  doxygen 1.5.4