On Mon, 26 Jun 2023 at 15:51, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Jun 26, 2023 at 3:07 PM Ashutosh Bapat
> <ashutosh.bapat....@gmail.com> wrote:
> >
> > Hi All,
> > Every pg_decode routine except pg_decode_message that decodes a
> > transactional change, has following block
> > /* output BEGIN if we haven't yet */
> > if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
> > {
> > pg_output_begin(ctx, data, txn, false);
> > }
> > txndata->xact_wrote_changes = true;
> >
> > But pg_decode_message() doesn't call pg_output_begin(). If a WAL
> > message is the first change in the transaction, it won't have a BEGIN
> > before it. That looks like a bug. Why is pg_decode_message()
> > exception?
> >
>
> I can't see a reason why we shouldn't have a similar check for
> transactional messages. So, agreed this is a bug.
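
To make the behaviour discussed above concrete, here is a minimal standalone sketch of the "emit BEGIN lazily on the first change" pattern, applied to messages. It is only an illustration under stated assumptions, not the plugin code: SketchTxn, SketchOutput, sketch_emit, sketch_decode_message and sketch_commit are hypothetical names that do not exist in PostgreSQL, and the output lines are simplified. The commit rule (skip COMMIT when nothing was written) is assumed to mirror what the plugin does with skip-empty-xacts.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for per-transaction plugin state (not a PostgreSQL type). */
typedef struct SketchTxn
{
	bool		wrote_changes;	/* plays the role of xact_wrote_changes */
} SketchTxn;

/* Hypothetical output buffer that collects one decoded line per row. */
typedef struct SketchOutput
{
	char		buf[512];
	size_t		len;
} SketchOutput;

/* Append one line plus a newline to the output buffer. */
void
sketch_emit(SketchOutput *out, const char *line)
{
	int			n = snprintf(out->buf + out->len, sizeof(out->buf) - out->len,
							 "%s\n", line);

	assert(n >= 0 && (size_t) n < sizeof(out->buf) - out->len);
	out->len += (size_t) n;
}

/*
 * Decode one message. For a transactional message, emit BEGIN first if
 * this is the first change written for the transaction, then remember
 * that the transaction has produced output. Non-transactional messages
 * carry no transaction state (txn may be NULL) and never emit BEGIN.
 */
void
sketch_decode_message(SketchOutput *out, SketchTxn *txn, bool skip_empty_xacts,
					  bool transactional, const char *content)
{
	char		line[128];

	if (transactional)
	{
		assert(txn != NULL);
		if (skip_empty_xacts && !txn->wrote_changes)
			sketch_emit(out, "BEGIN");
		txn->wrote_changes = true;
	}

	snprintf(line, sizeof(line), "message: transactional: %d content:%s",
			 transactional ? 1 : 0, content);
	sketch_emit(out, line);
}

/* Assumed commit rule: skip COMMIT for transactions that wrote nothing. */
void
sketch_commit(SketchOutput *out, SketchTxn *txn, bool skip_empty_xacts)
{
	if (skip_empty_xacts && !txn->wrote_changes)
		return;
	sketch_emit(out, "COMMIT");
}
```

In this sketch, a transaction whose only change is a transactional message comes out wrapped in BEGIN and COMMIT, while a non-transactional message is emitted on its own. Without the flag update in sketch_decode_message, the same transaction would produce neither BEGIN nor COMMIT.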

Here is a patch that fixes this. I have not added any new tests, as the
existing tests cover this scenario. The same issue is present in the
back branches too.
v1-0001-Call-pg_output_begin-in-pg_decode_message-if-it-i_master.patch
can be applied on master, PG15 and PG14;
v1-0001-Call-pg_output_begin-in-pg_decode_message-if-it-i_PG13.patch
can be applied on PG13, PG12 and PG11.

Thoughts?

Regards,
Vignesh

From 2df5e87ec7c82daa5f17da50f770f923eb7765b4 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 28 Jun 2023 14:01:22 +0530
Subject: [PATCH] Call pg_output_begin in pg_decode_message if it is the first change in the transaction.

Call pg_output_begin in pg_decode_message if it is the first change in the transaction.
---
 contrib/test_decoding/expected/messages.out | 10 ++++++++--
 contrib/test_decoding/sql/messages.sql      |  2 +-
 contrib/test_decoding/test_decoding.c       | 12 ++++++++++++
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b..0fd70036bd 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
  ignorethis
 (1 row)
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
                                 data
 --------------------------------------------------------------------
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg1
+ COMMIT
  message: transactional: 0 prefix: test, sz: 4 content:msg2
  message: transactional: 0 prefix: test, sz: 4 content:msg4
  message: transactional: 0 prefix: test, sz: 4 content:msg6
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg5
  message: transactional: 1 prefix: test, sz: 4 content:msg7
+ COMMIT
+ BEGIN
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
-(7 rows)
+ COMMIT
+(13 rows)
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e5..3d8500f99c 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -19,7 +19,7 @@ COMMIT;
 
 SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 12d1d0505d..2bbac5f6a8 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -743,6 +743,18 @@ pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				  XLogRecPtr lsn, bool transactional, const char *prefix,
 				  Size sz, const char *message)
 {
+	TestDecodingData *data = ctx->output_plugin_private;
+	TestDecodingTxnData *txndata;
+
+	txndata = transactional ? txn->output_plugin_private : NULL;
+
+	/* output BEGIN if we haven't yet */
+	if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
+		pg_output_begin(ctx, data, txn, false);
+
+	if (transactional)
+		txndata->xact_wrote_changes = true;
+
 	OutputPluginPrepareWrite(ctx, true);
 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 					 transactional, prefix, sz);
-- 
2.34.1

From 273776c53da22ecd69f8b7d4e7712ef66ab0ea5c Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 28 Jun 2023 14:01:22 +0530
Subject: [PATCH] Call pg_output_begin in pg_decode_message if it is the first change in the transaction.

Call pg_output_begin in pg_decode_message if it is the first change in the transaction.
---
 contrib/test_decoding/expected/messages.out | 10 ++++++++--
 contrib/test_decoding/sql/messages.sql      |  2 +-
 contrib/test_decoding/test_decoding.c       |  9 +++++++++
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b..0fd70036bd 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
  ignorethis
 (1 row)
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
                                 data
 --------------------------------------------------------------------
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg1
+ COMMIT
  message: transactional: 0 prefix: test, sz: 4 content:msg2
  message: transactional: 0 prefix: test, sz: 4 content:msg4
  message: transactional: 0 prefix: test, sz: 4 content:msg6
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg5
  message: transactional: 1 prefix: test, sz: 4 content:msg7
+ COMMIT
+ BEGIN
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
-(7 rows)
+ COMMIT
+(13 rows)
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e5..3d8500f99c 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -19,7 +19,7 @@ COMMIT;
 
 SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
 
 -- test db filtering
 \set prevdb :DBNAME
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 93c948856e..d54b8a7e77 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -534,6 +534,15 @@ pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				  XLogRecPtr lsn, bool transactional, const char *prefix,
 				  Size sz, const char *message)
 {
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* output BEGIN if we haven't yet */
+	if (transactional && data->skip_empty_xacts && !data->xact_wrote_changes)
+		pg_output_begin(ctx, data, txn, false);
+
+	if (transactional)
+		data->xact_wrote_changes = true;
+
 	OutputPluginPrepareWrite(ctx, true);
 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
 					 transactional, prefix, sz);
-- 
2.34.1