On 9 September 2016 at 10:37, Craig Ringer <cr...@2ndquadrant.com> wrote:

> I'm looking at the revised patch now.

Considerably improved.


I fixed a typo, "decondig" for "decoding", that appeared throughout the
callback names.


This test is wrong:

+        while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && rb->continue_decoding_cb != NULL && rb->continue_decoding_cb())

If continue_decoding_cb is NULL, the NULL test is false, the && short-circuits,
and the loop exits without processing a single change. That breaks callers
that pass no callback, like the SQL interface in logicalfuncs.c. A NULL
callback has to mean "keep decoding".

It also violates the PostgreSQL source formatting conventions; it
should be wrapped to 80 columns. (Yes, that's archaic, but it's kind of
useful when you've got multiple files open side-by-side, and at least
it's consistent across the codebase.)

so it should be:

        while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
               (rb->continue_decoding_cb == NULL ||
                rb->continue_decoding_cb()))

I'd prefer the continue_decoding_cb tests to be on the same line, but
it's slightly too wide. Eh, whatever.


Same logic issue later:

-         if(rb->continue_decoding_cb != NULL && rb->continue_decoding_cb())
+        if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())


Commit message is slightly inaccurate. The walsender DID look for
client messages during ReorderBufferCommit decoding, but it never
reacted to CopyDone sent by a client when it saw it.


Rewrote comment on IsStreamingActive() per my original review advice.



I'm not sure why exactly you removed

-    /* fast path */
-    /* Try to flush pending output to the client */
-    if (pq_flush_if_writable() != 0)
-        WalSndShutdown();
-
-    if (!pq_is_send_pending())
-        return;
-

It should be OK if we flush pending output even after we receive
CopyDone. In fact, we have to, since we can't unqueue it and have to
send it before we can send our own CopyDone reply.
pq_flush_if_writable() should only return EOF if the socket is closed,
in which case fast bailout is the right thing to do.

Can you explain your thinking here and what the intended outcome is?



I've attached updated patches with a number of typo fixes,
copy-editing changes, a fix to the test logic, etc., as noted above.

Setting "waiting on author" in CF per discussion of the need for tests.


-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 14bbd25cf27c3555126b571ee7cb41524f2a8729 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 10 May 2016 10:34:10 +0800
Subject: [PATCH 1/2] Respect client-initiated CopyDone in walsender

The walsender never reacted to CopyDone sent by the client unless it
had already decided it was done sending data and dispatched its own
CopyDone message.

It actually noticed CopyDone from the client when WalSndWriteData() called
ProcessRepliesIfAny() but it didn't react to it until it separately decided to
end streaming from the walsender end.

Modify the walsender so it checks for client-initiated CopyDone when in COPY
BOTH mode. It now cleans up what it's doing, replies with its own CopyDone and
returns to command mode. In logical decoding this will allow the client to end
a logical decoding session between transactions without just unilaterally
closing its connection. For physical walsender connections this allows the
client to end streaming before the end of a timeline.

This change does not allow a client to end a COPY BOTH session in the middle of
processing a logical decoding commit (in ReorderBufferCommit) or while decoding
a large WAL record, so there can still be a significant delay before the
walsender reacts to the client.
---
 src/backend/replication/walsender.c | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1ea2a5c..ad94c13 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -759,6 +759,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
+	/*
+	 * If the client sent CopyDone while we were waiting,
+	 * bail out so we can wind up the decoding session.
+	 */
+	if (streamingDoneSending)
+		return -1;
+
 	/* more than one block available */
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;
@@ -1220,8 +1227,11 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
 		 */
