Do not leak TcpAcceptor job on reconfiguration by monitoring and reacting to the listening socket closure. Every job that waits for Comm I/O must have a FD closure handler. Unfortunately, we currently cannot enforce that rule automatically. === modified file 'src/comm/TcpAcceptor.cc' --- src/comm/TcpAcceptor.cc 2014-04-14 06:50:31 +0000 +++ src/comm/TcpAcceptor.cc 2014-04-24 20:12:30 +0000 @@ -105,40 +105,46 @@ Comm::TcpAcceptor::doneAll() const { // stop when FD is closed if (!IsConnOpen(conn)) { return AsyncJob::doneAll(); } // stop when handlers are gone if (theCallSub == NULL) { return AsyncJob::doneAll(); } // open FD with handlers...keep accepting. return false; } void Comm::TcpAcceptor::swanSong() { debugs(5,5, HERE); unsubscribe("swanSong"); + if (IsConnOpen(conn)) { + if (closer_ != NULL) + comm_remove_close_handler(conn->fd, closer_); + conn->close(); + } + conn = NULL; AcceptLimiter::Instance().removeDead(this); AsyncJob::swanSong(); } const char * Comm::TcpAcceptor::status() const { if (conn == NULL) return "[nil connection]"; static char ipbuf[MAX_IPSTRLEN] = {'\0'}; if (ipbuf[0] == '\0') conn->local.toHostStr(ipbuf, MAX_IPSTRLEN); static MemBuf buf; buf.reset(); buf.Printf(" FD %d, %s",conn->fd, ipbuf); const char *jobStatus = AsyncJob::status(); @@ -165,42 +171,57 @@ Comm::TcpAcceptor::setListen() } if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) { #ifdef SO_ACCEPTFILTER struct accept_filter_arg afa; bzero(&afa, sizeof(afa)); debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on " << conn); xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); if (setsockopt(conn->fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0) debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); #elif defined(TCP_DEFER_ACCEPT) int seconds = 30; if (strncmp(Config.accept_filter, "data=", 5) == 0) seconds = atoi(Config.accept_filter + 5); if (setsockopt(conn->fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0) debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); #else debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS"); #endif } + + typedef CommCbMemFunT Dialer; + closer_ = JobCallback(5, 4, Dialer, this, Comm::TcpAcceptor::handleClosure); + comm_add_close_handler(conn->fd, closer_); } +/// called when listening descriptor is closed by an external force +/// such as clientHttpConnectionsClose() +void +Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &io) +{ + closer_ = NULL; + conn = NULL; + Must(done()); +} + + /** * This private callback is called whenever a filedescriptor is ready * to dupe itself and fob off an accept()ed connection * * It will either do that accept operation. Or if there are not enough FD * available to do the clone safely will push the listening FD into a list * of deferred operations. The list gets kicked and the dupe/accept() actually * done later when enough sockets become available. */ void Comm::TcpAcceptor::doAccept(int fd, void *data) { try { debugs(5, 2, HERE << "New connection on FD " << fd); Must(isOpen(fd)); TcpAcceptor *afd = static_cast(data); if (!okToAccept()) { AcceptLimiter::Instance().defer(afd); === modified file 'src/comm/TcpAcceptor.h' --- src/comm/TcpAcceptor.h 2013-10-25 00:13:46 +0000 +++ src/comm/TcpAcceptor.h 2014-04-24 20:12:30 +0000 @@ -1,29 +1,31 @@ #ifndef SQUID_COMM_TCPACCEPTOR_H #define SQUID_COMM_TCPACCEPTOR_H #include "base/AsyncJob.h" #include "base/CbcPointer.h" #include "base/Subscription.h" #include "comm/forward.h" #include "comm_err_t.h" +class CommCloseCbParams; + namespace Comm { class AcceptLimiter; /** * Listens on a Comm::Connection for new incoming connections and * emits an active Comm::Connection descriptor for the new client. * * Handles all event limiting required to quash inbound connection * floods within the global FD limits of available Squid_MaxFD and * client_ip_max_connections. * * Fills the emitted connection with all connection details able to * be looked up. Currently these are the local/remote IP:port details * and the listening socket transparent-mode flag. */ class TcpAcceptor : public AsyncJob { public: @@ -56,37 +58,41 @@ public: */ void acceptNext(); /// Call the subscribed callback handler with details about a new connection. void notify(const comm_err_t flag, const Comm::ConnectionPointer &details) const; /// errno code of the last accept() or listen() action if one occurred. int errcode; protected: friend class AcceptLimiter; int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue. private: Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events. /// conn being listened on for new connections /// Reserved for read-only use. ConnectionPointer conn; + AsyncCall::Pointer closer_; ///< listen socket closure handler + /// Method to test if there are enough file descriptors to open a new client connection /// if not the accept() will be postponed static bool okToAccept(); /// Method callback for whenever an FD is ready to accept a client connection. static void doAccept(int fd, void *data); void acceptOne(); comm_err_t oldAccept(Comm::ConnectionPointer &details); void setListen(); + void handleClosure(const CommCloseCbParams &io); + CBDATA_CLASS2(TcpAcceptor); }; } // namespace Comm #endif /* SQUID_COMM_TCPACCEPTOR_H */