>Review comments on the 2nd patch, i.e. the 2nd half of your original patch:
>
>* Other places in logical decoding use the CB suffix for callback
>types. This should do the same.
>
>* I'm not too keen on the name is_active  for the callback. We
>discussed the name continue_decoding_cb in our prior conversation.
>
>* Instead of having LogicalContextAlwaysActive, would it be better to
>test if the callback pointer is null and skip it if so?
>
>* Lots of typos in LogicalContextAlwaysActive comments, and wording is
>unclear
>
>* comment added to arguments of pg_logical_slot_get_changes_guts is
>not in PostgreSQL style - it goes way wide on the line. Probably
>better to put the comment above the call and mention which argument it
>refers to.
>
>* A comment somewhere is needed - probably on the IsStreamingActive()
>callback - to point readers at the fact that WalSndWriteData() calls
>ProcessRepliesIfAny() and that's how IsStreamingActive can get set. I
>had to look at the email discussion history to remind myself how this
>worked when I was re-reading the code.
>
>* I'm not keen on
>
>    typedef ReorderBufferIsActive LogicalDecondingContextIsActive;
>
>  I think instead a single callback name that encompasses both should
>be used. ContinueDecodingCB ?


The fixed patch is attached.
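
As a rough illustration of the null-pointer check suggested above, here is a stand-alone sketch. It is not the patch code: only the ContinueDecodingCB typedef mirrors the patch, while should_continue, process_changes, always_go and always_stop are hypothetical names made up for this example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Same shape as the ContinueDecodingCB typedef in the patch. */
typedef bool (*ContinueDecodingCB) (void);

/* Hypothetical helper: a NULL callback means "never interrupt". */
static bool
should_continue(ContinueDecodingCB cb)
{
	return cb == NULL || cb();
}

/* Hypothetical callbacks standing in for real walsender state. */
static bool always_go(void)   { return true; }
static bool always_stop(void) { return false; }

/*
 * Hypothetical stand-in for a decoding loop: returns how many of
 * nchanges changes were processed before the callback asked to stop.
 */
static int
process_changes(int nchanges, ContinueDecodingCB cb)
{
	int			processed = 0;

	assert(nchanges >= 0);
	while (processed < nchanges && should_continue(cb))
		processed++;
	return processed;
}
```

With this shape, callers that have no way to be interrupted (for example the SQL-level functions) can simply pass NULL and no "always active" dummy callback is needed.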

> * There are no tests. I don't see any really simple way to test this,
> though.

I would be grateful if you could suggest the best way to do it. I tested these
patches with the pgjdbc driver tests, which also build against the head of
PostgreSQL. You can check the test scenarios for both patches in the PR
https://github.com/pgjdbc/pgjdbc/pull/550

org.postgresql.replication.LogicalReplicationTest#testDuringSendBigTransactionReplicationStreamCloseNotActive
org.postgresql.replication.LogicalReplicationTest#testAfterCloseReplicationStreamDBSlotStatusNotActive
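
Until there is a server-side test, the behaviour these tests exercise can be modelled roughly with the stand-alone sketch below. It is a simplification under stated assumptions, not walsender code: the two flags and IsStreamingActive reuse names from the patch, but write_data, decode_transaction, copy_done_after and messages_sent are hypothetical, and the real WalSndWriteData() reads client replies through ProcessRepliesIfAny() rather than a counter.

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-ins for the walsender flags of the same names. */
static bool streamingDoneSending = false;
static bool streamingDoneReceiving = false;

/* Hypothetical knob: client sends CopyDone after this many messages (-1 = never). */
static int	copy_done_after = -1;
static int	messages_sent = 0;

static bool
IsStreamingActive(void)
{
	return !streamingDoneReceiving && !streamingDoneSending;
}

/* Models a write: send one message, then notice any pending client CopyDone. */
static void
write_data(void)
{
	messages_sent++;
	if (copy_done_after >= 0 && messages_sent >= copy_done_after)
		streamingDoneReceiving = true;
}

/*
 * Models the decoding loop for one transaction. Returns true if the
 * commit message was emitted, false if decoding was interrupted.
 */
static bool
decode_transaction(int nchanges, int client_stops_after)
{
	int			i;

	assert(nchanges >= 0);
	streamingDoneSending = streamingDoneReceiving = false;
	messages_sent = 0;
	copy_done_after = client_stops_after;

	for (i = 0; i < nchanges && IsStreamingActive(); i++)
		write_data();

	if (!IsStreamingActive())
		return false;		/* interrupted: skip the commit message */

	write_data();			/* the commit message */
	return true;
}
```

In this model a big transaction stops after the message on which the client's CopyDone is noticed, which is the property the first test checks from the client side.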


2016-09-06 4:10 GMT+03:00 Craig Ringer <cr...@2ndquadrant.com>:

> On 25 August 2016 at 13:04, Craig Ringer <cr...@2ndquadrant.com> wrote:
>
> > By the way, I now think that the second part of your patch, to allow
> > interruption during ReorderBufferCommit processing, is also very
> > desirable.
>
> I've updated your patch, rebasing it on top of 10.0 master and
> splitting it into two pieces. I thought you were planning on following
> up with an update to the second part, but maybe that was unclear from
> our prior discussion, so I'm doing this to help get this moving again.
>
> The first patch is what I posted earlier - the part of your patch that
> allows the walsender to exit COPY BOTH mode and return to command mode
> in response to a client-initiated CopyDone when it's in the
> WalSndLoop.  For logical decoding this means that it will notice a
> client CopyDone when it's between transactions or while it's ingesting
> transaction changes. It won't react to CopyDone while it's in
> ReorderBufferCommit and sending a transaction to an output plugin
> though.
>
> The second piece is the other half of your original patch. Currently
> unmodified from what you wrote. It adds a hook in ReorderBufferCommit
> that calls back into the walsender to check for new client input and
> interrupt processing. I haven't yet investigated this in detail.
>
>
> Review comments on the 2nd patch, i.e. the 2nd half of your original patch:
>
> * Other places in logical decoding use the CB suffix for callback
> types. This should do the same.
>
> * I'm not too keen on the name is_active  for the callback. We
> discussed the name continue_decoding_cb in our prior conversation.
>
> * Instead of having LogicalContextAlwaysActive, would it be better to
> test if the callback pointer is null and skip it if so?
>
> * Lots of typos in LogicalContextAlwaysActive comments, and wording is
> unclear
>
> * comment added to arguments of pg_logical_slot_get_changes_guts is
> not in PostgreSQL style - it goes way wide on the line. Probably
> better to put the comment above the call and mention which argument it
> refers to.
>
> * A comment somewhere is needed - probably on the IsStreamingActive()
> callback - to point readers at the fact that WalSndWriteData() calls
> ProcessRepliesIfAny() and that's how IsStreamingActive can get set. I
> had to look at the email discussion history to remind myself how this
> worked when I was re-reading the code.
>
> * I'm not keen on
>
>     typedef ReorderBufferIsActive LogicalDecondingContextIsActive;
>
>   I think instead a single callback name that encompasses both should
> be used. ContinueDecodingCB ?
>
> * There are no tests. I don't see any really simple way to test this,
> though.
>
> I suggest that you apply these updated/split patches to a fresh copy
> of master then see if you can update it based on this feedback.
> Something like:
>
> git checkout master
> git pull
> git checkout -b stop-decoding-v10
> git am /path/to/0001-Respect-client-initiated-CopyDone-in-walsender.patch
> git am /path/to/0002-Second-part-of-Vladimir-Gordiychuk-s-patch.patch
>
> then modify the code per the above, and "git commit --amend" the
> changes and send an updated set of patches with "git format-patch -2"
> .
>
> I am setting this patch as "waiting on author" in the commitfest.
>
> --
>  Craig Ringer                   http://www.2ndQuadrant.com/
>  PostgreSQL Development, 24x7 Support, Training & Services
>
From fcdcfe1aedfb3c7ef90c78c7d8acb4ca99a2ffdc Mon Sep 17 00:00:00 2001
From: Vladimir Gordiychuk <fol...@gmail.com>
Date: Wed, 7 Sep 2016 00:39:18 +0300
Subject: [PATCH 2/2] Client-initiated CopyDone during transaction decoding in
 walsender

The walsender never looked for client messages while ReorderBufferCommit
was decoding a transaction. This affects long transactions: even if the
client tries to interrupt streaming with a CopyDone message, the server
keeps sending CopyData messages and generating network traffic. The client
can also request a reply from the server by sending a keepalive message
with the requireReply flag set.

This patch checks for client messages each time a CopyData message is sent
and interrupts transaction decoding as soon as possible once CopyDone has
been received from the client (ContinueDecodingCB).
---
 src/backend/replication/logical/logical.c       | 15 +++++---
 src/backend/replication/logical/logicalfuncs.c  |  3 +-
 src/backend/replication/logical/reorderbuffer.c |  9 +++--
 src/backend/replication/slotfuncs.c             |  2 +-
 src/backend/replication/walsender.c             | 50 +++++++++++++++----------
 src/include/replication/logical.h               |  6 ++-
 src/include/replication/reorderbuffer.h         | 12 ++++++
 7 files changed, 66 insertions(+), 31 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1512be5..fcb1ffc 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   ContinueDecodingCB continue_decondig_cb)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -180,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->continue_decoding_cb = continue_decondig_cb;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -212,7 +214,8 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  ContinueDecodingCB continue_decondig_cb)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -288,7 +291,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write, continue_decondig_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -328,7 +331,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  ContinueDecodingCB continue_decondig_cb)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -378,7 +382,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 continue_decondig_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 9c7be2d..767d6ce 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -249,7 +249,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite,
+									NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9b430b9..62f42e8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1417,7 +1417,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb()))
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1643,8 +1643,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
+		{
+			/* call commit callback */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f908761..976bc0c 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(
 									NameStr(*plugin), NIL,
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c43310c..d85ad16 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -218,6 +218,19 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+/*
+ * Return true while neither side has initiated CopyDone.
+ */
+/*
+ * While streaming WAL in Copy mode, the client or the server can end
+ * streaming by sending CopyDone. We must not send any more CopyData
+ * messages after that.
+ *
+ * This function returns false if we have sent CopyDone (e.g. disconnect on
+ * timeout) or have received CopyDone from the client.
+ */
+static bool IsStreamingActive(void);
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -823,7 +836,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1002,7 +1016,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_decoding_ctx = CreateDecodingContext(
 											   cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData, IsStreamingActive);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1093,14 +1107,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	if (!pq_is_send_pending())
-		return;
-
 	for (;;)
 	{
 		int			wakeEvents;
@@ -1235,7 +1241,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client or sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive()) {
+			break;
+		}
+
+		/* We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
@@ -1797,14 +1810,7 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/*
- * Main loop of walsender process that streams the WAL over Copy messages.
- *
- * The send_data callback must enqueue complete CopyData messages to libpq
- * using pq_putmessage_noblock or similar, since the walsender loop may send
- * CopyDone then exit and return to command mode in response to a client
- * CopyDone between calls to send_data.
- */
+/* Main loop of walsender process that streams the WAL over Copy messages. */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -2951,3 +2957,9 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 			WalSndShutdown();
 	}
 }
+
+static bool
+IsStreamingActive(void)
+{
+	return !streamingDoneReceiving && !streamingDoneSending;
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 947000e..0ade384 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -81,13 +81,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  ContinueDecodingCB continue_decondig_cb);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  ContinueDecodingCB continue_decondig_cb);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 9e209ae..c2e0eea 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -292,6 +292,13 @@ typedef void (*ReorderBufferMessageCB) (
 												 const char *prefix, Size sz,
 													const char *message);
 
+/*
+ * Callback that allows logical decoding to be interrupted. Returns true if
+ * decoding can continue; if it returns false, logical decoding will stop as
+ * soon as possible.
+ */
+typedef bool (*ContinueDecodingCB) (void);
+
 struct ReorderBuffer
 {
 	/*
@@ -321,6 +328,11 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback reporting whether decoding should continue; returns false to stop.
+	 */
+	ContinueDecodingCB continue_decoding_cb;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
1.9.1

From 97da2c500f8f8edc69bcd520096c686e99e52612 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 10 May 2016 10:34:10 +0800
Subject: [PATCH 1/2] Respect client-initiated CopyDone in walsender

The walsender never looked for CopyDone sent by the client unless it
had already decided it was done sending data and dispatched its own
CopyDone message.

Check for client-initiated CopyDone when in COPY BOTH mode, returning to
command mode. In logical decoding this will allow the client to end a logical
decoding session between transactions without just unilaterally closing its
connection.

This change does not allow a client to end COPY BOTH session in the middle of
processing a logical decoding block.

TODO effect on physical walsender?
---
 src/backend/replication/walsender.c | 32 +++++++++++++++++++++++++++-----
 1 file changed, 27 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1ea2a5c..c43310c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -759,6 +759,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
+	/*
+	 * If the client sent CopyDone while we were waiting,
+	 * bail out so we can wind up the decoding session.
+	 */
+	if (streamingDoneSending)
+		return -1;
+
 	/* more than one block available */
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;
@@ -1220,8 +1227,11 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
 		 */
-		if (walsender_ready_to_stop)
+		if (walsender_ready_to_stop || streamingDoneReceiving)
 			break;
 
 		/*
@@ -1787,7 +1797,14 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.
+ */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -1850,10 +1867,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
+		 *
+		 * If we're trying to finish sending and exit we shouldn't enqueue more
+		 * data to libpq. We need to finish writing out whatever we already
+		 * have in libpq's send buffer to maintain protocol sync so we still
+		 * need to loop until it's flushed.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneSending)
 			send_data();
-		else
+		else if (!streamingDoneSending)
 			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
@@ -2909,7 +2931,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response || streamingDoneSending)
 		return;
 
 	/*
-- 
1.9.1
