From b3067d6c7ea17688fd9ded25ce5b96cef6efb383 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 12 Nov 2020 14:59:58 +0530
Subject: [PATCH v2 1/2] Bug fix skip-empty-xacts in streaming mode

In streaming mode a transaction can be decoded in multiple streams, and
the streams of different transactions can be interleaved.  Because of
that we cannot keep a transaction's write status in the logical decoding
context: decoding another transaction in between would overwrite it.
Keep the status in the ReorderBufferTXN instead.
---
 contrib/test_decoding/test_decoding.c   | 80 ++++++++++++++++++++++++++-------
 src/include/replication/reorderbuffer.h |  5 +++
 2 files changed, 70 insertions(+), 15 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 8e33614..50d8e24 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -34,10 +34,15 @@ typedef struct
 	bool		include_xids;
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
-	bool		xact_wrote_changes;
 	bool		only_local;
 } TestDecodingData;
 
+typedef struct
+{
+	bool		xact_wrote_changes;
+	bool		stream_wrote_changes;
+} TestDecodingTxnData;
+
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 							  bool is_init);
 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
@@ -255,8 +260,12 @@ static void
 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata =
+		MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+	txndata->xact_wrote_changes = false;
+	txn->output_plugin_private = txndata;
 
-	data->xact_wrote_changes = false;
 	if (data->skip_empty_xacts)
 		return;
 
@@ -280,8 +289,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
+
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +456,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 Relation relation, ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	Form_pg_class class_form;
 	TupleDesc	tupdesc;
 	MemoryContext old;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
@@ -527,17 +543,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				   int nrelations, Relation relations[], ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	MemoryContext old;
 	int			i;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
@@ -592,10 +610,24 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	data->xact_wrote_changes = false;
+	/*
+	 * If this is the first stream for the txn then allocate the txn plugin
+	 * data and set the xact_wrote_changes to false.
+	 */
+	if (txndata == NULL)
+	{
+		txndata =
+			MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+		txndata->xact_wrote_changes = false;
+		txn->output_plugin_private = txndata;
+	}
+
+	txndata->stream_wrote_changes = false;
 	if (data->skip_empty_xacts)
 		return;
+
 	pg_output_stream_start(ctx, data, txn, true);
 }
 
@@ -615,8 +647,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -633,8 +666,18 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 					   XLogRecPtr abort_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+	TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (txn->toptxn == NULL)
+	{
+		Assert(txn->output_plugin_private != NULL);
+		pfree(txndata);
+		txn->output_plugin_private = NULL;
+	}
+
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +694,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +729,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 						ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
 	/* output stream start if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
@@ -734,12 +783,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						  ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index dfdda93..bd9dd7e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN
 
 	/* If we have detected concurrent abort then ignore future changes. */
 	bool		concurrent_abort;
+
+	/*
+	 * Private data pointer of the output plugin.
+	 */
+	void	   *output_plugin_private;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
-- 
1.8.3.1

