On 25 August 2016 at 13:04, Craig Ringer <cr...@2ndquadrant.com> wrote:

> By the way, I now think that the second part of your patch, to allow
> interruption during ReorderBufferCommit processing, is also very
> desirable.

I've updated your patch, rebasing it on top of 10.0 master and
splitting it into two pieces. I thought you were planning on following
up with an update to the second part, but maybe that was unclear from
our prior discussion, so I'm doing this to help get this moving again.

The first patch is what I posted earlier - the part of your patch that
allows the walsender to exit COPY BOTH mode and return to command mode
in response to a client-initiated CopyDone when it's in the
WalSndLoop.  For logical decoding this means that it will notice a
client CopyDone when it's between transactions or while it's ingesting
transaction changes. It won't react to CopyDone while it's in
ReorderBufferCommit and sending a transaction to an output plugin
though.

The second piece is the other half of your original patch. Currently
unmodified from what you wrote. It adds a hook in ReorderBufferCommit
that calls back into the walsender to check for new client input and
interrupt processing. I haven't yet investigated this in detail.


Review comments on the 2nd patch, i.e. the 2nd half of your original patch:

* Other places in logical decoding use the CB suffix for callback
types. This should do the same.

* I'm not too keen on the name is_active  for the callback. We
discussed the name continue_decoding_cb in our prior conversation.

* Instead of having LogicalContextAlwaysActive, would it be better to
test if the callback pointer is null and skip it if so?

* Lots of typos in LogicalContextAlwaysActive comments, and wording is unclear

* comment added to arguments of pg_logical_slot_get_changes_guts is
not in PostgreSQL style - it goes way wide on the line. Probably
better to put the comment above the call and mention which argument it
refers to.

* A comment somewhere is needed - probably on the IsStreamingActive()
callback - to point readers at the fact that WalSndWriteData() calls
ProcessRepliesIfAny() and that's how IsStreamingActive can get set. I
had to look at the email discussion history to remind myself how this
worked when I was re-reading the code.

* I'm not keen on

    typedef ReorderBufferIsActive LogicalDecondingContextIsActive;

  I think instead a single callback name that encompasses both should
be used. ContinueDecodingCB ?

* There are no tests. I don't see any really simple way to test this, though.

I suggest that you apply these updated/split patches to a fresh copy
of master then see if you can update it based on this feedback.
Something like:

git checkout master
git pull
git checkout -b stop-decoding-v10
git am /path/to/0001-Respect-client-initiated-CopyDone-in-walsender.patch
git am /path/to/0002-Second-part-of-Vladimir-Gordiychuk-s-patch.patch

then modify the code per the above, and "git commit --amend" the
changes and send an updated set of patches with "git format-patch -2"
.

I am setting this patch as "waiting on author" in the commitfest.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 81b3f61b0235e1b936e3336e43ce55c00bc230c4 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 10 May 2016 10:34:10 +0800
Subject: [PATCH 1/2] Respect client-initiated CopyDone in walsender

The walsender never looked for CopyDone sent by the client unless it
had already decided it was done sending data and dispatched its own
CopyDone message.

Check for client-initiated CopyDone when in COPY BOTH mode, returning to
command mode. In logical decoding this will allow the client to end a logical
decoding session between transactions without just unilaterally closing its
connection.

This change does not allow a client to end COPY BOTH session in the middle of
processing a logical decoding block.

TODO effect on physical walsender?
---
 src/backend/replication/walsender.c | 32 +++++++++++++++++++++++++++-----
 1 file changed, 27 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1ea2a5c..c43310c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -759,6 +759,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
+	/*
+	 * If the client sent CopyDone while we were waiting,
+	 * bail out so we can wind up the decoding session.
+	 */
+	if (streamingDoneSending)
+		return -1;
+
 	/* more than one block available */
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;
@@ -1220,8 +1227,11 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
 		 */
-		if (walsender_ready_to_stop)
+		if (walsender_ready_to_stop || streamingDoneReceiving)
 			break;
 
 		/*
@@ -1787,7 +1797,14 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.
+ */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -1850,10 +1867,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
+		 *
+		 * If we're trying to finish sending and exit we shouldn't enqueue more
+		 * data to libpq. We need to finish writing out whatever we already
+		 * have in libpq's send buffer to maintain protocol sync so we still
+		 * need to loop until it's flushed.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneSending)
 			send_data();
