On 29.03.21 08:23, Amit Kapila wrote:
On Mon, Mar 29, 2021 at 11:42 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
Why do you think that this callback can be invoked several times per
transaction? I think it could be called at most two times, once at
prepare time, then at commit or rollback time. So, I think using
'multiple' instead of 'several' times is better.

Thank you for reviewing.

That's fine with me; I just wanted to provide an explanation for why the callback needs to be stable. (I would not want the docs to limit us by guaranteeing it is called only twice. 'Multiple' sounds generic enough, so I changed it to that word.)
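To illustrate the stability requirement, here is a minimal sketch of a filter callback whose answer depends only on its xid and gid arguments. Repeated invocations for the same transaction (at prepare time, and again at commit or rollback time) therefore agree. It uses stand-in typedefs so it compiles outside the PostgreSQL tree. The name my_filter_prepare and the skip_xids list are hypothetical; only the "_nodecode" rule mirrors test_decoding.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-ins so the sketch compiles outside the PostgreSQL source tree. */
typedef uint32_t TransactionId;
typedef struct LogicalDecodingContext LogicalDecodingContext;

/*
 * Hypothetical, read-only list of xids the plugin wants decoded as regular
 * one-phase transactions.  It never changes during decoding, so the answer
 * for a given xid is the same every time the callback runs.
 */
static const TransactionId skip_xids[] = {742, 1001};

/* Same shape as LogicalDecodeFilterPrepareCB with the added xid argument. */
static bool
my_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
				  const char *gid)
{
	(void) ctx;					/* not needed for this decision */
	assert(gid != NULL);

	/* gid-based rule, as in test_decoding's pg_decode_filter_prepare */
	if (strstr(gid, "_nodecode") != NULL)
		return true;

	/* xid-based lookup, consulting only immutable data */
	for (size_t i = 0; i < sizeof(skip_xids) / sizeof(skip_xids[0]); i++)
	{
		if (skip_xids[i] == xid)
			return true;
	}

	return false;				/* decode this transaction in two phases */
}
```

A callback that consulted mutable state instead (a counter, the current time, or a table that may change between PREPARE and COMMIT PREPARED) could give different answers for the same transaction and violate this requirement.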

What exactly is the node identifier here? Is it a publisher or
subscriber node id? We might want to be a bit more explicit here?

Good point. I clarified this to speak of the origin node (given this is not necessarily the direct provider when using chained replication).
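As a rough illustration of combining the xid with an origin node identifier, the following sketch packs both into one 64-bit value. PostgreSQL's TransactionId is indeed a 32-bit unsigned integer; the OriginNodeId type and all helper functions are hypothetical and plugin-defined. Since a plain 32-bit xid wraps around, a long-lived scheme would probably also need the epoch (as in PostgreSQL's FullTransactionId).

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* PostgreSQL's TransactionId is a 32-bit unsigned integer. */
typedef uint32_t TransactionId;

/* Hypothetical plugin-defined identifier of the origin node. */
typedef uint32_t OriginNodeId;

/* Hypothetical layout: origin node in the upper 32 bits, xid in the lower. */
static inline uint64_t
make_global_txn_id(OriginNodeId node, TransactionId xid)
{
	return ((uint64_t) node << 32) | (uint64_t) xid;
}

/* Recover the origin node identifier from a combined id. */
static inline OriginNodeId
global_txn_node(uint64_t gtid)
{
	return (OriginNodeId) (gtid >> 32);
}

/* Recover the xid from a combined id. */
static inline TransactionId
global_txn_xid(uint64_t gtid)
{
	return (TransactionId) (gtid & UINT32_MAX);
}

/*
 * Render a combined id as "node:xid", e.g. for logging.  Returns the
 * snprintf() result, i.e. the length of the full formatted string.
 */
static int
format_global_txn_id(char *buf, size_t len, uint64_t gtid)
{
	assert(buf != NULL && len > 0);
	return snprintf(buf, len, "%" PRIu32 ":%" PRIu32,
					global_txn_node(gtid), global_txn_xid(gtid));
}
```

The same xid arriving from two different origin nodes yields two distinct combined identifiers, which a gid reused across subscriptions cannot guarantee.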

An updated patch is attached.

Regards

Markus
From: Markus Wanner <mar...@bluegap.ch>
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output
 plugins

---
 contrib/test_decoding/test_decoding.c     |  4 ++-
 doc/src/sgml/logicaldecoding.sgml         | 34 +++++++++++++++--------
 src/backend/replication/logical/decode.c  | 17 ++++++++----
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h         |  3 +-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 TransactionId xid,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+						 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..f3ac84aa85a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,30 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
        <command>COMMIT PREPARED</command> time. To signal that
        decoding should be skipped, return <literal>true</literal>;
        <literal>false</literal> otherwise. When the callback is not
-       defined, <literal>false</literal> is assumed (i.e. nothing is
-       filtered).
+       defined, <literal>false</literal> is assumed (i.e. no filtering; all
+       transactions using two-phase commit are decoded in two phases as well).
 <programlisting>
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              TransactionId xid,
                                               const char *gid);
 </programlisting>
-      The <parameter>ctx</parameter> parameter has the same contents as for the
-      other callbacks. The <parameter>gid</parameter> is the identifier that later
-      identifies this transaction for <command>COMMIT PREPARED</command> or
-      <command>ROLLBACK PREPARED</command>.
+       The <parameter>ctx</parameter> parameter has the same contents as for
+       the other callbacks. The parameters <parameter>xid</parameter>
+       and <parameter>gid</parameter> provide two different ways to identify
+       the transaction.  For some systems, the <parameter>gid</parameter> may
+       be sufficient.  However, the same <parameter>gid</parameter> may be
+       reused, for example by a downstream node using multiple subscriptions,
+       so it is not necessarily unique.  Therefore, other systems combine the
+       <parameter>xid</parameter> with an identifier of the origin node to
+       form a globally unique transaction identifier.  The later <command>COMMIT
+       PREPARED</command> or <command>ROLLBACK PREPARED</command> carries both
+       identifiers, providing an output plugin the choice of what to use.
      </para>
      <para>
-      The callback has to provide the same static answer for a given
-      <parameter>gid</parameter> every time it is called.
+       The callback may be invoked multiple times per transaction to decode and
+       must provide the same static answer for a given pair of
+       <parameter>xid</parameter> and <parameter>gid</parameter> every time
+       it is called.
      </para>
      </sect3>
 
@@ -1219,9 +1229,11 @@ stream_commit_cb(...);  &lt;-- commit of the streamed transaction
    </para>
 
    <para>
-    Optionally the output plugin can specify a name pattern in the
-    <function>filter_prepare_cb</function> and transactions with gid containing
-    that name pattern will not be decoded as a two-phase commit transaction.
+    Optionally the output plugin can define filtering rules via
+    <function>filter_prepare_cb</function> to decode only specific transactions
+    in two phases.  This can be achieved by pattern matching on the
+    <parameter>gid</parameter> or via lookups using the
+    <parameter>xid</parameter>.
    </para>
 
    <para>
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+								 TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 							  XLogRecordBuffer *buf, Oid dbId,
 							  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_COMMIT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_ABORT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * manner iff output plugin supports two-phase commits and
 				 * doesn't filter the transaction at prepare time.
 				 */
-				if (FilterPrepare(ctx, parsed.twophase_gid))
+				if (FilterPrepare(ctx, parsed.twophase_xid,
+								  parsed.twophase_gid))
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+						  const char *gid)
 {
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
 	ctx->accept_writes = false;
 
 	/* do the actual work: call callback */
-	ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
  * and sent as usual transaction.
  */
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  TransactionId xid,
 											  const char *gid);
 
 /*
-- 
2.30.2
