Problem description
--------------------

FTP client gets stuck after the following chain of events:

 * Client requests a file that will be blocked by ICAP.

 * Squid starts downloading the file from the FTP server and sends "150 Opening..." to the FTP client.

 * Squid aborts the data connection with the FTP server as soon as the ICAP service blocks it.

 * Squid sends "451 Forbidden" to the FTP client.

 * The FTP server sends "500 OOPS: setsockopt: linger" to Squid.

 * Squid terminates the control connection to the FTP server.

 * Squid establishes a new control connection to the FTP server but does not authenticate itself.

 * Further commands from the FTP client do not work any more.

The above and many similar problems exist because Squid handles the FTP client-to-Squid and Squid-to-FTP server data connections independently of each other. In many cases, one connection is not notified about problems with the other connection.

Tech details
------------

This patch:
- Adds Ftp::MasterState::userDataDone to record the final response status code sent (or to be sent) to the FTP client.

- Adds the Ftp::MasterState::waitForOriginData flag to track the status of the Squid-to-server side. The flag is true while the Squid-to-server side has not finished yet.

- Sends a control reply to the FTP client only after the data has been transferred on both the server and client sides.

- Splits Client::abortTransaction() into Client::abortOnData() and Client::abortAll().

- Implements Ftp::Relay::abortOnData() and Ftp::Relay::abort() (i.e., the StoreEntry abort handler) to avoid closing the control connection when the data connection is closed unexpectedly.
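
The intended behaviour of the list above can be sketched in a few lines. This is a simplified illustration only, not the code in the patch: the MasterState, Client, and Relay types below are hypothetical stand-ins for the real Squid classes, the controlOpen/dataOpen members and mayReplyToClient() are invented for the example, and the exact reply condition is an assumption drawn from the description.

```cpp
// Simplified sketch; these are NOT the real Squid classes.
#include <cassert> // only needed for the usage checks mentioned below
#include <string>

// Hypothetical stand-in for the two MasterState fields described above.
struct MasterState {
    int userDataDone = 0;          // final status code for the FTP client; 0 = none yet
    bool waitForOriginData = true; // true while the Squid-to-server side is unfinished
};

// Hypothetical stand-in for Client: only the abort split is modelled.
class Client {
public:
    virtual ~Client() = default;

    // Terminates the whole transaction, control connection included.
    virtual void abortAll(const std::string &reason) {
        (void)reason; // the reason is for debugging only
        controlOpen = false;
        dataOpen = false;
    }

    // Data transfer failure. Returns true if the transaction was terminated.
    // By default a data failure still ends everything.
    virtual bool abortOnData(const std::string &reason) {
        abortAll(reason);
        return true;
    }

    bool controlOpen = true;
    bool dataOpen = true;
};

// Hypothetical stand-in for Ftp::Relay: a data abort keeps the control connection.
class Relay : public Client {
public:
    explicit Relay(MasterState &m) : master(m) {}

    bool abortOnData(const std::string &reason) override {
        (void)reason;
        dataOpen = false;                 // drop only the data connection
        master.waitForOriginData = false; // the server side is finished now
        return false;                     // the transaction survives
    }

    // Assumed condition: the control reply goes out only when the client side
    // has a final status and the server side is no longer being waited for.
    bool mayReplyToClient() const {
        return master.userDataDone != 0 && !master.waitForOriginData;
    }

private:
    MasterState &master;
};
```

With these stand-ins, setting userDataDone to 451 and then calling Relay::abortOnData() leaves controlOpen true and makes mayReplyToClient() return true, whereas the base Client::abortOnData() closes both connections.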

This is a Measurement Factory project.
Invalid FTP connection handling on blocked content.

FTP client gets stuck after the following chain of events:
 * Client requests a file that will be blocked by ICAP.
 * Squid starts downloading the file from the FTP server
   and sends "150 Opening..." to the FTP client.
 * Squid aborts the data connection with the FTP server
   as soon as the ICAP service blocks it.
 * Squid sends "451 Forbidden" to the FTP client.
 * The FTP server sends "500 OOPS: setsockopt: linger" to Squid.
 * Squid terminates the control connection to the FTP server.
 * Squid establishes a new control connection to the FTP server
   but does not authenticate itself.
 * Further commands from the FTP client do not work any more.

The above and many similar problems exist because Squid handles the
FTP client-to-Squid and Squid-to-FTP server data connections
independently of each other. In many cases, one connection is not
notified about problems with the other connection.

This patch:
  - Adds Ftp::MasterState::userDataDone to record the final response
    status code sent (or to be sent) to the FTP client.
  - Adds the Ftp::MasterState::waitForOriginData flag to track the status
    of the Squid-to-server side. The flag is true while the
    Squid-to-server side has not finished yet.
  - Sends a control reply to the FTP client only after the data has been
    transferred on both the server and client sides.
  - Splits Client::abortTransaction() into Client::abortOnData() and
    Client::abortAll().
  - Implements Ftp::Relay::abortOnData() and Ftp::Relay::abort()
    (i.e., the StoreEntry abort handler) to avoid closing the control
    connection when the data connection is closed unexpectedly.

This is a Measurement Factory project.

=== modified file 'src/FwdState.cc'
--- src/FwdState.cc	2015-11-30 10:53:23 +0000
+++ src/FwdState.cc	2015-12-11 14:37:53 +0000
@@ -144,41 +144,44 @@
     e->lock("FwdState");
     EBIT_SET(e->flags, ENTRY_FWD_HDR_WAIT);
     flags.connected_okay = false;
     flags.dont_retry = false;
     flags.forward_completed = false;
     debugs(17, 3, "FwdState constructed, this=" << this);
 }
 
 // Called once, right after object creation, when it is safe to set self
 void FwdState::start(Pointer aSelf)
 {
     // Protect ourselves from being destroyed when the only Server pointing
     // to us is gone (while we expect to talk to more Servers later).
     // Once we set self, we are responsible for clearing it when we do not
     // expect to talk to any servers.
     self = aSelf; // refcounted
 
     // We hope that either the store entry aborts or peer is selected.
     // Otherwise we are going to leak our object.
 
-    entry->registerAbort(FwdState::abort, this);
+    // Ftp::Relay needs to preserve control connection on data aborts
+    // so it registers its own abort handler that calls ours when needed.
+    if (!request->flags.ftpNative)
+        entry->registerAbort(FwdState::abort, this);
 
 #if STRICT_ORIGINAL_DST
     // Bug 3243: CVE 2009-0801
     // Bypass of browser same-origin access control in intercepted communication
     // To resolve this we must force DIRECT and only to the original client destination.
     const bool isIntercepted = request && !request->flags.redirected && (request->flags.intercepted || request->flags.interceptTproxy);
     const bool useOriginalDst = Config.onoff.client_dst_passthru || (request && !request->flags.hostVerified);
     if (isIntercepted && useOriginalDst) {
         selectPeerForIntercepted();
         // 3.2 does not suppro re-wrapping inside CONNECT.
         // our only alternative is to fake destination "found" and continue with the forwarding.
         startConnectionOrFail();
         return;
     }
 #endif
 
     // do full route options selection
     peerSelect(&serverDestinations, request, al, entry, fwdPeerSelectionCompleteWrapper, this);
 }
 

=== modified file 'src/clients/Client.cc'
--- src/clients/Client.cc	2015-08-30 00:26:47 +0000
+++ src/clients/Client.cc	2015-10-27 10:04:40 +0000
@@ -230,41 +230,41 @@
     if (requestBodySource->setConsumerIfNotLate(this)) {
         debugs(11,3, HERE << "expecting request body from " <<
                requestBodySource->status());
         return true;
     }
 
     debugs(11,3, HERE << "aborting on partially consumed request body: " <<
            requestBodySource->status());
     requestBodySource = NULL;
     return false;
 }
 
 // Entry-dependent callbacks use this check to quit if the entry went bad
 bool
 Client::abortOnBadEntry(const char *abortReason)
 {
     if (entry->isAccepting())
         return false;
 
     debugs(11,5, HERE << "entry is not Accepting!");
-    abortTransaction(abortReason);
+    abortOnData(abortReason);
     return true;
 }
 
 // more request or adapted response body is available
 void
 Client::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
 {
 #if USE_ADAPTATION
     if (adaptedBodySource == bp) {
         handleMoreAdaptedBodyAvailable();
         return;
     }
 #endif
     if (requestBodySource == bp)
         handleMoreRequestBodyAvailable();
 }
 
 // the entire request or adapted response body was provided, successfully
 void
 Client::noteBodyProductionEnded(BodyPipe::Pointer bp)
@@ -276,40 +276,47 @@
     }
 #endif
     if (requestBodySource == bp)
         handleRequestBodyProductionEnded();
 }
 
 // premature end of the request or adapted response body production
 void
 Client::noteBodyProducerAborted(BodyPipe::Pointer bp)
 {
 #if USE_ADAPTATION
     if (adaptedBodySource == bp) {
         handleAdaptedBodyProducerAborted();
         return;
     }
 #endif
     if (requestBodySource == bp)
         handleRequestBodyProducerAborted();
 }
 
+bool
+Client::abortOnData(const char *reason)
+{
+    abortAll(reason);
+    return true;
+}
+
 // more origin request body data is available
 void
 Client::handleMoreRequestBodyAvailable()
 {
     if (!requestSender)
         sendMoreRequestBody();
     else
         debugs(9,3, HERE << "waiting for request body write to complete");
 }
 
 // there will be no more handleMoreRequestBodyAvailable calls
 void
 Client::handleRequestBodyProductionEnded()
 {
     receivedWholeRequestBody = true;
     if (!requestSender)
         doneSendingRequestBody();
     else
         debugs(9,3, HERE << "waiting for request body write to complete");
 }
@@ -350,46 +357,46 @@
     if (io.size > 0) {
         fd_bytes(io.fd, io.size, FD_WRITE);
         statCounter.server.all.kbytes_out += io.size;
         // kids should increment their counters
     }
 
     if (io.flag == Comm::ERR_CLOSING)
         return;
 
     if (!requestBodySource) {
         debugs(9,3, HERE << "detected while-we-were-sending abort");
         return; // do nothing;
     }
 
     if (io.flag) {
         debugs(11, DBG_IMPORTANT, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
         ErrorState *err;
         err = new ErrorState(ERR_WRITE_ERROR, Http::scBadGateway, fwd->request);
         err->xerrno = io.xerrno;
         fwd->fail(err);
-        abortTransaction("I/O error while sending request body");
+        abortOnData("I/O error while sending request body");
         return;
     }
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
-        abortTransaction("store entry aborted while sending request body");
+        abortOnData("store entry aborted while sending request body");
         return;
     }
 
     if (!requestBodySource->exhausted())
         sendMoreRequestBody();
     else if (receivedWholeRequestBody)
         doneSendingRequestBody();
     else
         debugs(9,3, HERE << "waiting for body production end or abort");
 }
 
 void
 Client::sendMoreRequestBody()
 {
     assert(requestBodySource != NULL);
     assert(!requestSender);
 
     const Comm::ConnectionPointer conn = dataConnection();
 
     if (!Comm::IsConnOpen(conn)) {
@@ -830,130 +837,130 @@
     if (mayReadVirginReplyBody()) {
         debugs(11,3, HERE << "closing origin conn due to ICAP completion");
         closeServer();
     }
 
     completeForwarding();
 }
 
 // common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
 void
 Client::handleAdaptationAborted(bool bypassable)
 {
     debugs(11,5, HERE << "handleAdaptationAborted; bypassable: " << bypassable <<
            ", entry empty: " << entry->isEmpty());
 
     if (abortOnBadEntry("entry went bad while ICAP aborted"))
         return;
 
     // TODO: bypass if possible
     if (!handledEarlyAdaptationAbort())
-        abortTransaction("adaptation failure with a filled entry");
+        abortAll("adaptation failure with a filled entry");
 }
 
 /// If the store entry is still empty, fully handles adaptation abort, returning
 /// true. Otherwise just updates the request error detail and returns false.
 bool
 Client::handledEarlyAdaptationAbort()
 {
     if (entry->isEmpty()) {
         debugs(11,8, "adaptation failure with an empty entry: " << *entry);
         ErrorState *err = new ErrorState(ERR_ICAP_FAILURE, Http::scInternalServerError, request);
         err->detailError(ERR_DETAIL_ICAP_RESPMOD_EARLY);
         fwd->fail(err);
         fwd->dontRetry(true);
-        abortTransaction("adaptation failure with an empty entry");
+        abortAll("adaptation failure with an empty entry");
         return true; // handled
     }
 
     if (request) // update logged info directly
         request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_ICAP_RESPMOD_LATE);
 
     return false; // the caller must handle
 }
 
 // adaptation service wants us to deny HTTP client access to this response
 void
 Client::handleAdaptationBlocked(const Adaptation::Answer &answer)
 {
     debugs(11,5, HERE << answer.ruleId);
 
     if (abortOnBadEntry("entry went bad while ICAP aborted"))
         return;
 
     if (!entry->isEmpty()) { // too late to block (should not really happen)
         if (request)
             request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_RESPMOD_BLOCK_LATE);
-        abortTransaction("late adaptation block");
+        abortAll("late adaptation block");
         return;
     }
 
     debugs(11,7, HERE << "creating adaptation block response");
 
     err_type page_id =
         aclGetDenyInfoPage(&Config.denyInfoList, answer.ruleId.termedBuf(), 1);
     if (page_id == ERR_NONE)
         page_id = ERR_ACCESS_DENIED;
 
     ErrorState *err = new ErrorState(page_id, Http::scForbidden, request);
     err->detailError(ERR_DETAIL_RESPMOD_BLOCK_EARLY);
     fwd->fail(err);
     fwd->dontRetry(true);
 
-    abortTransaction("timely adaptation block");
+    abortOnData("timely adaptation block");
 }
 
 void
 Client::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
 {
     adaptationAccessCheckPending = false;
 
     if (abortOnBadEntry("entry went bad while waiting for ICAP ACL check"))
         return;
 
     // TODO: Should non-ICAP and ICAP REPMOD pre-cache paths check this?
     // That check now only happens on REQMOD pre-cache and REPMOD post-cache, in processReplyAccess().
     if (virginReply()->expectedBodyTooLarge(*request)) {
         sendBodyIsTooLargeError();
         return;
     }
     // TODO: Should we check receivedBodyTooLarge as well?
 
     if (!group) {
         debugs(11,3, HERE << "no adapation needed");
         setFinalReply(virginReply());
         processReplyBody();
         return;
     }
 
     startAdaptation(group, originalRequest());
     processReplyBody();
 }
 #endif
 
 void
 Client::sendBodyIsTooLargeError()
 {
     ErrorState *err = new ErrorState(ERR_TOO_BIG, Http::scForbidden, request);
     fwd->fail(err);
     fwd->dontRetry(true);
-    abortTransaction("Virgin body too large.");
+    abortOnData("Virgin body too large.");
 }
 
 // TODO: when HttpStateData sends all errors to ICAP,
 // we should be able to move this at the end of setVirginReply().
 void
 Client::adaptOrFinalizeReply()
 {
 #if USE_ADAPTATION
     // TODO: merge with client side and return void to hide the on/off logic?
     // The callback can be called with a NULL service if adaptation is off.
     adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
                                        Adaptation::methodRespmod, Adaptation::pointPreCache,
                                        originalRequest(), virginReply(), fwd->al, this);
     debugs(11,5, HERE << "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
     if (adaptationAccessCheckPending)
         return;
 #endif
 
     setFinalReply(virginReply());
 }

=== modified file 'src/clients/Client.h'
--- src/clients/Client.h	2015-08-24 21:07:31 +0000
+++ src/clients/Client.h	2015-10-27 10:04:26 +0000
@@ -37,41 +37,46 @@
 {
 
 public:
     Client(FwdState *);
     virtual ~Client();
 
     /// \return primary or "request data connection"
     virtual const Comm::ConnectionPointer & dataConnection() const = 0;
 
     // BodyConsumer: consume request body or adapted response body.
     // The implementation just calls the corresponding HTTP or ICAP handle*()
     // method, depending on the pipe.
     virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer);
     virtual void noteBodyProductionEnded(BodyPipe::Pointer);
     virtual void noteBodyProducerAborted(BodyPipe::Pointer);
 
     /// read response data from the network
     virtual void maybeReadVirginBody() = 0;
 
     /// abnormal transaction termination; reason is for debugging only
-    virtual void abortTransaction(const char *reason) = 0;
+    virtual void abortAll(const char *reason) = 0;
+
+    /// abnormal data transfer termination
+    /// \retval true the transaction will be terminated (abortAll called)
+    /// \retval false the transaction will survive
+    virtual bool abortOnData(const char *reason);
 
     /// a hack to reach HttpStateData::orignal_request
     virtual  HttpRequest *originalRequest();
 
 #if USE_ADAPTATION
     // Adaptation::Initiator API: start an ICAP transaction and receive adapted headers.
     virtual void noteAdaptationAnswer(const Adaptation::Answer &answer);
     virtual void noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group);
 
     // BodyProducer: provide virgin response body to ICAP.
     virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer );
     virtual void noteBodyConsumerAborted(BodyPipe::Pointer );
 #endif
     virtual bool getMoreRequestBody(MemBuf &buf);
     virtual void processReplyBody() = 0;
 
 //AsyncJob virtual methods
     virtual void swanSong();
     virtual bool doneAll() const;
 

=== modified file 'src/clients/FtpClient.cc'
--- src/clients/FtpClient.cc	2015-10-13 12:53:10 +0000
+++ src/clients/FtpClient.cc	2015-10-30 19:55:48 +0000
@@ -333,72 +333,72 @@
         typedef CommCbMemFunT<Client, CommIoCbParams> Dialer;
         AsyncCall::Pointer reader = JobCallback(9, 5, Dialer, this, Ftp::Client::readControlReply);
         comm_read(ctrl.conn, ctrl.buf + ctrl.offset, ctrl.size - ctrl.offset, reader);
     }
 }
 
 void
 Ftp::Client::readControlReply(const CommIoCbParams &io)
 {
     debugs(9, 3, "FD " << io.fd << ", Read " << io.size << " bytes");
 
     if (io.size > 0) {
         statCounter.server.all.kbytes_in += io.size;
         statCounter.server.ftp.kbytes_in += io.size;
     }
 
     if (io.flag == Comm::ERR_CLOSING)
         return;
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
-        abortTransaction("entry aborted during control reply read");
-        return;
+        if (abortOnData("entry aborted during control reply read"))
+            return;
     }
 
     assert(ctrl.offset < ctrl.size);
 
     if (io.flag == Comm::OK && io.size > 0) {
         fd_bytes(io.fd, io.size, FD_READ);
     }
 
     if (io.flag != Comm::OK) {
         debugs(50, ignoreErrno(io.xerrno) ? 3 : DBG_IMPORTANT,
                "FTP control reply read error: " << xstrerr(io.xerrno));
 
         if (ignoreErrno(io.xerrno)) {
             scheduleReadControlReply(0);
         } else {
             failed(ERR_READ_ERROR, io.xerrno);
             /* failed closes ctrl.conn and frees ftpState */
         }
         return;
     }
 
     if (io.size == 0) {
         if (entry->store_status == STORE_PENDING) {
             failed(ERR_FTP_FAILURE, 0);
             /* failed closes ctrl.conn and frees ftpState */
             return;
         }
 
         /* XXX this may end up having to be serverComplete() .. */
-        abortTransaction("zero control reply read");
+        abortAll("zero control reply read");
         return;
     }
 
     unsigned int len =io.size + ctrl.offset;
     ctrl.offset = len;
     assert(len <= ctrl.size);
     if (Comm::IsConnOpen(ctrl.conn))
         commUnsetConnTimeout(ctrl.conn); // we are done waiting for ctrl reply
     handleControlReply();
 }
 
 void
 Ftp::Client::handleControlReply()
 {
     debugs(9, 3, status());
 
     size_t bytes_used = 0;
     wordlistDestroy(&ctrl.message);
 
     if (!parseControlReply(bytes_used)) {
@@ -898,41 +898,41 @@
 Ftp::Client::dataRead(const CommIoCbParams &io)
 {
     int j;
     int bin;
 
     data.read_pending = false;
 
     debugs(9, 3, "FD " << io.fd << " Read " << io.size << " bytes");
 
     if (io.size > 0) {
         statCounter.server.all.kbytes_in += io.size;
         statCounter.server.ftp.kbytes_in += io.size;
     }
 
     if (io.flag == Comm::ERR_CLOSING)
         return;
 
     assert(io.fd == data.conn->fd);
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
-        abortTransaction("entry aborted during dataRead");
+        abortOnData("entry aborted during dataRead");
         return;
     }
 
     if (io.flag == Comm::OK && io.size > 0) {
         debugs(9, 5, "appended " << io.size << " bytes to readBuf");
         data.readBuf->appended(io.size);
 #if USE_DELAY_POOLS
         DelayId delayId = entry->mem_obj->mostBytesAllowed();
         delayId.bytesIn(io.size);
 #endif
         ++ IOStats.Ftp.reads;
 
         for (j = io.size - 1, bin = 0; j; ++bin)
             j >>= 1;
 
         ++ IOStats.Ftp.read_hist[bin];
     }
 
     if (io.flag != Comm::OK) {
         debugs(50, ignoreErrno(io.xerrno) ? 3 : DBG_IMPORTANT,
@@ -982,41 +982,41 @@
      * status code after the data command.  FtpStateData was being
      * deleted in the middle of dataRead().
      */
     /* AYJ: 2011-01-13: Bug 2581.
      * 226 status is possibly waiting in the ctrl buffer.
      * The connection will hang if we DONT send buffered_ok.
      * This happens on all transfers which can be completly sent by the
      * server before the 150 started status message is read in by Squid.
      * ie all transfers of about one packet hang.
      */
     scheduleReadControlReply(1);
 }
 
 /**
  * Quickly abort the transaction
  *
  \todo destruction should be sufficient as the destructor should cleanup,
  * including canceling close handlers
  */
 void
-Ftp::Client::abortTransaction(const char *reason)
+Ftp::Client::abortAll(const char *reason)
 {
     debugs(9, 3, "aborting transaction for " << reason <<
            "; FD " << (ctrl.conn!=NULL?ctrl.conn->fd:-1) << ", Data FD " << (data.conn!=NULL?data.conn->fd:-1) << ", this " << this);
     if (Comm::IsConnOpen(ctrl.conn)) {
         ctrl.conn->close();
         return;
     }
 
     fwd->handleUnregisteredServerEnd();
     mustStop("Ftp::Client::abortTransaction");
 }
 
 /**
  * Cancel the timeout on the Control socket and establish one
  * on the data socket
  */
 void
 Ftp::Client::switchTimeoutToDataChannel()
 {
     commUnsetConnTimeout(ctrl.conn);

=== modified file 'src/clients/FtpClient.h'
--- src/clients/FtpClient.h	2015-10-13 12:53:10 +0000
+++ src/clients/FtpClient.h	2015-10-27 09:43:49 +0000
@@ -150,41 +150,41 @@
         SENT_FEAT,
         SENT_PWD,
         SENT_CDUP,
         SENT_DATA_REQUEST, // LIST, NLST or RETR requests..
         SENT_COMMAND, // General command
         END
     } ftp_state_t;
 
     int state;
     char *old_request;
     char *old_reply;
 
 protected:
     /* AsyncJob API */
     virtual void start();
 
     /* Client API */
     virtual void closeServer();
     virtual bool doneWithServer() const;
     virtual const Comm::ConnectionPointer & dataConnection() const;
-    virtual void abortTransaction(const char *reason);
+    virtual void abortAll(const char *reason);
 
     virtual Http::StatusCode failedHttpStatus(err_type &error);
     void ctrlClosed(const CommCloseCbParams &io);
     void scheduleReadControlReply(int buffered_ok);
     void readControlReply(const CommIoCbParams &io);
     virtual void handleControlReply();
     void writeCommandCallback(const CommIoCbParams &io);
     virtual void dataChannelConnected(const CommConnectCbParams &io) = 0;
     void dataRead(const CommIoCbParams &io);
     void dataComplete();
     AsyncCall::Pointer dataCloser();
     virtual void dataClosed(const CommCloseCbParams &io);
     void initReadBuf();
 
     // sending of the request body to the server
     virtual void sentRequestBody(const CommIoCbParams &io);
     virtual void doneSendingRequestBody();
 
 private:
     bool parseControlReply(size_t &bytesUsed);

=== modified file 'src/clients/FtpGateway.cc'
--- src/clients/FtpGateway.cc	2015-10-13 12:53:10 +0000
+++ src/clients/FtpGateway.cc	2015-10-27 10:28:11 +0000
@@ -972,41 +972,41 @@
 
 void
 Ftp::Gateway::processReplyBody()
 {
     debugs(9, 3, status());
 
     if (request->method == Http::METHOD_HEAD && (flags.isdir || theSize != -1)) {
         serverComplete();
         return;
     }
 
     /* Directory listings are special. They write ther own headers via the error objects */
     if (!flags.http_header_sent && data.readBuf->contentSize() >= 0 && !flags.isdir)
         appendSuccessHeader();
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
         /*
          * probably was aborted because content length exceeds one
          * of the maximum size limits.
          */
-        abortTransaction("entry aborted after calling appendSuccessHeader()");
+        abortAll("entry aborted after calling appendSuccessHeader()");
         return;
     }
 
 #if USE_ADAPTATION
 
     if (adaptationAccessCheckPending) {
         debugs(9, 3, "returning from Ftp::Gateway::processReplyBody due to adaptationAccessCheckPending");
         return;
     }
 
 #endif
 
     if (flags.isdir) {
         if (!flags.listing) {
             flags.listing = 1;
             listing.reset();
         }
         parseListing();
         maybeReadVirginBody();
         return;
@@ -1680,41 +1680,41 @@
     if (ftpState->sendPassive()) {
         // SENT_EPSV_ALL blocks other non-EPSV connections being attempted
         if (ftpState->state == Ftp::Client::SENT_EPSV_ALL)
             ftpState->flags.epsv_all_sent = true;
     }
 }
 
 void
 Ftp::Gateway::processHeadResponse()
 {
     debugs(9, 5, HERE << "handling HEAD response");
     ftpSendQuit(this);
     appendSuccessHeader();
 
     /*
      * On rare occasions I'm seeing the entry get aborted after
      * readControlReply() and before here, probably when
      * trying to write to the client.
      */
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
-        abortTransaction("entry aborted while processing HEAD");
+        abortAll("entry aborted while processing HEAD");
         return;
     }
 
 #if USE_ADAPTATION
     if (adaptationAccessCheckPending) {
         debugs(9,3, HERE << "returning due to adaptationAccessCheckPending");
         return;
     }
 #endif
 
     // processReplyBody calls serverComplete() since there is no body
     processReplyBody();
 }
 
 static void
 ftpReadPasv(Ftp::Gateway * ftpState)
 {
     Ip::Address srvAddr; // unused
     if (ftpState->handlePasvReply(srvAddr))
         ftpState->connectDataChannel();
@@ -1897,41 +1897,41 @@
     if (code != 200) {
         /* Failover to attempting old PORT command. */
         debugs(9, 3, "EPRT not supported by remote end");
         ftpSendPORT(ftpState);
         return;
     }
 
     ftpRestOrList(ftpState);
 }
 
 /** "read" handler to accept FTP data connections.
  *
  \param io    comm accept(2) callback parameters
  */
 void
 Ftp::Gateway::ftpAcceptDataConnection(const CommAcceptCbParams &io)
 {
     debugs(9, 3, HERE);
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
-        abortTransaction("entry aborted when accepting data conn");
+        abortAll("entry aborted when accepting data conn");
         data.listenConn->close();
         data.listenConn = NULL;
         return;
     }
 
     if (io.flag != Comm::OK) {
         data.listenConn->close();
         data.listenConn = NULL;
         debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: " << io.conn << ": " << xstrerr(io.xerrno));
         /** \todo Need to send error message on control channel*/
         ftpFail(this);
         return;
     }
 
     /* data listening conn is no longer even open. abort. */
     if (!Comm::IsConnOpen(data.listenConn)) {
         data.listenConn = NULL; // ensure that it's cleared and not just closed.
         return;
     }
 

=== modified file 'src/clients/FtpRelay.cc'
--- src/clients/FtpRelay.cc	2015-08-04 19:57:07 +0000
+++ src/clients/FtpRelay.cc	2015-12-29 15:57:13 +0000
@@ -37,74 +37,78 @@
     explicit Relay(FwdState *const fwdState);
     virtual ~Relay();
 
 protected:
     const Ftp::MasterState &master() const;
     Ftp::MasterState &updateMaster();
     Ftp::ServerState serverState() const { return master().serverState; }
     void serverState(const Ftp::ServerState newState);
 
     /* Ftp::Client API */
     virtual void failed(err_type error = ERR_NONE, int xerrno = 0);
     virtual void dataChannelConnected(const CommConnectCbParams &io);
 
     /* Client API */
     virtual void serverComplete();
     virtual void handleControlReply();
     virtual void processReplyBody();
     virtual void handleRequestBodyProducerAborted();
     virtual bool mayReadVirginReplyBody() const;
     virtual void completeForwarding();
+    virtual bool abortOnData(const char *reason);
 
     /* AsyncJob API */
     virtual void start();
 
     void forwardReply();
     void forwardError(err_type error = ERR_NONE, int xerrno = 0);
     void failedErrorMessage(err_type error, int xerrno);
     HttpReply *createHttpReply(const Http::StatusCode httpStatus, const int64_t clen = 0);
     void handleDataRequest();
     void startDataDownload();
     void startDataUpload();
     bool startDirTracking();
     void stopDirTracking();
     bool weAreTrackingDir() const {return savedReply.message != NULL;}
 
     typedef void (Relay::*PreliminaryCb)();
     void forwardPreliminaryReply(const PreliminaryCb cb);
     void proceedAfterPreliminaryReply();
     PreliminaryCb thePreliminaryCb;
 
     typedef void (Relay::*SM_FUNC)();
     static const SM_FUNC SM_FUNCS[];
     void readGreeting();
     void sendCommand();
     void readReply();
     void readFeatReply();
     void readPasvReply();
     void readDataReply();
     void readTransferDoneReply();
     void readEpsvReply();
     void readCwdOrCdupReply();
     void readUserOrPassReply();
 
     void scheduleReadControlReply();
+    void finalizeDataDownload();
+
+    static void abort(void *d); // TODO: Capitalize this and FwdState::abort().
 
     bool forwardingCompleted; ///< completeForwarding() has been called
 
     struct {
         wordlist *message; ///< reply message, one  wordlist entry per message line
         char *lastCommand; ///< the command caused the reply
         char *lastReply; ///< last line of reply: reply status plus message
         int replyCode; ///< the reply status
     } savedReply; ///< set and delayed while we are tracking using PWD
 };
 
 } // namespace Ftp
 
 CBDATA_NAMESPACED_CLASS_INIT(Ftp, Relay);
 
 const Ftp::Relay::SM_FUNC Ftp::Relay::SM_FUNCS[] = {
     &Ftp::Relay::readGreeting, // BEGIN
     &Ftp::Relay::readUserOrPassReply, // SENT_USER
     &Ftp::Relay::readUserOrPassReply, // SENT_PASS
     NULL,/* &Ftp::Relay::readReply */ // SENT_TYPE
@@ -131,40 +135,42 @@
     &Ftp::Relay::readCwdOrCdupReply, // SENT_CDUP
     &Ftp::Relay::readDataReply,// SENT_DATA_REQUEST
     &Ftp::Relay::readReply, // SENT_COMMAND
     NULL
 };
 
 Ftp::Relay::Relay(FwdState *const fwdState):
     AsyncJob("Ftp::Relay"),
     Ftp::Client(fwdState),
     thePreliminaryCb(NULL),
     forwardingCompleted(false)
 {
     savedReply.message = NULL;
     savedReply.lastCommand = NULL;
     savedReply.lastReply = NULL;
     savedReply.replyCode = 0;
 
     // Nothing we can do at request creation time can mark the response as
     // uncachable, unfortunately. This prevents "found KEY_PRIVATE" WARNINGs.
     entry->releaseRequest();
+    // TODO: Convert registerAbort() to use AsyncCall
+    entry->registerAbort(Ftp::Relay::abort, this);
 }
 
 Ftp::Relay::~Relay()
 {
     closeServer(); // TODO: move to clients/Client.cc?
     if (savedReply.message)
         wordlistDestroy(&savedReply.message);
 
     xfree(savedReply.lastCommand);
     xfree(savedReply.lastReply);
 }
 
 void
 Ftp::Relay::start()
 {
     if (!master().clientReadGreeting)
         Ftp::Client::start();
     else if (serverState() == fssHandleDataRequest ||
              serverState() == fssHandleUploadRequest)
         handleDataRequest();
@@ -264,41 +270,48 @@
 void
 Ftp::Relay::failedErrorMessage(err_type error, int xerrno)
 {
     const Http::StatusCode httpStatus = failedHttpStatus(error);
     HttpReply *const reply = createHttpReply(httpStatus);
     entry->replaceHttpReply(reply);
     EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
     fwd->request->detailError(error, xerrno);
 }
 
 void
 Ftp::Relay::processReplyBody()
 {
     debugs(9, 3, status());
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
         /*
          * probably was aborted because content length exceeds one
          * of the maximum size limits.
          */
-        abortTransaction("entry aborted after calling appendSuccessHeader()");
+        abortOnData("entry aborted after calling appendSuccessHeader()");
+        return;
+    }
+
+    if (master().userDataDone) {
+        // Squid-to-client data transfer done. Abort data transfer on our
+        // side to allow new commands from ftp client
+        abortOnData("Squid-to-client data connection is closed");
         return;
     }
 
 #if USE_ADAPTATION
 
     if (adaptationAccessCheckPending) {
         debugs(9, 3, "returning due to adaptationAccessCheckPending");
         return;
     }
 
 #endif
 
     if (data.readBuf != NULL && data.readBuf->hasContent()) {
         const mb_size_t csize = data.readBuf->contentSize();
         debugs(9, 5, "writing " << csize << " bytes to the reply");
         addVirginReplyBody(data.readBuf->content(), csize);
         data.readBuf->consume(csize);
     }
 
     entry->flush();
@@ -457,41 +470,41 @@
         // has greeted the user already. Also, an original origin greeting may
         // confuse a user that has changed the origin mid-air.
 
         start();
         break;
     case 120:
         if (NULL != ctrl.message)
             debugs(9, DBG_IMPORTANT, "FTP server is busy: " << ctrl.message->key);
         forwardPreliminaryReply(&Ftp::Relay::scheduleReadControlReply);
         break;
     default:
         failed();
         break;
     }
 }
 
 void
 Ftp::Relay::sendCommand()
 {
     if (!fwd->request->header.has(Http::HdrType::FTP_COMMAND)) {
-        abortTransaction("Internal error: FTP relay request with no command");
+        abortAll("Internal error: FTP relay request with no command");
         return;
     }
 
     HttpHeader &header = fwd->request->header;
     assert(header.has(Http::HdrType::FTP_COMMAND));
     const String &cmd = header.findEntry(Http::HdrType::FTP_COMMAND)->value;
     assert(header.has(Http::HdrType::FTP_ARGUMENTS));
     const String &params = header.findEntry(Http::HdrType::FTP_ARGUMENTS)->value;
 
     if (params.size() > 0)
         debugs(9, 5, "command: " << cmd << ", parameters: " << params);
     else
         debugs(9, 5, "command: " << cmd << ", no parameters");
 
     if (serverState() == fssHandlePasv ||
             serverState() == fssHandleEpsv ||
             serverState() == fssHandleEprt ||
             serverState() == fssHandlePort) {
         sendPassive();
         return;
@@ -655,54 +668,103 @@
     if (weAreTrackingDir()) { // we are tracking
         stopDirTracking(); // and forward the delayed response below
     } else if (ctrl.replycode == 230) { // successful login
         if (startDirTracking())
             return;
     }
 
     forwardReply();
 }
 
 void
 Ftp::Relay::readTransferDoneReply()
 {
     debugs(9, 3, status());
 
     if (ctrl.replycode != 226 && ctrl.replycode != 250) {
         debugs(9, DBG_IMPORTANT, "got FTP code " << ctrl.replycode <<
                " after reading response data");
     }
 
-    serverComplete();
+    finalizeDataDownload();
 }
 
 void
 Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io)
 {
     debugs(9, 3, status());
     data.opener = NULL;
 
     if (io.flag != Comm::OK) {
         debugs(9, 2, "failed to connect FTP server data channel");
         forwardError(ERR_CONNECT_FAIL, io.xerrno);
         return;
     }
 
     debugs(9, 2, "connected FTP server data channel: " << io.conn);
 
     data.opened(io.conn, dataCloser());
 
     sendCommand();
 }
 
 void
 Ftp::Relay::scheduleReadControlReply()
 {
     Ftp::Client::scheduleReadControlReply(0);
 }
 
+void
+Ftp::Relay::finalizeDataDownload()
+{
+    debugs(9, 2, "Completed data download/upload");
+
+    updateMaster().waitForOriginData = false;
+
+    CbcPointer<ConnStateData> &mgr = fwd->request->clientConnectionManager;
+    if (mgr.valid()) {
+        if (Ftp::Server *srv = dynamic_cast<Ftp::Server*>(mgr.get())) {
+            typedef NullaryMemFunT<Ftp::Server> CbDialer;
+            AsyncCall::Pointer call = JobCallback(11, 3, CbDialer, srv,
+                                                        Ftp::Server::originDataCompletionCheckpoint);
+            ScheduleCallHere(call);
+        }
+    }
+    serverComplete();
+}
+
+bool
+Ftp::Relay::abortOnData(const char *reason)
+{
+    debugs(9, 3, "aborting transaction for " << reason <<
+           "; FD " << (ctrl.conn != NULL ? ctrl.conn->fd : -1) << ", Data FD " << (data.conn != NULL ? data.conn->fd : -1) << ", this " << this);
+    // this method is only called to handle data connection problems
+    // the control connection should keep going
+
+#if USE_ADAPTATION
+    if (adaptedBodySource != NULL)
+        stopConsumingFrom(adaptedBodySource);
+#endif
+
+    if (Comm::IsConnOpen(data.conn))
+        dataComplete();
+
+    return !Comm::IsConnOpen(ctrl.conn);
+}
+
+void
+Ftp::Relay::abort(void *d)
+{
+    Ftp::Relay *ftpClient = (Ftp::Relay *)d;
+    debugs(9, 2, "Client Data connection closed!");
+    if (!cbdataReferenceValid(ftpClient))
+        return;
+    if (Comm::IsConnOpen(ftpClient->data.conn))
+        ftpClient->dataComplete();
+}
+
 AsyncJob::Pointer
 Ftp::StartRelay(FwdState *const fwdState)
 {
     return AsyncJob::Start(new Ftp::Relay(fwdState));
 }
 

=== modified file 'src/http.cc'
--- src/http.cc	2015-10-15 02:52:58 +0000
+++ src/http.cc	2015-10-27 09:34:27 +0000
@@ -2442,34 +2442,34 @@
         fwd->fail(err);
     }
 
     abortTransaction("request body producer aborted");
 }
 
 // called when we wrote request headers(!) or a part of the body
 void
 HttpStateData::sentRequestBody(const CommIoCbParams &io)
 {
     if (io.size > 0)
         statCounter.server.http.kbytes_out += io.size;
 
     Client::sentRequestBody(io);
 }
 
 // Quickly abort the transaction
 // TODO: destruction should be sufficient as the destructor should cleanup,
 // including canceling close handlers
 void
-HttpStateData::abortTransaction(const char *reason)
+HttpStateData::abortAll(const char *reason)
 {
     debugs(11,5, HERE << "aborting transaction for " << reason <<
            "; " << serverConnection << ", this " << this);
 
     if (Comm::IsConnOpen(serverConnection)) {
         serverConnection->close();
         return;
     }
 
     fwd->handleUnregisteredServerEnd();
-    mustStop("HttpStateData::abortTransaction");
+    mustStop("HttpStateData::abortAll");
 }
 

=== modified file 'src/http.h'
--- src/http.h	2015-06-01 21:41:37 +0000
+++ src/http.h	2015-10-27 09:32:41 +0000
@@ -67,43 +67,45 @@
     Comm::ConnectionPointer serverConnection;
     AsyncCall::Pointer closeHandler;
     enum ConnectionStatus {
         INCOMPLETE_MSG,
         COMPLETE_PERSISTENT_MSG,
         COMPLETE_NONPERSISTENT_MSG
     };
     ConnectionStatus statusIfComplete() const;
     ConnectionStatus persistentConnStatus() const;
     void keepaliveAccounting(HttpReply *);
     void checkDateSkew(HttpReply *);
 
     bool continueAfterParsingHeader();
     void truncateVirginBody();
 
     virtual void start();
     virtual void haveParsedReplyHeaders();
     virtual bool getMoreRequestBody(MemBuf &buf);
     virtual void closeServer(); // end communication with the server
     virtual bool doneWithServer() const; // did we end communication?
-    virtual void abortTransaction(const char *reason); // abnormal termination
+    virtual void abortAll(const char *reason); // abnormal termination
     virtual bool mayReadVirginReplyBody() const;
 
+    void abortTransaction(const char *reason) { abortAll(reason); } // abnormal termination
+
     /**
      * determine if read buffer can have space made available
      * for a read.
      *
      * \param grow  whether to actually expand the buffer
      *
      * \return whether the buffer can be grown to provide space
      *         regardless of whether the grow actually happened.
      */
     bool maybeMakeSpaceAvailable(bool grow);
 
     // consuming request body
     virtual void handleMoreRequestBodyAvailable();
     virtual void handleRequestBodyProducerAborted();
 
     void writeReplyBody();
     bool decodeAndWriteReplyBody();
     bool finishingBrokenPost();
     bool finishingChunkedRequest();
     void doneSendingRequestBody();

=== modified file 'src/servers/FtpServer.cc'
--- src/servers/FtpServer.cc	2015-11-30 14:23:16 +0000
+++ src/servers/FtpServer.cc	2015-12-29 12:52:56 +0000
@@ -290,45 +290,52 @@
     // find request
     ClientSocketContext::Pointer context = pipeline.front();
     Must(context != nullptr);
     ClientHttpRequest *const http = context->http;
     Must(http != NULL);
     HttpRequest *const request = http->request;
     Must(request != NULL);
 
     // this is not an idle connection, so we do not want I/O monitoring
     const bool monitor = false;
 
     // make FTP peer connection exclusive to our request
     pinConnection(conn, request, conn->getPeer(), false, monitor);
 }
 
 void
 Ftp::Server::clientPinnedConnectionClosed(const CommCloseCbParams &io)
 {
     ConnStateData::clientPinnedConnectionClosed(io);
 
-    // if the server control connection is gone, reset state to login again
-    resetLogin("control connection closure");
+    // TODO: Keep the control connection open after fixing the reset
+    // problem below
+    if (Comm::IsConnOpen(clientConnection))
+        clientConnection->close();
 
-    // XXX: Reseting is not enough. FtpRelay::sendCommand() will not re-login
-    // because FtpRelay::serverState() is not going to be fssConnected.
+    // TODO: If the server control connection is gone, reset state to login
+    // again. Resetting login alone is not enough: FtpRelay::sendCommand() will
+    // not re-login because FtpRelay::serverState() is not going to be
+    // fssConnected. Calling resetLogin() alone is also harmful because it
+    // does not correctly reset the client-to-Squid control connection (e.g.,
+    // it does not always respond with an error code when one is required).
+    // resetLogin("control connection closure");
 }
 
 /// clear client and server login-related state after the old login is gone
 void
 Ftp::Server::resetLogin(const char *reason)
 {
     debugs(33, 5, "will need to re-login due to " << reason);
     master->clientReadGreeting = false;
     changeState(fssBegin, reason);
 }
 
 /// computes uri member from host and, if tracked, working dir with file name
 void
 Ftp::Server::calcUri(const SBuf *file)
 {
     uri = "ftp://";
     uri.append(host);
     if (port->ftp_track_dirs && master->workingDir.length()) {
         if (master->workingDir[0] != '/')
             uri.append("/", 1);
@@ -962,76 +969,73 @@
     MemBuf mb;
     mb.init(data.length + 1, data.length + 1);
     mb.append(data.data, data.length);
 
     typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
     AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, Ftp::Server::wroteReplyData);
     Comm::Write(dataConn, &mb, call);
 
     pipeline.front()->noteSentBodyBytes(data.length);
 }
 
 /// called when we are done writing a chunk of the response data
 void
 Ftp::Server::wroteReplyData(const CommIoCbParams &io)
 {
     if (io.flag == Comm::ERR_CLOSING)
         return;
 
     if (io.flag != Comm::OK) {
         debugs(33, 3, "FTP reply data writing failed: " << xstrerr(io.xerrno));
-        closeDataConnection();
-        writeCustomReply(426, "Data connection error; transfer aborted");
+        userDataCompletionCheckpoint(426);
         return;
     }
 
     assert(pipeline.front()->http);
     pipeline.front()->http->out.size += io.size;
     replyDataWritingCheckpoint();
 }
 
 /// ClientStream checks after (actual or skipped) reply data writing
 void
 Ftp::Server::replyDataWritingCheckpoint()
 {
     switch (pipeline.front()->socketState()) {
     case STREAM_NONE:
         debugs(33, 3, "Keep going");
         pipeline.front()->pullData();
         return;
     case STREAM_COMPLETE:
         debugs(33, 3, "FTP reply data transfer successfully complete");
-        writeCustomReply(226, "Transfer complete");
+        userDataCompletionCheckpoint(226);
         break;
     case STREAM_UNPLANNED_COMPLETE:
         debugs(33, 3, "FTP reply data transfer failed: STREAM_UNPLANNED_COMPLETE");
-        writeCustomReply(451, "Server error; transfer aborted");
+        userDataCompletionCheckpoint(451);
         break;
     case STREAM_FAILED:
+        userDataCompletionCheckpoint(451);
         debugs(33, 3, "FTP reply data transfer failed: STREAM_FAILED");
-        writeCustomReply(451, "Server error; transfer aborted");
         break;
     default:
         fatal("unreachable code");
     }
-
-    closeDataConnection();
 }
 
 void
 Ftp::Server::handleUploadReply(const HttpReply *reply, StoreIOBuffer)
 {
     writeForwardedReply(reply);
     // note that the client data connection may already be closed by now
 }
 
 void
 Ftp::Server::writeForwardedReply(const HttpReply *reply)
 {
     Must(reply);
 
     const HttpHeader &header = reply->header;
     // adaptation and forwarding errors lack Http::HdrType::FTP_STATUS
     if (!header.has(Http::HdrType::FTP_STATUS)) {
         writeForwardedForeign(reply); // will get to Ftp::Server::wroteReply
         return;
     }
@@ -1466,51 +1470,57 @@
     Ip::Address cltAddr;
     if (!Ftp::ParseIpPort(params.termedBuf(), NULL, cltAddr)) {
         setReply(501, "Invalid parameter");
         return false;
     }
 
     if (!createDataConnection(cltAddr))
         return false;
 
     changeState(fssHandlePort, "handlePortRequest");
     setDataCommand();
     return true; // forward our fake PASV request
 }
 
 bool
 Ftp::Server::handleDataRequest(String &, String &)
 {
     if (!checkDataConnPre())
         return false;
 
+    master->waitForOriginData = true;
+    master->userDataDone = 0;
+
     changeState(fssHandleDataRequest, "handleDataRequest");
 
     return true;
 }
 
 bool
 Ftp::Server::handleUploadRequest(String &, String &)
 {
     if (!checkDataConnPre())
         return false;
 
+    master->waitForOriginData = true;
+    master->userDataDone = 0;
+
     if (Config.accessList.forceRequestBodyContinuation) {
         ClientHttpRequest *http = pipeline.front()->http;
         HttpRequest *request = http->request;
         ACLFilledChecklist bodyContinuationCheck(Config.accessList.forceRequestBodyContinuation, request, NULL);
         if (bodyContinuationCheck.fastCheck() == ACCESS_ALLOWED) {
             request->forcedBodyContinuation = true;
             if (checkDataConnPost()) {
                 // Write control Msg
                 writeEarlyReply(150, "Data connection opened");
                 maybeReadUploadData();
             } else {
                 // wait for acceptDataConnection but tell it to call wroteEarlyReply
                 // after writing "150 Data connection opened"
                 typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
                 AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, Ftp::Server::wroteEarlyReply);
                 onDataAcceptCall = call;
             }
         }
     }
 
@@ -1695,37 +1705,78 @@
     assert(repContext != NULL);
 
     RequestFlags reqFlags;
     reqFlags.cachable = false; // force releaseRequest() in storeCreateEntry()
     reqFlags.noCache = true;
     repContext->createStoreEntry(http->request->method, reqFlags);
     http->storeEntry()->replaceHttpReply(reply);
 }
 
 void
 Ftp::Server::callException(const std::exception &e)
 {
     debugs(33, 2, "FTP::Server job caught: " << e.what());
     closeDataConnection();
     unpinConnection(true);
     if (Comm::IsConnOpen(clientConnection))
         clientConnection->close();
     AsyncJob::callException(e);
 }
 
+void
+Ftp::Server::originDataCompletionCheckpoint()
+{
+    if (!master->userDataDone) {
+        debugs(33, 5, "Transferring from/to client not finished yet");
+        return;
+    }
+
+    completeDataExchange();
+}
+
+void Ftp::Server::userDataCompletionCheckpoint(int finalStatusCode)
+{
+    Must(!master->userDataDone);
+    master->userDataDone = finalStatusCode;
+
+    if (bodyParser)
+        finishDechunkingRequest(false);
+
+    // The origin control connection is gone, nothing to wait for
+    if (!Comm::IsConnOpen(pinning.serverConnection))
+        master->waitForOriginData = false;
+
+    if (master->waitForOriginData) {
+        // The completeDataExchange() is not called here unconditionally
+        // because we want to signal the FTP user that we are not fully
+        // done processing its data stream, even though all data bytes
+        // have been sent or received already.
+        debugs(33, 5, "Transferring from/to FTP server is not complete");
+        return;
+    }
+
+    completeDataExchange();
+}
+
+void Ftp::Server::completeDataExchange()
+{
+    writeCustomReply(master->userDataDone, master->userDataDone == 226 ? "Transfer complete" : "Server error; transfer aborted");
+    closeDataConnection();
+}
+
 /// Whether Squid FTP Relay supports a named feature (e.g., a command).
 static bool
 Ftp::SupportedCommand(const SBuf &name)
 {
     static std::set<SBuf> BlackList;
     if (BlackList.empty()) {
         /* Add FTP commands that Squid cannot relay correctly. */
 
         // We probably do not support AUTH TLS.* and AUTH SSL,
         // but let's disclaim all AUTH support to KISS, for now.
         BlackList.insert(cmdAuth());
     }
 
     // we claim support for all commands that we do not know about
     return BlackList.find(name) == BlackList.end();
 }
 

=== modified file 'src/servers/FtpServer.h'
--- src/servers/FtpServer.h	2015-11-30 14:23:16 +0000
+++ src/servers/FtpServer.h	2015-12-29 12:50:51 +0000
@@ -24,61 +24,70 @@
     fssHandlePasv,
     fssHandlePort,
     fssHandleDataRequest,
     fssHandleUploadRequest,
     fssHandleEprt,
     fssHandleEpsv,
     fssHandleCwd,
     fssHandlePass,
     fssHandleCdup,
     fssError
 } ServerState;
 
 // TODO: This should become a part of MasterXaction when we start sending
 // master transactions to the clients/ code.
 /// Transaction information shared among our FTP client and server jobs.
 class MasterState: public RefCountable
 {
 public:
     typedef RefCount<MasterState> Pointer;
 
-    MasterState(): serverState(fssBegin), clientReadGreeting(false) {}
+    MasterState(): serverState(fssBegin), clientReadGreeting(false), userDataDone(0), waitForOriginData(false) {}
 
     Ip::Address clientDataAddr; ///< address of our FTP client data connection
     SBuf workingDir; ///< estimated current working directory for URI formation
     ServerState serverState; ///< what our FTP server is doing
     bool clientReadGreeting; ///< whether our FTP client read their FTP server greeting
+    /// Squid will send or has sent this final status code to the FTP client
+    int userDataDone;
+    /// whether the transfer on the Squid-origin data connection is not over yet
+    bool waitForOriginData;
 };
 
 /// Manages a control connection from an FTP client.
 class Server: public ConnStateData
 {
     CBDATA_CLASS(Server);
     // XXX CBDATA_CLASS expands to nonvirtual toCbdata, AsyncJob::toCbdata
     //     is pure virtual. breaks build on clang if override is used
 
 public:
     explicit Server(const MasterXaction::Pointer &xact);
     virtual ~Server();
     /* AsyncJob API */
     virtual void callException(const std::exception &e);
 
+    /// Called by Ftp::Client class when it is done receiving or
+    /// sending data. Waits for both agents to be done before
+    /// responding to the FTP client and closing the data connection.
+    void originDataCompletionCheckpoint();
+
     // This is a pointer in hope to minimize future changes when MasterState
     // becomes a part of MasterXaction. Guaranteed not to be nil.
     MasterState::Pointer master; ///< info shared among our FTP client and server jobs
 
 protected:
     friend void StartListening();
 
     // errors detected before it is possible to create an HTTP request wrapper
     enum class EarlyErrorKind {
         HugeRequest,
         MissingLogin,
         MissingUsername,
         MissingHost,
         UnsupportedCommand,
         InvalidUri,
         MalformedCommand
     };
 
     /* ConnStateData API */
     virtual ClientSocketContext *parseOneRequest();
@@ -93,40 +102,48 @@
     /* BodyPipe API */
     virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer);
     virtual void noteBodyConsumerAborted(BodyPipe::Pointer ptr);
 
     /* AsyncJob API */
     virtual void start();
 
     /* Comm callbacks */
     static void AcceptCtrlConnection(const CommAcceptCbParams &params);
     void acceptDataConnection(const CommAcceptCbParams &params);
     void readUploadData(const CommIoCbParams &io);
     void wroteEarlyReply(const CommIoCbParams &io);
     void wroteReply(const CommIoCbParams &io);
     void wroteReplyData(const CommIoCbParams &io);
     void connectedForData(const CommConnectCbParams &params);
 
     unsigned int listenForDataConnection();
     bool createDataConnection(Ip::Address cltAddr);
     void closeDataConnection();
 
+    /// Called after the data transfer on the client-to-Squid data
+    /// connection is finished.
+    void userDataCompletionCheckpoint(int finalStatusCode);
+
+    /// Writes the data-transfer status reply to the FTP client and
+    /// closes the data connection.
+    void completeDataExchange();
+
     void calcUri(const SBuf *file);
     void changeState(const Ftp::ServerState newState, const char *reason);
     ClientSocketContext *handleUserRequest(const SBuf &cmd, SBuf &params);
     bool checkDataConnPost() const;
     void replyDataWritingCheckpoint();
     void maybeReadUploadData();
 
     void setReply(const int code, const char *msg);
     void writeCustomReply(const int code, const char *msg, const HttpReply *reply = NULL);
     void writeEarlyReply(const int code, const char *msg);
     void writeErrorReply(const HttpReply *reply, const int status);
     void writeForwardedForeign(const HttpReply *reply);
     void writeForwardedReply(const HttpReply *reply);
     void writeForwardedReplyAndCall(const HttpReply *reply, AsyncCall::Pointer &call);
     void writeReply(MemBuf &mb);
 
     ClientSocketContext *earlyError(const EarlyErrorKind eek);
     bool handleRequest(HttpRequest *);
     void setDataCommand();
     bool checkDataConnPre();

_______________________________________________
squid-dev mailing list
[email protected]
http://lists.squid-cache.org/listinfo/squid-dev
