00001 #ifndef UNIX_BUFFER_
00002 #define UNIX_BUFFER_
00003
00004
00005
00006
00007
00014 #ifdef CYGWIN_COMPAT
00015 #warning cygwin compatibility hacks enabled
00016 #endif
00017
00018 extern "C"
00019 {
00020 #include <sys/socket.h>
00021 #include <sys/poll.h>
00022 #include <netinet/in.h>
00023 #include <netinet/tcp.h>
00024 #include <netdb.h>
00025 #include <unistd.h>
00026
00027 #if defined(_REENTRANT) || defined(_THREAD_SAFE)
00028
00029
00030
00031
00032 #include <pthread.h>
00033 #define PTHREAD_TESTCANCEL pthread_testcancel();
00034 #else
00035 #define PTHREAD_TESTCANCEL
00036 #endif
00037
00038 }
00039
00040
00041
00042
00043
00044
00045 #ifdef WEIRD_VALGRIND_HACK
00046 #if !( defined(_REENTRANT) || defined(_THREAD_SAFE) )
00047
00048 #warning "weird valgrind hack disabled"
00049 #undef WEIRD_VALGRIND_HACK
00050 #endif
00051 #endif
00052
00053 #include <cerrno>
00054 #include <cstring>
00055 #include <string>
00056 #include <iostream>
00057 #include <sstream>
00058 #include <exception>
00059
00060 #ifdef WEIRD_VALGRIND_HACK
00061 #warning "weird valgrind hack enabled"
00062 #include "mutex.H"
00063 #endif
00064
00065 #if defined(CYGWIN_COMPAT)
00066 #if defined(_REENTRANT) || defined(_THREAD_SAFE)
00067 #include "mutex.H"
00068 static CMutex cygwin_mutex;
00069 #define CYGW_LOCK cygwin_mutex.lock();
00070 #define CYGW_UNLOCK cygwin_mutex.unlock();
00071 #else
00072 #define CYGW_LOCK
00073 #define CYGW_UNLOCK
00074 #endif
00075 #endif
00076
00077
00078 using std::cerr;
00079 using std::endl;
00080 using std::flush;
00081 using std::iostream;
00082 using std::streambuf;
00083
00084
//! Exception type thrown by the descriptor and socket helpers of this header.
/*!
 * The object keeps its own copy of the message passed to the constructor and
 * hands it back through what(). When VERBOSE or DEBUG is defined, the message
 * is additionally written to std::cerr at construction time.
 */
class unix_buffer_exception : public std::exception
{
 public:
  //! Creates the exception and stores a copy of @p Msg.
  explicit unix_buffer_exception(const std::string& Msg) throw()
    : text_(Msg)
  {
#if defined(VERBOSE) || defined(DEBUG)
    std::cerr << "throwing exception: " << Msg << std::endl;
#endif
  }

  virtual ~unix_buffer_exception() throw() { }

  //! @return the stored message as a NUL-terminated string owned by this object.
  virtual const char* what() const throw()
  {
    return text_.c_str();
  }

 private:
  const std::string text_;  //!< message reported by what()
};
00107
//! Picks the message produced by the XSI flavour of strerror_r().
/*!
 * That flavour returns an int status and writes the text into the caller's
 * buffer; any non-zero status is treated as "no message available".
 */
inline const char* my_strerror_result(const int status, const char* buf)
{
  return (status == 0) ? buf : "";
}

//! Picks the message produced by the GNU flavour of strerror_r().
/*!
 * That flavour returns the message pointer itself, which is not necessarily
 * the caller's buffer; a null pointer is mapped to the empty string.
 */
inline const char* my_strerror_result(const char* msg, const char* /*buf*/)
{
  return msg ? msg : "";
}

//! Thread-safe replacement for strerror() returning a std::string.
/*!
 * Overload resolution on the return type of strerror_r() selects the matching
 * helper above, so the function compiles with either the GNU (char*) or the
 * XSI (int) variant of strerror_r().
 *
 * @param errnum errno value to translate
 * @return the textual description, or an empty string if none could be obtained
 */
inline std::string my_strerror(const int errnum)
{
  char buffer[1024];
  buffer[0] = '\0';  // defined content even if the call leaves the buffer untouched
  return my_strerror_result(strerror_r(errnum, buffer, sizeof(buffer)), buffer);
}
00128
00129
00130
//! Opens a listening IPv4 TCP socket bound to INADDR_ANY on the given port.
/*!
 * SO_REUSEADDR and SO_LINGER are set before binding. If any step after
 * socket() fails, the half-configured socket is closed again (errno of the
 * failing call is preserved) so that no descriptor leaks.
 *
 * The function is declared inline because it is defined in a header; without
 * it, including this header from two translation units produces duplicate
 * definitions at link time.
 *
 * @param port  TCP port to listen on; must be in the range 1..65535
 * @param qsize backlog passed to listen()
 * @return the listening socket descriptor, or a negative value on failure
 */
