On Mon, 26 Jun 2023 at 15:51, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Jun 26, 2023 at 3:07 PM Ashutosh Bapat
> <ashutosh.bapat....@gmail.com> wrote:
> >
> > Hi All,
> > Every pg_decode routine except pg_decode_message  that decodes a
> > transactional change, has following block
> > /* output BEGIN if we haven't yet */
> > if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
> > {
> > pg_output_begin(ctx, data, txn, false);
> > }
> > txndata->xact_wrote_changes = true;
> >
> > But pg_decode_message() doesn't call pg_output_begin(). If a WAL
> > message is the first change in the transaction, it won't have a BEGIN
> > before it. That looks like a bug. Why is pg_decode_message()
> > exception?
> >
>
> I can't see a reason why we shouldn't have a similar check for
> transactional messages. So, agreed this is a bug.

Here is a patch having the fix for the same. I have not added any
tests as the existing tests cover this scenario. The same issue is
present in back branches too.
v1-0001-Call-pg_output_begin-in-pg_decode_message-if-it-i_master.patch
can be applied on master, PG15 and PG14,
v1-0001-Call-pg_output_begin-in-pg_decode_message-if-it-i_PG13.patch
patch can be applied on PG13, PG12 and PG11.
Thoughts?

Regards,
Vignesh
From 2df5e87ec7c82daa5f17da50f770f923eb7765b4 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 28 Jun 2023 14:01:22 +0530
Subject: [PATCH] Call pg_output_begin in pg_decode_message if it is the first
 change in the transaction.

Call pg_output_begin in pg_decode_message if it is the first change in
the transaction.
---
 contrib/test_decoding/expected/messages.out | 10 ++++++++--
 contrib/test_decoding/sql/messages.sql      |  2 +-
 contrib/test_decoding/test_decoding.c       | 12 ++++++++++++
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b..0fd70036bd 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
  ignorethis
 (1 row)
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
                                 data                                
 --------------------------------------------------------------------
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg1
+ COMMIT
  message: transactional: 0 prefix: test, sz: 4 content:msg2
  message: transactional: 0 prefix: test, sz: 4 content:msg4
  message: transactional: 0 prefix: test, sz: 4 content:msg6
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg5
  message: transactional: 1 prefix: test, sz: 4 content:msg7
+ COMMIT
+ BEGIN
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
-(7 rows)
+ COMMIT
+(13 rows)
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e5..3d8500f99c 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -19,7 +19,7 @@ COMMIT;
 
 SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 12d1d0505d..2bbac5f6a8 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -743,6 +743,18 @@ pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 				  const char *prefix, Size sz, const char *message)
 {
+	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata;
+
+	txndata = transactional ? txn->output_plugin_private : NULL;
+
+	/* output BEGIN if we haven't yet */
+	if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
+		pg_output_begin(ctx, data, txn, false);
+
+	if (transactional)
+		txndata->xact_wrote_changes = true;
+
 	OutputPluginPrepareWrite(ctx, true);
 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 					 transactional, prefix, sz);
-- 
2.34.1

From 273776c53da22ecd69f8b7d4e7712ef66ab0ea5c Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 28 Jun 2023 14:01:22 +0530
Subject: [PATCH] Call pg_output_begin in pg_decode_message if it is the first
 change in the transaction.

Call pg_output_begin in pg_decode_message if it is the first change in
the transaction.
---
 contrib/test_decoding/expected/messages.out | 10 ++++++++--
 contrib/test_decoding/sql/messages.sql      |  2 +-
 contrib/test_decoding/test_decoding.c       |  9 +++++++++
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b..0fd70036bd 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
  ignorethis
 (1 row)
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
                                 data                                
 --------------------------------------------------------------------
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg1
+ COMMIT
  message: transactional: 0 prefix: test, sz: 4 content:msg2
  message: transactional: 0 prefix: test, sz: 4 content:msg4
  message: transactional: 0 prefix: test, sz: 4 content:msg6
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg5
  message: transactional: 1 prefix: test, sz: 4 content:msg7
+ COMMIT
+ BEGIN
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
-(7 rows)
+ COMMIT
+(13 rows)
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e5..3d8500f99c 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -19,7 +19,7 @@ COMMIT;
 
 SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 93c948856e..d54b8a7e77 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -534,6 +534,15 @@ pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
 				  const char *prefix, Size sz, const char *message)
 {
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* output BEGIN if we haven't yet */
+	if (transactional && data->skip_empty_xacts && !data->xact_wrote_changes)
+		pg_output_begin(ctx, data, txn, false);
+
+	if (transactional)
+		data->xact_wrote_changes = true;
+
 	OutputPluginPrepareWrite(ctx, true);
 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 					 transactional, prefix, sz);
-- 
2.34.1

Reply via email to