From 4d023cfc1fed0b5852b4da1aad6a32549b03ce26 Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Fri, 30 Nov 2018 18:23:49 -0500
Subject: [PATCH 1/5] Respect client initiated CopyDone in walsender

---
 src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------
 1 file changed, 30 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 46edb52..93f2648 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -770,6 +770,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	sendTimeLineValidUpto = state->currTLIValidUntil;
 	sendTimeLineNextTLI = state->nextTLI;
 
+	/*
+	* If the client sent CopyDone while we were waiting,
+	* bail out so we can wind up the decoding session.
+	*/
+	if (streamingDoneSending)
+		return -1;
+
+	 /* more than one block available */
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
@@ -1341,8 +1349,12 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
-		 */
-		if (got_STOPPING)
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
+		*/
+
+		if (got_STOPPING || streamingDoneReceiving)
 			break;
 
 		/*
@@ -2095,7 +2107,14 @@ WalSndCheckTimeOut(void)
 	}
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.
+ */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -2142,10 +2161,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
+		 *
+		 * If we're trying to finish sending and exit we shouldn't enqueue more
+		 * data to libpq. We need to finish writing out whatever we already
+		 * have in libpq's send buffer to maintain protocol sync so we still
+		 * need to loop until it's flushed.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneSending)
 			send_data();
-		else
+		else if (!streamingDoneSending)
 			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
@@ -3375,7 +3399,7 @@ WalSndKeepaliveIfNecessary(void)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response || streamingDoneSending)
 		return;
 
 	/*
-- 
2.6.4

