Hi,

Currently, only the gid is passed on to the filter_prepare callback. While we probably should not pass a full ReorderBufferTXN (as we do for most other output plugin callbacks), a bit more information would be nice, I think.

Attached is a patch that adds the xid (still lacking docs changes). The question about stream_prepare being optional made me wonder whether an output plugin needs to know if changes have already been streamed prior to a prepare. Maybe not? Is there any other information you think the output plugin might find useful for deciding whether or not to skip the prepare?

If you are okay with adding just the xid, I'll add docs changes to the patch provided.

Regards

Markus
From: Markus Wanner <mar...@bluegap.ch>
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for the
 output plugin.

---
 contrib/test_decoding/test_decoding.c     |  4 +++-
 src/backend/replication/logical/decode.c  | 17 +++++++++++------
 src/backend/replication/logical/logical.c |  5 +++--
 src/include/replication/logical.h         |  3 ++-
 src/include/replication/output_plugin.h   |  1 +
 5 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 TransactionId xid,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+						 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+								 TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 							  XLogRecordBuffer *buf, Oid dbId,
 							  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_COMMIT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_ABORT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * manner iff output plugin supports two-phase commits and
 				 * doesn't filter the transaction at prepare time.
 				 */
-				if (FilterPrepare(ctx, parsed.twophase_gid))
+				if (FilterPrepare(ctx, parsed.twophase_xid,
+								  parsed.twophase_gid))
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+						  const char *gid)
 {
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
 	ctx->accept_writes = false;
 
 	/* do the actual work: call callback */
-	ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
  * and sent as usual transaction.
  */
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  TransactionId xid,
 											  const char *gid);
 
 /*
-- 
2.30.1

Reply via email to