inline int open_internet_port(const int port, const int qsize = 5)
{
  // htons() would silently wrap anything above 16 bits, so reject it here
  if (port <= 0 || port > 65535) return -1;

  const int sd = socket(AF_INET, SOCK_STREAM, 0);
  if (sd < 0) return sd;

  int reuse = 1;

  struct linger lingval;
  lingval.l_onoff = 1;
  // NOTE(review): POSIX specifies l_linger in seconds, so this asks for 1000 s;
  // the value is kept unchanged -- confirm the intended unit.
  lingval.l_linger = 10*100;

  struct sockaddr_in server;
  memset(&server, 0, sizeof(server));  // also clears the padding (sin_zero)
  server.sin_family = AF_INET;
  server.sin_addr.s_addr = INADDR_ANY;
  server.sin_port = htons(static_cast<unsigned short>(port));

  if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0
      || setsockopt(sd, SOL_SOCKET, SO_LINGER, &lingval, sizeof(lingval)) < 0
      || bind(sd, reinterpret_cast<struct sockaddr *>(&server), sizeof(server)) < 0
      || listen(sd, qsize) < 0)
  {
    const int err = errno;  // close() may overwrite errno
    close(sd);
    errno = err;
    return -1;
  }
  return sd;
}
00165
00166
00167
00168
00169
//! Writes a textual description of a socket address to a stream.
/*!
 * With CYGWIN_COMPAT the address is treated as a sockaddr_in and resolved via
 * gethostbyaddr() (guarded by CYGW_LOCK/CYGW_UNLOCK); the host name or
 * "(unknown)" is printed. Otherwise getnameinfo() is called with NI_NOFQDN and
 * "host service" is printed on success, or a parenthesised explanation of the
 * getnameinfo() error code on failure.
 *
 * @param ostr destination stream
 * @param sa   address to describe; sizeof(sockaddr) bytes of it are passed to getnameinfo()
 * @return @p ostr
 */
inline std::ostream& operator<< (std::ostream& ostr, const sockaddr &sa)
{
#ifdef CYGWIN_COMPAT
  CYGW_LOCK;
  u_int32_t ip = reinterpret_cast<const sockaddr_in& >(sa).sin_addr.s_addr;
  struct hostent *p_hostent = gethostbyaddr(reinterpret_cast<const char*>(&ip),sizeof(ip),AF_INET);
  ostr << (p_hostent ? p_hostent->h_name : "(unknown)");
  CYGW_UNLOCK;
  return ostr;
#else
  char host[1024];
  char service[1024];

  const int status = getnameinfo(&sa, sizeof(sa),
                                 host, sizeof(host),
                                 service, sizeof(service),
                                 NI_NOFQDN);
  if (status == 0) return ostr << host << " " << service;

  // translate the error code; anything unlisted gets the generic text
  const char* reason = "(unspecified error in getnameinfo.)";
  if      (status == EAI_AGAIN)    reason = "(The name could not be resolved at this time.)";
  else if (status == EAI_BADFLAGS) reason = "(The flags parameter has an invalid value.)";
  else if (status == EAI_FAIL)     reason = "(A non-recoverable error occurred.)";
  else if (status == EAI_FAMILY)   reason = "(The address family was not recognized.)";
  else if (status == EAI_MEMORY)   reason = "(Out of memory.)";
  else if (status == EAI_NONAME)   reason = "(The name does not resolve.)";
  else if (status == EAI_SYSTEM)   reason = "(A system error occurred.)";
  return ostr << reason;
#endif
}
00217
00223 inline std::string peer_info(const int socket_descriptor)
00224 {
00225
00226 std::ostringstream ostr;
00227
00228 struct sockaddr sa;
00229 socklen_t sl = sizeof(sa);
00230
00231 const int retval = getpeername(socket_descriptor, &sa, &sl);
00232 if (retval==0) ostr << sa;
00233 return ostr.str();
00234 }
00235
00241 inline std::string socket_info(const int socket_descriptor)
00242 {
00243
00244 std::ostringstream ostr;
00245
00246 struct sockaddr sa;
00247 socklen_t sl = sizeof(sa);
00248
00249 const int retval = getsockname(socket_descriptor, &sa, &sl);
00250 if (retval==0) ostr << sa;
00251 return ostr.str();
00252 }
00253
00260 inline std::string connection_info(const int socket_descriptor)
00261 {
00262
00263 std::ostringstream ostr;
00264
00265 struct sockaddr sa;
00266 socklen_t sl = sizeof(sa);
00267
00268 const int retval1 = getpeername(socket_descriptor, &sa, &sl);
00269 ostr << "connection from ";
00270 switch (retval1)
00271 {
00272 case 0 : ostr << sa; break;
00273 case EBADF : ostr << "(invalid descriptor)"; break;
00274 case ENOTSOCK: ostr << "(file, not a socket)"; break;
00275 default : ostr << "(unable to perform getpeername)";
00276 }
00277
00278 const int retval2 = getsockname(socket_descriptor, &sa, &sl);
00279 ostr << " to ";
00280 switch (retval2)
00281 {
00282 case 0 : ostr << sa; break;
00283 case EBADF : ostr << "(invalid descriptor)"; break;
00284 case ENOTSOCK: ostr << "(file, not a socket)"; break;
00285 default : ostr << "(unable to perform getsockname)";
00286 }
00287 return ostr.str();
00288 }
00289
//! Convenience overload: describes the peer of any object exposing get_descriptor().
/*!
 * @param source object whose get_descriptor() result is forwarded to peer_info()
 * @return whatever the forwarded peer_info() call returns
 */
