On Fri, Nov 13, 2020 at 3:18 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Thu, Nov 12, 2020 at 3:10 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
> >
> > On Tue, Nov 10, 2020 at 7:20 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Tue, Nov 10, 2020 at 2:25 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
> >
> > > 3. Can you please prepare a separate patch for test case changes so
> > > that it would be easier to verify that it fails without the patch and
> > > passed after the patch?
> >
> > Done
> >
>
> Few comments:
> =================
> 1.
>    -- consume DDL
>    SELECT data FROM pg_logical_slot_get_changes('isolation_slot',
> NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
> -  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS
> 'select array_agg(md5(g::text))::text from generate_series(1, 80000)
> g';
> +  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS
> 'select array_agg(md5(g::text))::text from generate_series(1, 60000)
> g';
>  }
>
>
> Is there a reason for this change? I think probably here a lesser
> number of rows are sufficient to serve the purpose of the test but I
> am not sure if it is related to this patch or there is any other
> reason behind this change?

I think I changed this for an experiment and it accidentally got
included in the patch, so I have reverted it.

> 2.
> +typedef struct
> +{
> + bool xact_wrote_changes;
> + bool stream_wrote_changes;
> +} TestDecodingTxnData;
> +
>
> I think here a comment explaining why we need this as a separate
> structure would be better and probably explain why we need two
> different members.

Done
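
To illustrate why two members are needed, here is a minimal standalone
sketch. It is not code from the patch: TxnFlags and the flags_* helpers
are hypothetical names that only mirror the two booleans in
TestDecodingTxnData and the skip-empty-xacts checks made in the stream
stop and commit callbacks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-transaction state, mirroring TestDecodingTxnData. */
typedef struct
{
	bool		xact_wrote_changes;		/* some stream of this txn wrote changes */
	bool		stream_wrote_changes;	/* the current stream wrote changes */
} TxnFlags;

/* Every new stream starts out empty; the transaction-level flag is kept. */
static void
flags_stream_start(TxnFlags *f)
{
	assert(f != NULL);
	f->stream_wrote_changes = false;
}

/* A change marks both the current stream and the transaction as non-empty. */
static void
flags_change(TxnFlags *f)
{
	assert(f != NULL);
	f->xact_wrote_changes = f->stream_wrote_changes = true;
}

/* With skip-empty-xacts, "stream stop" is emitted only for non-empty streams. */
static bool
flags_emit_stream_stop(const TxnFlags *f, bool skip_empty_xacts)
{
	return !(skip_empty_xacts && !f->stream_wrote_changes);
}

/* The commit is emitted only if some stream of the transaction wrote changes. */
static bool
flags_emit_commit(const TxnFlags *f, bool skip_empty_xacts)
{
	return !(skip_empty_xacts && !f->xact_wrote_changes);
}
```

Because each transaction carries its own TxnFlags, an interleaved empty
transaction cannot reset the flags of a transaction that has already
written changes, and an empty later stream of a non-empty transaction
can still be skipped.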

> 3.
> pg_decode_commit_txn()
> {
> ..
> - if (data->skip_empty_xacts && !data->xact_wrote_changes)
> + pfree(txndata);
> + txn->output_plugin_private = false;
> +
>
> Here, don't we need to set txn->output_plugin_private as NULL as it is
> a pointer and we do explicitly test it for being NULL at other places?
> Also, change at other places where it is set as false.

Fixed

> 4.
> @@ -592,10 +610,24 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
>      ReorderBufferTXN *txn)
>  {
>   TestDecodingData *data = ctx->output_plugin_private;
> + TestDecodingTxnData *txndata = txn->output_plugin_private;
>
> - data->xact_wrote_changes = false;
> + /*
> + * If this is the first stream for the txn then allocate the txn plugin
> + * data and set the xact_wrote_changes to false.
> + */
> + if (txndata == NULL)
> + {
> + txndata =
>
> As we are explicitly testing for NULL here, isn't it better to
> explicitly initialize 'output_plugin_private' with NULL in
> ReorderBufferGetTXN?

Done

> 5.
> @@ -633,8 +666,18 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
>      XLogRecPtr abort_lsn)
>  {
>   TestDecodingData *data = ctx->output_plugin_private;
> + ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
> + TestDecodingTxnData *txndata = toptxn->output_plugin_private;
> + bool xact_wrote_changes = txndata->xact_wrote_changes;
>
> - if (data->skip_empty_xacts && !data->xact_wrote_changes)
> + if (txn->toptxn == NULL)
> + {
> + Assert(txn->output_plugin_private != NULL);
> + pfree(txndata);
> + txn->output_plugin_private = false;
> + }
> +
>
> Here, if we are expecting 'output_plugin_private' to be set only for
> toptxn then the Assert and reset should happen for toptxn? I find the
> changes in this function a bit unclear so probably adding a comment
> here could help.

I have added comments to explain this.
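
For reference, the intended behaviour of the stream abort path can be
sketched in isolation as below. This is only an illustration under
stated assumptions: MockTxn, MockTxnData and the mock_* functions are
hypothetical stand-ins for ReorderBufferTXN, TestDecodingTxnData and the
plugin callbacks, and calloc/free replace the memory context calls.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for ReorderBufferTXN. */
typedef struct MockTxn
{
	struct MockTxn *toptxn;		/* NULL for a top-level transaction */
	void	   *output_plugin_private;
} MockTxn;

/* Hypothetical stand-in for TestDecodingTxnData. */
typedef struct
{
	bool		xact_wrote_changes;
	bool		stream_wrote_changes;
} MockTxnData;

/* Allocate the private data on the first stream only; reset the stream flag. */
static void
mock_stream_start(MockTxn *txn)
{
	MockTxnData *txndata = txn->output_plugin_private;

	if (txndata == NULL)
	{
		txndata = calloc(1, sizeof(MockTxnData));
		assert(txndata != NULL);
		txn->output_plugin_private = txndata;
	}
	txndata->stream_wrote_changes = false;
}

/* Record a change against the top-level transaction's private data. */
static void
mock_stream_change(MockTxn *txn)
{
	MockTxnData *txndata = txn->output_plugin_private;

	assert(txndata != NULL);
	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
}

/*
 * Return whether the transaction wrote any changes.  The private data lives
 * only under the toptxn, so look it up there, and free it only when the
 * top-level transaction itself is aborted.
 */
static bool
mock_stream_abort(MockTxn *txn)
{
	MockTxn    *toptxn = txn->toptxn ? txn->toptxn : txn;
	MockTxnData *txndata = toptxn->output_plugin_private;
	bool		wrote;

	assert(txndata != NULL);
	wrote = txndata->xact_wrote_changes;

	if (txn->toptxn == NULL)
	{
		free(txndata);
		txn->output_plugin_private = NULL;
	}
	return wrote;
}
```

In this sketch, aborting a subtransaction leaves the toptxn's private
data in place, so later streams of the same top-level transaction can
still see xact_wrote_changes.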


-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From 037ad84600e308c03fb71d21c2550deccb0bc2aa Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 12 Nov 2020 15:00:25 +0530
Subject: [PATCH v3 2/2] Test case to test the interleaved empty transactions

---
 contrib/test_decoding/expected/concurrent_stream.out | 5 +++--
 contrib/test_decoding/specs/concurrent_stream.spec   | 6 +++++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/concurrent_stream.out b/contrib/test_decoding/expected/concurrent_stream.out
index e731d13..6f8b217 100644
--- a/contrib/test_decoding/expected/concurrent_stream.out
+++ b/contrib/test_decoding/expected/concurrent_stream.out
@@ -1,11 +1,12 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
-starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
+starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s2_ddl s1_commit s1_get_stream_changes
 step s0_begin: BEGIN;
 step s0_ddl: CREATE TABLE stream_test1(data text);
 step s1_ddl: CREATE TABLE stream_test(data text);
 step s1_begin: BEGIN;
 step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
+step s2_ddl: CREATE TABLE stream_test2(data text);
 step s1_commit: COMMIT;
 step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 data           
diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec
index ad9fde9..f82e4d6 100644
--- a/contrib/test_decoding/specs/concurrent_stream.spec
+++ b/contrib/test_decoding/specs/concurrent_stream.spec
@@ -23,6 +23,10 @@ setup { SET synchronous_commit=on; }
 step "s0_begin" { BEGIN; }
 step "s0_ddl"   {CREATE TABLE stream_test1(data text);}
 
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_ddl"   {CREATE TABLE stream_test2(data text);}
+
 # The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
 # the currently running s0_ddl and we want to test that s0_ddl should not get
 # streamed when user asked to skip-empty-xacts.
@@ -34,4 +38,4 @@ step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
 step "s1_commit" { COMMIT; }
 step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
 
-permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes"
-- 
1.8.3.1

From 6f6f3638a6c6a41370c43e77b47f5390f13c453b Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 12 Nov 2020 14:59:58 +0530
Subject: [PATCH v3 1/2] Bug fix skip-empty-xacts in streaming mode

In streaming mode a transaction can be decoded in multiple streams,
and the streams of different transactions can be interleaved.  Because
of that we cannot keep a transaction's write status in the logical
decoding context: it could be overwritten while decoding some other
transaction.  So keep the write status in the reorder buffer txn instead.
---
 contrib/test_decoding/test_decoding.c           | 95 +++++++++++++++++++++----
 src/backend/replication/logical/reorderbuffer.c |  1 +
 src/include/replication/reorderbuffer.h         |  5 ++
 3 files changed, 86 insertions(+), 15 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 8e33614..62b3855 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -34,10 +34,24 @@ typedef struct
 	bool		include_xids;
 	bool		include_timestamp;
 	bool		skip_empty_xacts;
-	bool		xact_wrote_changes;
 	bool		only_local;
 } TestDecodingData;
 
+/*
+ * Maintain a per-transaction flag to track whether the transaction has
+ * written any changes, so that we can identify empty transactions.  In
+ * streaming mode the transaction can be decoded in multiple streams, so in
+ * addition to tracking whether the transaction has written any changes we
+ * also need to track whether the current stream has written any changes.
+ * That way, if the user has requested to skip empty transactions, we can
+ * skip the empty streams even though the transaction has written some changes.
+ */
+typedef struct
+{
+	bool		xact_wrote_changes;
+	bool		stream_wrote_changes;
+} TestDecodingTxnData;
+
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 							  bool is_init);
 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
@@ -255,8 +269,12 @@ static void
 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata =
+		MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+	txndata->xact_wrote_changes = false;
+	txn->output_plugin_private = txndata;
 
-	data->xact_wrote_changes = false;
 	if (data->skip_empty_xacts)
 		return;
 
@@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 Relation relation, ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	Form_pg_class class_form;
 	TupleDesc	tupdesc;
 	MemoryContext old;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
@@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				   int nrelations, Relation relations[], ReorderBufferChange *change)
 {
 	TestDecodingData *data;
+	TestDecodingTxnData *txndata;
 	MemoryContext old;
 	int			i;
 
 	data = ctx->output_plugin_private;
+	txndata = txn->output_plugin_private;
 
 	/* output BEGIN if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
 	{
 		pg_output_begin(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = true;
 
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
@@ -592,10 +619,24 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	data->xact_wrote_changes = false;
+	/*
+	 * If this is the first stream for the txn then allocate the txn plugin
+	 * data and set the xact_wrote_changes to false.
+	 */
+	if (txndata == NULL)
+	{
+		txndata =
+			MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+		txndata->xact_wrote_changes = false;
+		txn->output_plugin_private = txndata;
+	}
+
+	txndata->stream_wrote_changes = false;
 	if (data->skip_empty_xacts)
 		return;
+
 	pg_output_stream_start(ctx, data, txn, true);
 }
 
@@ -615,8 +656,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -634,7 +676,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	/*
+	 * A stream abort can be sent for an individual subtransaction, but we
+	 * maintain output_plugin_private only under the toptxn, so if this is
+	 * not the toptxn then fetch the toptxn.
+	 */
+	ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+	TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	if (txn->toptxn == NULL)
+	{
+		Assert(txn->output_plugin_private != NULL);
+		pfree(txndata);
+		txn->output_plugin_private = NULL;
+	}
+
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +709,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						XLogRecPtr commit_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
+	bool	xact_wrote_changes = txndata->xact_wrote_changes;
+
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !xact_wrote_changes)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +744,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 						ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
 	/* output stream start if we haven't yet */
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
@@ -734,12 +798,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						  ReorderBufferChange *change)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata = txn->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
 	{
 		pg_output_stream_start(ctx, data, txn, false);
 	}
-	data->xact_wrote_changes = true;
+	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
 
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c1bd680..301baff 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -402,6 +402,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
 
 	/* InvalidCommandId is not zero, so set it explicitly */
 	txn->command_id = InvalidCommandId;
+	txn->output_plugin_private = NULL;
 
 	return txn;
 }
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index dfdda93..bd9dd7e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN
 
 	/* If we have detected concurrent abort then ignore future changes. */
 	bool		concurrent_abort;
+
+	/*
+	 * Private data pointer of the output plugin.
+	 */
+	void	   *output_plugin_private;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
-- 
1.8.3.1