-		else
+		else if (!streamingDoneSending)
 			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
@@ -2909,7 +2931,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response || streamingDoneSending)
 		return;
 
 	/*
-- 
2.5.5

From 43de23ddafb17950add6ffff585e501e5ce4c54b Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 5 Sep 2016 22:19:03 +0800
Subject: [PATCH 2/2] Second part of Vladimir Gordiychuk's patch

This component allows streaming to be interrupted during ReorderBufferCommit.

TODO: explain, write up, document
---
 src/backend/replication/logical/logical.c       | 15 ++++++----
 src/backend/replication/logical/logicalfuncs.c  | 15 +++++++++-
 src/backend/replication/logical/reorderbuffer.c |  9 ++++--
 src/backend/replication/slotfuncs.c             |  2 +-
 src/backend/replication/walsender.c             | 39 +++++++++++++------------
 src/include/replication/logical.h               | 13 +++++++--
 src/include/replication/reorderbuffer.h         |  8 +++++
 7 files changed, 70 insertions(+), 31 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1512be5..818b7cf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   ReorderBufferIsActive is_active)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -180,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->is_active = is_active;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -212,7 +214,8 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalDecondingContextIsActive is_active)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -288,7 +291,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write, is_active);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -328,7 +331,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalDecondingContextIsActive is_active)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -378,7 +382,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 is_active);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 4e4c8cd..e0efd4c 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -104,6 +104,17 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	p->returned_rows++;
 }
 
+/**
+ * Stab function that necessary for LogicalDecondign context.
+ * Function always return true and it means that decoding WALs
+ * can't be interrupt in contrast of logical replication.
+ */
+static bool
+LogicalContextAlwaysActive(void)
+{
+	return true;
+}
+
 static void
 check_permissions(void)
 {
@@ -249,7 +260,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite,
+									LogicalContextAlwaysActive /* converting to Datum(sql api) can't be interrupted in contrast of replication*/
+									);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9b430b9..26b8e71 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1417,7 +1417,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && rb->is_active())
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1643,8 +1643,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if(rb->is_active())
+		{
+			/* call commit callback */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f908761..976bc0c 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(
 									NameStr(*plugin), NIL,
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c43310c..9f4c7b5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -218,6 +218,8 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+static bool IsStreamingActive(void);
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -823,7 +825,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1002,7 +1005,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_decoding_ctx = CreateDecodingContext(
 											   cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData, IsStreamingActive);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1093,14 +1096,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	if (!pq_is_send_pending())
-		return;
-
 	for (;;)
 	{
 		int			wakeEvents;
@@ -1235,7 +1230,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client, sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive()) {
+			break;
+		}
+
+		/* We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
@@ -1797,14 +1799,7 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/*
- * Main loop of walsender process that streams the WAL over Copy messages.
- *
- * The send_data callback must enqueue complete CopyData messages to libpq
- * using pq_putmessage_noblock or similar, since the walsender loop may send
- * CopyDone then exit and return to command mode in response to a client
- * CopyDone between calls to send_data.
- */
+/* Main loop of walsender process that streams the WAL over Copy messages. */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -2951,3 +2946,9 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 			WalSndShutdown();
 	}
 }
+
+static
+bool IsStreamingActive(void)
+{
+	return !streamingDoneReceiving && !streamingDoneSending;
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 947000e..ba54ddc 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,13 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+/*
+ * Callback function that allow interrupt logical replication during decoding.
+ * Function return true if decoding can be continue decode, but if function return false
+ * logical decoding will stop as soon as possible.
+ */
+typedef ReorderBufferIsActive LogicalDecondingContextIsActive;
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -81,13 +88,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalDecondingContextIsActive is_active);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalDecondingContextIsActive is_active);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 9e209ae..b0916d7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -292,6 +292,9 @@ typedef void (*ReorderBufferMessageCB) (
 												 const char *prefix, Size sz,
 													const char *message);
 
+/* callback signature for check decoding status */
+typedef bool (*ReorderBufferIsActive) (void);
+
 struct ReorderBuffer
 {
 	/*
@@ -321,6 +324,11 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback to define status of decoding. Return false if decoding not necessary continue
+	 */
+	ReorderBufferIsActive is_active;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to