template<typename T>
inline std::string peer_info(const T& source)
{
  return peer_info(source.get_descriptor());
}
00301
//! Convenience overload: describes the local address of any object exposing get_descriptor().
/*!
 * @param source object whose get_descriptor() result is forwarded to socket_info()
 * @return whatever the forwarded socket_info() call returns
 */
template<typename T>
inline std::string socket_info(const T& source)
{
  return socket_info(source.get_descriptor());
}
00313
//! Convenience overload: describes both ends of any object exposing get_descriptor().
/*!
 * @param source object whose get_descriptor() result is forwarded to connection_info()
 * @return whatever the forwarded connection_info() call returns
 */
template<typename T>
inline std::string connection_info(const T& source)
{
  return connection_info(source.get_descriptor());
}
00326
00327
//! Abstract base for classes that can name the descriptor they operate on.
/*!
 * Derived classes implement _get_descriptor(); the mix-in classes of this
 * header call it to find the descriptor their operations apply to.
 */
class Cprovide_descriptor_access
{
 protected:
  //! @return the descriptor the derived object currently refers to
  virtual int _get_descriptor() const = 0;

 public:
  //! Virtual so that derived objects can be destroyed through a base pointer.
  virtual ~Cprovide_descriptor_access() {}
};
00336
00337
00348 class Cpoll_methods : public Cprovide_descriptor_access
00349 {
00350 public:
00351 virtual ~Cpoll_methods() {}
00352
00358 int poll_events(short requested_events, const int timeout_ms=1) const
00359 {
00360 pollfd pfd;
00361 int res=0;
00362 do
00363 {
00364 pfd.fd=_get_descriptor(); pfd.events=requested_events; pfd.revents=0;
00365 res=poll(&pfd,1,timeout_ms);
00366 if (res==-1) if (errno!=EINTR) return -1;
00367 } while (res==-1);
00368 return pfd.revents;
00369 }
00370
00372 inline bool bad() const
00373 {
00374 const int revents=poll_events(POLLIN);
00375 return (revents==-1) | (revents&(POLLERR|POLLHUP|POLLNVAL));
00376 }
00377
00379 inline bool good() const
00380 {
00381 return !bad();
00382 }
00383
00385 int readable_chars_within(const int size, const int timeout_ms=1000) const
00386 {
00387 const int revents=poll_events(POLLIN,timeout_ms);
00388 if (revents==-1) return -1;
00389 if (revents&(POLLERR|POLLHUP|POLLNVAL)) return -1;
00390 if (revents&POLLIN)
00391 {
00392 char *buf = new char[size];
00393 int count = recv(_get_descriptor(), buf, size, MSG_PEEK|MSG_NOSIGNAL);
00394 delete [] buf;
00395 return count;
00396 }
00397 return -1;
00398 }
00399
00401 bool writable_now() const
00402 {
00403 const int revents=poll_events(POLLOUT);
00404 if (revents==-1) return false;
00405 if (revents&(POLLERR|POLLHUP|POLLNVAL)) return false;
00406 return (revents&POLLOUT);
00407 }
00408 };
00409
00410
00411
00422 class Cpoll : public Cpoll_methods
00423 {
00424 private:
00425 int fd;
00426
00427 protected:
00428 virtual int _get_descriptor() const { return fd; }
00429
00430 public:
00432 explicit Cpoll(const int _fd=0) : fd(_fd) {}
00433
00435 template<class T> explicit Cpoll(const T &obj) : fd(obj.get_descriptor()) {}
00436
00437 virtual ~Cpoll() {}
00438
00440 void attach(const int _fd) { fd=_fd; }
00441
00447 template<class T> void attach(const T &obj) { attach(obj.get_descriptor()); }
00448 };
00449
00450
00451
00453 class Cnetwork_connection_methods : public Cprovide_descriptor_access
00454 {
00455 public:
00456 virtual ~Cnetwork_connection_methods() { }
00457
00467 inline void set_RCVBUF(const int size)
00468 {
00469 const int fd = _get_descriptor();
00470 int val = size;
00471 if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val))==-1)
00472 throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_RCVBUF(): "+my_strerror(errno)));
00473 }
00474
00482 inline void set_SNDBUF(const int size)
00483 {
00484 const int fd = _get_descriptor();
00485 int val;
00486 val=size;
00487 if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val))==-1)
00488 throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_SNDBUF(): "+my_strerror(errno)));
00489 }
00490
00492 inline int get_RCVBUF()
00493 {
00494 const int fd = _get_descriptor();
00495 int val = -1;
00496 socklen_t optlen = sizeof(val);
00497 if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, &optlen)==-1)
00498 {
00499 perror("Cnetwork_connection_methods::get_RCVBUF()");
00500 return -1;
00501 }
00502 else return val;
00503 }
00504
00506 inline int get_SNDBUF()
00507 {
00508 const int fd = _get_descriptor();
00509 int val = -1;
00510 socklen_t optlen = sizeof(val);
00511 if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, &optlen)==-1)
00512 {
00513 perror("Cnetwork_connection_methods::get_SNDBUF()");
00514 return -1;
00515 }
00516 else return val;
00517 }
00518
00520 inline void set_RCV_timeout(const int secs, const int u_secs = 0)
00521 {
00522 const int fd = _get_descriptor();
00523 timeval tv;
00524 tv.tv_sec = secs;
00525 tv.tv_usec = u_secs;
00526 if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))==-1)
00527 throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_RCV_timeout(): "+my_strerror(errno)));
00528 }
00529
00531 inline void set_SND_timeout(const int secs, const int u_secs = 0)
00532 {
00533 const int fd = _get_descriptor();
00534 timeval tv;
00535 tv.tv_sec = secs;
00536 tv.tv_usec = u_secs;
00537 if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))==-1)
00538 throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_SND_timeout(): "+my_strerror(errno)));
00539 }
00540
00542 inline int get_RCV_timeout()
00543 {
00544 const int fd = _get_descriptor();
00545 timeval tv;
00546 socklen_t optlen = sizeof(tv);
00547 if (getsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, &optlen)==-1)
00548 {
00549 perror("Cnetwork_connection_methods::get_RCV_timeout()");
00550 return -1;
00551 }
00552 else return tv.tv_sec;
00553 }
00554
00556 inline int get_SND_timeout()
00557 {
00558 const int fd = _get_descriptor();
00559 timeval tv;
00560 socklen_t optlen = sizeof(tv);
00561 if (getsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, &optlen)==-1)
00562 {
00563 perror("Cnetwork_connection_methods::get_SND_timeout()");
00564 return -1;
00565 }
00566 else return tv.tv_sec;
00567 }
00568
00570 inline void set_timeout(const int secs, const int u_secs = 0)
00571 {
00572 const int fd = _get_descriptor();
00573 timeval tv;
00574 tv.tv_sec = secs;
00575 tv.tv_usec = u_secs;
00576 if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))==-1)
00577 throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_timeout(), RCVTIMEO: "+my_strerror(errno)));
00578 if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))==-1)
00579 throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_timeout(), SNDTIMEO: "+my_strerror(errno)));
00580 }
00581
00583 inline void set_TCP_NODELAY(const bool flag = true)
00584 {
00585 const int fd = _get_descriptor();
00586 int optval=flag;
00587 if (setsockopt(fd, SOL_TCP, TCP_NODELAY, &optval, sizeof(optval))==-1)
00588 throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_TCP_NODELAY(): "+my_strerror(errno)));
00589 }
00590
00592 inline bool get_TCP_NODELAY()
00593 {
00594 const int fd = _get_descriptor();
00595 int optval;
00596 socklen_t optlen = sizeof(optval);
00597 if (getsockopt(fd, SOL_TCP, TCP_NODELAY, &optval, &optlen)==-1)
00598 perror("Cnetwork_connection_methods::get_TCP_NODELAY");
00599 return optval;
00600 }
00601
00603 inline void set_LINGER(const signed int seconds)
00604 {
00605
00606
00607 const int fd = _get_descriptor();
00608 struct linger lingval;
00609 if (seconds<0) { lingval.l_onoff=0; lingval.l_linger=0; }
00610 else { lingval.l_onoff=1; lingval.l_linger=seconds; }
00611 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &lingval, sizeof(lingval))==-1)
00612 throw unix_buffer_exception(std::string("Cnetwork_connection_methods::set_LINGER(): "+my_strerror(errno)));
00613 }
00614
00616 inline signed int get_LINGER()
00617 {
00618
00619 const int fd = _get_descriptor();
00620 struct linger lingval;
00621 socklen_t optlen = sizeof(lingval);
00622 if (getsockopt(fd, SOL_SOCKET, SO_LINGER, &lingval, &optlen)==-1)
00623 {
00624 perror("Cnetwork_connection_methods::get_LINGER()");
00625 return -2;
00626 }
00627 if (lingval.l_onoff==0) return -1;
00628 else return lingval.l_linger;
00629 }
00630
00635 inline const std::string peer_info()
00636 {
00637
00638 return ::peer_info(_get_descriptor());
00639 }
00640
00645 inline const std::string socket_info()
00646 {
00647
00648 return ::socket_info(_get_descriptor());
00649 }
00650
00656 inline const std::string connection_info()
00657 {
00658
00659 return ::connection_info(_get_descriptor());
00660 }
00661
00662 };
00663
00664
00665
00666
00671 class connection_waiter : public Cnetwork_connection_methods
00672 {
00673 protected:
00674 int server_socket;
00675 const int client_default_timeout;
00676
00677 virtual int _get_descriptor() const { return server_socket; }
00678 public:
00679
00687 explicit connection_waiter(const int server_port, int server_port_tries = 10, const int _client_default_timeout = 75)
00688 : server_socket(-1), client_default_timeout(_client_default_timeout)
00689 {
00690 while ((server_socket = open_internet_port(server_port,10))<0
00691 && --server_port_tries>=0) sleep(1);
00692 if (server_socket<0)
00693 {
00694 cerr << "connection_waiter::constructor: cannot open serverport " << server_port << endl;
00695 throw unix_buffer_exception("connection_waiter::constructor: cannot open serverport");
00696 }
00697 }
00698
00700 ~connection_waiter()
00701 {
00702 if (shutdown(server_socket, SHUT_RDWR)==-1) perror("~connection_waiter():shutdown");
00703 close(server_socket);
00704 }
00705
00710 int get_new_client_socket_descriptor() const
00711 {
00712 int sd;
00713 do
00714 {
00715 sd=accept(server_socket, NULL, NULL);
00716 PTHREAD_TESTCANCEL;
00717 } while (sd==0);
00718 return sd;
00719 }
00720
00725 int get_client_default_timeout() const
00726 {
00727 return client_default_timeout;
00728 }
00729 };
00730
00731
00733 class unix_fd_buffer : public streambuf, public Cprovide_descriptor_access
00734 {
00735 protected:
00736 static const int buffersize = 4096;
00737 static const int pushbacksize = 10;
00738 int fd;
00739 bool fd_autoclose;
00740 char obuffer[buffersize];
00741 char ibuffer[buffersize+pushbacksize];
00742
00743 virtual int _get_descriptor() const { return fd; }
00744
00745 virtual int empty_buffer()
00746 {
00747 if (fd<0)
00748 {
00749 cerr << "unix_fd_buffer::empty buffer(): invalid descriptor! fd=" << fd << endl;
00750 throw unix_buffer_exception("unix_fd_buffer::empty buffer(): invalid descriptor!");
00751 return EOF;
00752 }
00753 const int total_to_write = pptr()-pbase();
00754 if (!total_to_write) return 0;
00755 if (total_to_write<0 || total_to_write>buffersize)
00756 {
00757 throw unix_buffer_exception("unix_fd_buffer::empty_buffer() sanity check failed!");
00758 }
00759 int total_written = 0;
00760 signed int retries = 2;
00761 do
00762 {
00763 PTHREAD_TESTCANCEL;
00764 const int count = total_to_write-total_written;
00765 const int written=write(fd, obuffer, count);
00766 PTHREAD_TESTCANCEL;
00767 if (written!=count)
00768 {
00769 cerr << "unix_fd_buffer::empty_buffer(): "
00770 << written << " bytes of " << count << " sent." << endl;
00771 if (written<0)
00772 {
00773 --retries;
00774 const int err=errno;
00775 perror("unix_fd_buffer::empty_buffer()");
00776 if (written==-1 && err == EAGAIN && retries>=0) continue;
00777 if (written==-1 && err == EINTR && retries>=0) continue;
00778 throw unix_buffer_exception(std::string("unix_fd_buffer::empty_buffer() send failed: ")+my_strerror(err));
00779 return EOF;
00780 }
00781 }
00782 pbump(-written);
00783 total_written+=written;
00784 } while (total_written<total_to_write);
00785 return total_written;
00786 }
00787
00788 virtual int overflow (int c)
00789 {
00790 if (c != EOF) { *pptr() = c; pbump(1); }
00791 if (empty_buffer() == EOF) return EOF;
00792 return c;
00793 }
00794
00795 virtual int sync ()
00796 {
00797 return (empty_buffer() == EOF) ? -1 : 0;
00798 }
00799
00800 virtual int underflow ()
00801 {
00802 if (gptr() < egptr()) return *gptr();
00803 int pbcount = gptr() - eback();
00804 if (pbcount > pushbacksize) pbcount = pushbacksize;
00805 if (pbcount > 0) memmove (ibuffer+(pushbacksize-pbcount), gptr()-pbcount, pbcount);
00806 int count;
00807 signed int retries = 2;
00808 do
00809 {
00810 PTHREAD_TESTCANCEL;
00811 count = read(fd, ibuffer+pushbacksize, buffersize);
00812 PTHREAD_TESTCANCEL;
00813 if (count <= 0)
00814 {
00815 if (count==0) return EOF;
00816 --retries;
00817 const int err=errno;
00818 perror("unix_fd_buffer::underflow()");
00819 if (count==-1 && err == EAGAIN && retries>=0) continue;
00820 if (count==-1 && err == EINTR && retries>=0) continue;
00821 throw unix_buffer_exception(std::string("unix_fd_buffer::underflow() recv failed: ")+my_strerror(err));
00822 return EOF;
00823 }
00824 } while (count<=0);
00825
00826 setg(ibuffer+(pushbacksize-pbcount), ibuffer+pushbacksize,
00827 ibuffer+pushbacksize+count);
00828 return *gptr();
00829 }
00830
00831 public:
00832
00834 inline int get_descriptor() const { return fd; }
00835
00836
00851 void attach(const int _fd, const bool autoclose = true)
00852 {
00853 if (fd>0)
00854 {
00855 throw unix_buffer_exception("unix_fd_buffer::open(const int _fd): descriptor is already in use!");
00856 }
00857 if (_fd<0)
00858 {
00859 throw unix_buffer_exception("unix_fd_buffer::open(const int _fd): descriptor is invalid!");
00860 }
00861 fd = _fd; fd_autoclose=autoclose;
00862 setp(obuffer, obuffer+(buffersize-1));
00863 setg(ibuffer+pushbacksize, ibuffer+pushbacksize, ibuffer+pushbacksize);
00864 }
00865
00873 virtual int detach()
00874 {
00875 if (fd<0) return -1;
00876 sync();
00877 if (fd_autoclose)
00878 {
00879 if (shutdown(fd, SHUT_RDWR)==-1) perror("~unix_fd_buffer():shutdown");
00880 close(fd);
00881 fd=-1;
00882 }
00883 const int h=fd;
00884 fd=-1;
00885 return h;
00886 }
00887
00889 unix_fd_buffer(void)
00890 : fd(-1),
00891 fd_autoclose(true)
00892 {
00893 setp(obuffer, obuffer+(buffersize-1));
00894 setg(ibuffer+pushbacksize, ibuffer+pushbacksize, ibuffer+pushbacksize);
00895 }
00896
00904 explicit unix_fd_buffer(const int _fd)
00905 : fd(_fd), fd_autoclose(true)
00906 {
00907 setp(obuffer, obuffer+(buffersize-1));
00908 setg(ibuffer+pushbacksize, ibuffer+pushbacksize, ibuffer+pushbacksize);
00909 }
00910
00911
00913 virtual ~unix_fd_buffer()
00914 {
00915 if (fd>=0)
00916 {
00917
00918 sync();
00919 if (fd_autoclose) close(fd);
00920 fd=-1;
00921 }
00922 }
00923
00924 };
00925
00926
00927
00929 class unix_buffer : public unix_fd_buffer, public Cnetwork_connection_methods
00930 {
00931 protected:
00932 virtual int _get_descriptor() const { return fd; }
00933
00934 virtual int empty_buffer()
00935 {
00936 if (fd<0)
00937 {
00938 cerr << "unix_buffer::empty buffer(): invalid descriptor! fd=" << fd << endl;
00939 throw unix_buffer_exception("unix_buffer::empty buffer(): invalid descriptor!");
00940 return EOF;
00941 }
00942 const int total_to_write = pptr()-pbase();
00943 if (!total_to_write) return 0;
00944 if (total_to_write<0 || total_to_write>buffersize)
00945 {
00946 throw unix_buffer_exception("unix_buffer::empty_buffer() sanity check failed!");
00947 }
00948 int total_written = 0;
00949 signed int retries = 2;
00950 do
00951 {
00952 PTHREAD_TESTCANCEL;
00953 const int count = total_to_write-total_written;
00954 const int written=send(fd, obuffer, count, MSG_NOSIGNAL);
00955 PTHREAD_TESTCANCEL;
00956 if (written!=count)
00957 {
00958 cerr << "unix_buffer::empty_buffer(): "
00959 << written << " bytes of " << count << " sent." << endl;
00960 if (written<0)
00961 {
00962 --retries;
00963 const int err=errno;
00964 perror("unix_buffer::empty_buffer()");
00965 if (written==-1 && err == EAGAIN && retries>=0) continue;
00966 if (written==-1 && err == EINTR && retries>=0) continue;
00967 throw unix_buffer_exception(std::string("unix_buffer::empty_buffer() send failed: ")+my_strerror(err));
00968 return EOF;
00969 }
00970 }
00971 pbump(-written);
00972 total_written+=written;
00973 } while (total_written<total_to_write);
00974 return total_written;
00975 }
00976
00977 virtual int underflow ()
00978 {
00979 if (gptr() < egptr()) return *gptr();
00980 int pbcount = gptr() - eback();
00981 if (pbcount > pushbacksize) pbcount = pushbacksize;
00982 if (pbcount > 0) memmove (ibuffer+(pushbacksize-pbcount), gptr()-pbcount, pbcount);
00983 int count;
00984 signed int retries = 2;
00985 do
00986 {
00987 PTHREAD_TESTCANCEL;
00988 count = recv(fd, ibuffer+pushbacksize, buffersize, MSG_NOSIGNAL);
00989 PTHREAD_TESTCANCEL;
00990 if (count <= 0)
00991 {
00992 if (count==0) return EOF;
00993 --retries;
00994 const int err=errno;
00995 perror("unix_buffer::underflow()");
00996 if (count==-1 && err == EAGAIN && retries>=0) continue;
00997 if (count==-1 && err == EINTR && retries>=0) continue;
00998 throw unix_buffer_exception(std::string("unix_buffer::underflow() recv failed: ")+my_strerror(err));
00999 return EOF;
01000 }
01001 } while (count<=0);
01002
01003 setg(ibuffer+(pushbacksize-pbcount), ibuffer+pushbacksize,
01004 ibuffer+pushbacksize+count);
01005 return *gptr();
01006 }
01007
01008 public:
01009
01017 virtual int detach()
01018 {
01019 if (fd<0) return -1;
01020 sync();
01021 if (fd_autoclose)
01022 {
01023 if (shutdown(fd, SHUT_RDWR)==-1) perror("~unix_buffer():shutdown");
01024 close(fd);
01025 fd=-1;
01026 }
01027 const int h=fd;
01028 fd=-1;
01029 return h;
01030 }
01031
01033 unix_buffer(void)
01034 : unix_fd_buffer()
01035 {
01036 set_timeout(75);
01037 }
01038
01047 explicit unix_buffer(const int _fd, const int secs_timeout = 75)
01048 : unix_fd_buffer(_fd)
01049 {
01050 set_timeout(secs_timeout);
01051 }
01052
01058 explicit unix_buffer(const connection_waiter& cw)
01059 : unix_fd_buffer(cw.get_new_client_socket_descriptor())
01060 {
01061 set_timeout(cw.get_client_default_timeout());
01062 }
01063
01070 unix_buffer(const std::string &host, const int port)
01071 : unix_fd_buffer()
01072 {
01073
01074 #ifdef CYGWIN_COMPAT
01075
01076
01077 struct sockaddr_in server;
01078 struct hostent *hp;
01079 int ret;
01080 int err_number;
01081 fd = socket(AF_INET, SOCK_STREAM, 0);
01082 if (fd < 0)
01083 {
01084 throw unix_buffer_exception("unix_buffer::constructor: Can't open socket");
01085 }
01086 server.sin_family = AF_INET;
01087
01088 CYGW_LOCK;
01089 hp = gethostbyname(host.c_str());
01090 if (hp == NULL)
01091 {
01092 CYGW_UNLOCK;
01093 throw unix_buffer_exception("unix_buffer::constructor: Unknown host: "+host);
01094 }
01095 memcpy((char*)&server.sin_addr, (char*)hp->h_addr, hp->h_length);
01096 CYGW_UNLOCK;
01097
01098 server.sin_port = htons(port);
01099 {
01100 int retries_on_EAGAIN = 3;
01101 while ((ret = connect(fd, reinterpret_cast<struct sockaddr *>(&server), sizeof(server))) < 0)
01102 {
01103 switch (err_number=errno)
01104 {
01105 case ETIMEDOUT:
01106 cerr << "." << flush; sleep(10); break;
01107 case ENETUNREACH:
01108 cerr << "%" << flush; sleep(10); break;
01109 case EAGAIN:
01110 if (retries_on_EAGAIN--)
01111 {
01112 cerr << "Can't connect socket (" << my_strerror(err_number) << ")" << endl;
01113 if (err_number==EAGAIN)
01114 {
01115 cerr << "Trying it again in 10 secs..." << endl;
01116 sleep(10);
01117 }
01118 break;
01119 }
01120 default:
01121 throw unix_buffer_exception(std::string("unix_buffer::constructor: Can't connect socket: ")+my_strerror(err_number));
01122 }
01123 }
01124 }
01125
01126 #else
01127
01128
01129
01130 struct addrinfo hints;
01131 memset(&hints,0,sizeof(hints));
01132 hints.ai_family=PF_INET;
01133 hints.ai_socktype=SOCK_STREAM;
01134 struct addrinfo *addrinfo_res = NULL;
01135 #ifdef WEIRD_VALGRIND_HACK
01136 static CMutex serial;
01137 serial.lock();
01138
01139
01140
01141
01142
01143 const int retval = getaddrinfo(host.c_str(),NULL,&hints,&addrinfo_res);
01144 serial.unlock();
01145 #else
01146 const int retval = getaddrinfo(host.c_str(),NULL,&hints,&addrinfo_res);
01147 #endif
01148 if ( retval || addrinfo_res==NULL )
01149 {
01150 cerr << "can't reach " << "\"" << host << "\"" << endl;
01151 cerr << "Error given by getaddrinfo: " << endl;
01152 cerr << gai_strerror(retval) << endl;
01153 throw unix_buffer_exception("unix_buffer::constructor: Can't reach "+host+": "+gai_strerror(retval));
01154 }
01155 if (addrinfo_res->ai_socktype!=SOCK_STREAM)
01156 {
01157 throw unix_buffer_exception("unix_buffer::constructor: provided protocol doesn't support SOCK_STREAM");
01158 }
01159 switch (addrinfo_res->ai_family)
01160 {
01161 case PF_INET:
01162
01163 fd = socket(AF_INET, SOCK_STREAM, 0);
01164 reinterpret_cast<sockaddr_in*>(addrinfo_res->ai_addr)->sin_port=htons(port);
01165 break;
01166 case PF_INET6:
01167 cerr << "PF_INET6: try using IPv6 (untested feature!)" << endl;
01168 fd = socket(AF_INET6, SOCK_STREAM, 0);
01169 reinterpret_cast<sockaddr_in6*>(addrinfo_res->ai_addr)->sin6_port=htons(port);
01170 break;
01171 default:
01172 throw unix_buffer_exception("unix_buffer::constructor: too bad! ai_family isn't supported by unix_buffer!");
01173 }
01174
01175 if (fd < 0)
01176 {
01177 throw unix_buffer_exception("unix_buffer::constructor: Can't open socket");
01178 }
01179
01180 {
01181 int retries_on_EAGAIN = 3;
01182 while ( connect(fd, addrinfo_res->ai_addr, addrinfo_res->ai_addrlen) <0 )
01183 {
01184 PTHREAD_TESTCANCEL;
01185 switch (int err_number=errno)
01186 {
01187 case ETIMEDOUT:
01188 cerr << "." << flush; sleep(10); break;
01189 case ENETUNREACH:
01190 cerr << "%" << flush; sleep(10); break;
01191 case EAGAIN:
01192 if (retries_on_EAGAIN--)
01193 {
01194 cerr << "Can't connect socket (" << my_strerror(err_number) << ")" << endl;
01195 if (err_number==EAGAIN)
01196 {
01197 cerr << "Trying it again in 10 secs..." << endl;
01198 sleep(10);
01199 }
01200 break;
01201 }
01202 default:
01203 throw unix_buffer_exception(std::string("unix_buffer::constructor: Can't connect socket: ")+my_strerror(err_number));
01204 }
01205 }
01206 }
01207 freeaddrinfo(addrinfo_res);
01208 #endif
01209
01210 setp(obuffer, obuffer+(buffersize-1));
01211 setg(ibuffer+pushbacksize, ibuffer+pushbacksize, ibuffer+pushbacksize);
01212 }
01213
01215 virtual ~unix_buffer()
01216 {
01217 if (fd>=0)
01218 {
01219
01220
01221 try { sync(); }
01222 catch (unix_buffer_exception &e)
01223 {
01224 cerr << "unix_buffer::destructor caught exception: " << e.what() << endl;
01225 }
01226 if (fd_autoclose)
01227 {
01228 if (shutdown(fd, SHUT_RDWR)==-1) perror("~unix_buffer():shutdown");
01229 close(fd);
01230 }
01231 fd=-1;
01232 }
01233 }
01234
01235 };
01236
01237
01238
01239
01241 class unix_io_stream : public iostream, public Cnetwork_connection_methods
01242 {
01243 protected:
01244 unix_buffer buf;
01245 virtual int _get_descriptor() const { return buf.get_descriptor(); }
01246
01247 public:
01248
01250 inline int get_descriptor() const { return buf.get_descriptor(); }
01251
01252
01259 unix_io_stream(const std::string &host, const int port) : iostream(&buf), buf(host, port)
01260 {
01261 }
01262
01263
01269 explicit unix_io_stream(const connection_waiter& cw)
01270 : iostream(&buf), buf(cw)
01271 {
01272 }
01273
01274
01279 explicit unix_io_stream(const int _fd, const int secs_timeout = 75) : iostream(&buf), buf(_fd, secs_timeout)
01280 {
01281 }
01282
01283
01284 virtual ~unix_io_stream()
01285 {
01286 }
01287 };
01288
01289
01291 class fd_iostream : public std::iostream, public Cprovide_descriptor_access
01292 {
01293 protected:
01294 unix_fd_buffer buf;
01295 virtual int _get_descriptor() const { return buf.get_descriptor(); }
01296 public:
01297
01299 inline int get_descriptor() const { return buf.get_descriptor(); }
01300
01301 explicit fd_iostream(const int _fd) : std::iostream(&buf), buf(_fd) { }
01302 virtual ~fd_iostream() { }
01303 };
01304
01306 class sd_iostream : public fd_iostream, public Cnetwork_connection_methods
01307 {
01308 protected:
01309 virtual int _get_descriptor() const { return buf.get_descriptor(); }
01310 public:
01311 explicit sd_iostream(const int _fd) : fd_iostream(_fd) { }
01312 virtual ~sd_iostream() { }
01313 };
01314
01315
01317 class socket_piper
01318 {
01319 private:
01320 int socketdes[2];
01321
01322 public:
01323
01325 socket_piper()
01326 {
01327 socketdes[0]=socketdes[1]=-1;
01328 int retval=socketpair(AF_UNIX, SOCK_STREAM, 0, socketdes);
01329 if (retval) throw unix_buffer_exception(std::string("socket_piper::constructor: "+my_strerror(errno)));
01330 }
01331
01333 ~socket_piper()
01334 {
01335
01336 if (socketdes[0]>=0) close(socketdes[0]);
01337 if (socketdes[1]>=0) close(socketdes[1]);
01338 }
01339
01345 std::istream* detach_InPipe(const int bufsize=-1)
01346 {
01347 if (socketdes[0]<0) throw unix_buffer_exception("socket_piper::detach_InPipe: no descriptor available!");
01348 sd_iostream *pInPipe = new sd_iostream(socketdes[0]);
01349
01350 if (bufsize>=0) pInPipe->set_RCVBUF(bufsize);
01351
01352 socketdes[0]=-1;
01353 return pInPipe;
01354 }
01355
01361 std::ostream* detach_OutPipe(const int bufsize=-1)
01362 {
01363 if (socketdes[1]<0) throw unix_buffer_exception("socket_piper::detach_OutPipe: no descriptor available!");
01364 sd_iostream *pOutPipe = new sd_iostream(socketdes[1]);
01365
01366 if (bufsize>=0) pOutPipe->set_SNDBUF(bufsize);
01367
01368 socketdes[1]=-1;
01369 return pOutPipe;
01370 }
01371 };
01372
01373
01374 #endif