-		if (walsender_ready_to_stop)
+		if (walsender_ready_to_stop || streamingDoneReceiving)
 			break;
 
 		/*
@@ -1850,10 +1860,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
+		 *
+		 * If we're trying to finish sending and exit we shouldn't enqueue more
+		 * data to libpq. We need to finish writing out whatever we already
+		 * have in libpq's send buffer to maintain protocol sync so we still
+		 * need to loop until it's flushed.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneSending)
 			send_data();
-		else
+		else if (!streamingDoneSending)
 			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
@@ -2909,7 +2924,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response || streamingDoneSending)
 		return;
 
 	/*
-- 
2.5.5

From d8a3de33a634cc2388ac91132bbf2704bba73c11 Mon Sep 17 00:00:00 2001
From: Vladimir Gordiychuk <fol...@gmail.com>
Date: Wed, 7 Sep 2016 00:39:18 +0300
Subject: [PATCH 2/2] Respect client-initiated CopyDone during transaction
 decoding in walsender

The prior patch caused the walsender to react to a client-initiated
CopyDone while it's in the WalSndLoop. That's not enough to let clients
end logical decoding mid-transaction because we loop in ReorderBufferCommit
during decoding of a transaction without ever returning to WalSndLoop.

Allow breaking out of ReorderBufferCommit by detecting that client
input has requested an end to COPY BOTH mode, so clients can abort
decoding mid-transaction.
---
 src/backend/replication/logical/logical.c       | 15 ++++++----
 src/backend/replication/logical/logicalfuncs.c  |  3 +-
 src/backend/replication/logical/reorderbuffer.c | 16 ++++++++--
 src/backend/replication/slotfuncs.c             |  2 +-
 src/backend/replication/walsender.c             | 39 ++++++++++++++++++-------
 src/include/replication/logical.h               |  6 ++--
 src/include/replication/reorderbuffer.h         | 14 +++++++++
 7 files changed, 72 insertions(+), 23 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1512be5..4065899 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   ContinueDecodingCB continue_decoding_cb)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -180,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->continue_decoding_cb = continue_decoding_cb;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -212,7 +214,8 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  ContinueDecodingCB continue_decoding_cb)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -288,7 +291,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write, continue_decoding_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -328,7 +331,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  ContinueDecodingCB continue_decoding_cb)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -378,7 +382,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 continue_decoding_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 9c7be2d..767d6ce 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -249,7 +249,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite,
+									NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9b430b9..dc3c88f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1341,6 +1341,11 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
  * cache invalidations. Thus, once a toplevel commit is read, we iterate over
  * the top and subtransactions (using a k-way merge) and replay the changes in
  * lsn order.
+ *
+ * ReorderBufferCommit processes all changes in a transaction and feeds them to
+ * output plugin callbacks. It does not return until the full transaction has
+ * been consumed or continue_decoding_cb() returns false to indicate an early
+ * abort request.
  */
 void
 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
@@ -1417,7 +1422,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
+			   (rb->continue_decoding_cb == NULL ||
+				rb->continue_decoding_cb()))
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1643,8 +1650,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
+		{
+			/* call commit callback unless aborting early */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f908761..976bc0c 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(
 									NameStr(*plugin), NIL,
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ad94c13..91c5b41 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -218,6 +218,23 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+/*
+ * Return true until either the client or the server side has requested that
+ * we wind up COPY BOTH mode by sending a CopyDone.
+ *
+ * If we receive a CopyDone from the client we should avoid sending any further
+ * CopyData messages and return to command mode as promptly as possible.
+ *
+ * While in the middle of sending data to a client we notice a client-initiated
+ * CopyDone when WalSndWriteData() calls ProcessRepliesIfAny() and it
+ * sets streamingDoneSending.
+ */
+static bool
+IsStreamingActive(void)
+{
+	return !streamingDoneReceiving && !streamingDoneSending;
+}
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -823,7 +840,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1002,7 +1020,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_decoding_ctx = CreateDecodingContext(
 											   cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData, IsStreamingActive);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1093,14 +1111,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	if (!pq_is_send_pending())
-		return;
-
 	for (;;)
 	{
 		int			wakeEvents;
@@ -1235,7 +1245,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client or sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive()) {
+			break;
+		}
+
+		/* We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 947000e..9fc4098 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -81,13 +81,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  ContinueDecodingCB continue_decoding_cb);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  ContinueDecodingCB continue_decoding_cb);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 9e209ae..f5d9f5d 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -292,6 +292,12 @@ typedef void (*ReorderBufferMessageCB) (
 												 const char *prefix, Size sz,
 													const char *message);
 
+/*
+ * Callback function that allows decoding of a transaction to be interrupted
+ * part-way through, between output plugin calls.
+ */
+typedef bool (*ContinueDecodingCB) (void);
+
 struct ReorderBuffer
 {
 	/*
@@ -321,6 +327,14 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback to control whether reorder buffer processing should end before
+	 * normal completion. Normally true; if it returns false, reorder buffer
+	 * processing (e.g. in ReorderBufferCommit) will bail out as soon as
+	 * possible.
+	 */
+	ContinueDecodingCB continue_decoding_cb;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
2.5.5
