Hi,
attached is a patch that I think cleans up the API between Postgres
and the logical decoding plugin. Up until now, not only rolled-back
transactions but also some committed transactions were filtered out and
not presented to the output plugin. While it is documented that aborted
transactions are not decoded, the second exception is not documented.
The filter distinguishes committed empty transactions that have a snapshot
from those that do not: only the former are presented to the output
plugin. I think that's arbitrary and propose to remove this distinction,
so that all committed transactions are decoded.
In the case of decoding a two-phase transaction, I argue that this is
even more important, as the gid potentially carries information.
Please consider the attached patch, which drops the mentioned filter.
It also adjusts tests to show the difference and provides a minor
clarification to the documentation.
Regards
Markus
From: Markus Wanner <markus.wan...@enterprisedb.com>
Date: Fri, 19 Feb 2021 13:34:22 +0100
Subject: Present all committed transactions to the output plugin
Drop the filtering of some empty transactions and more
consistently present all committed transactions to the output
plugin. Changes behavior for empty transactions that do not even
have a base_snapshot.
While clearly not making any substantial changes to the database,
an empty transaction still increments the local TransactionId
and possibly carries a gid (in the case of two-phase
transactions).
Adjust test_decoding to show the difference between empty
transactions that do not carry a snapshot and those that
do. Adjust tests to cover decoding of committed empty
transactions with and without snapshots.
Add a minor clarification in the docs.
---
.../replication/logical/reorderbuffer.c | 41 ++++++++-----------
doc/src/sgml/logicaldecoding.sgml | 10 +++--
contrib/test_decoding/expected/mxact.out | 2 +-
.../test_decoding/expected/ondisk_startup.out | 2 +-
contrib/test_decoding/expected/prepared.out | 33 +++++++++++++--
contrib/test_decoding/expected/xact.out | 39 ++++++++++++++++++
contrib/test_decoding/sql/prepared.sql | 23 +++++++++--
contrib/test_decoding/sql/xact.sql | 21 ++++++++++
contrib/test_decoding/test_decoding.c | 12 ++++++
9 files changed, 147 insertions(+), 36 deletions(-)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5a62ab8bbc1..1e9ed81c3c1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2016,7 +2016,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferBuildTupleCidHash(rb, txn);
/* setup the initial snapshot */
- SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
+ if (snapshot_now)
+ SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
/*
* Decoding needs access to syscaches et al., which in turn use
@@ -2289,6 +2290,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
+ Assert(snapshot_now);
+
/* get rid of the old */
TeardownHistoricSnapshot(false);
@@ -2321,6 +2324,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
+ Assert(snapshot_now);
Assert(change->data.command_id != InvalidCommandId);
if (command_id < change->data.command_id)
@@ -2397,8 +2401,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* streaming mode.
*/
if (streaming)
+ {
+ Assert(snapshot_now);
ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
- else if (snapshot_now->copied)
+ }
+ else if (snapshot_now && snapshot_now->copied)
ReorderBufferFreeSnap(rb, snapshot_now);
/* cleanup */
@@ -2520,7 +2527,6 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn)
{
- Snapshot snapshot_now;
CommandId command_id = FirstCommandId;
txn->final_lsn = commit_lsn;
@@ -2544,28 +2550,15 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
}
/*
- * If this transaction has no snapshot, it didn't make any changes to the
- * database, so there's nothing to decode. Note that
- * ReorderBufferCommitChild will have transferred any snapshots from
- * subtransactions if there were any.
+ * Process and send the changes to output plugin.
+ *
+ * Note that for empty transactions, txn->base_snapshot may well be
+ * NULL. The corresponding callbacks will still be invoked, as even an
+ * empty transaction carries information (LSN increments, the gid in
+ * case of a two-phase transaction). This is unlike versions prior to
+ * 14 which optimized away transactions without a snapshot.
*/
- if (txn->base_snapshot == NULL)
- {
- Assert(txn->ninvalidations == 0);
-
- /*
- * Removing this txn before a commit might result in the computation
- * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
- */
- if (!rbtxn_prepared(txn))
- ReorderBufferCleanupTXN(rb, txn);
- return;
- }
-
- snapshot_now = txn->base_snapshot;
-
- /* Process and send the changes to output plugin. */
- ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
+ ReorderBufferProcessTXN(rb, txn, commit_lsn, txn->base_snapshot,
command_id, false);
}
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index cf705ed9cda..832acee8270 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -540,9 +540,8 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
Concurrent transactions are decoded in commit order, and only changes
belonging to a specific transaction are decoded between
the <literal>begin</literal> and <literal>commit</literal>
- callbacks. Transactions that were rolled back explicitly or implicitly
- never get
- decoded. Successful savepoints are
+ callbacks. All committed transactions are presented to the output
+ plugin, whether they made changes or not. Successful savepoints are
folded into the transaction containing them in the order they were
executed within that transaction. A transaction that is prepared for
a two-phase commit using <command>PREPARE TRANSACTION</command> will
@@ -632,7 +631,10 @@ typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
</programlisting>
The <parameter>txn</parameter> parameter contains meta information about
the transaction, like the time stamp at which it has been committed and
- its XID.
+ its XID. A transaction without a snapshot in
+ <literal>txn->base_snapshot</literal> will certainly not contain any
+ changes. Presence of a snapshot does not guarantee that a change will
+ follow prior to the commit, though.
</para>
</sect3>
diff --git a/contrib/test_decoding/expected/mxact.out b/contrib/test_decoding/expected/mxact.out
index f0d96cc67d0..9a3e3f21bc7 100644
--- a/contrib/test_decoding/expected/mxact.out
+++ b/contrib/test_decoding/expected/mxact.out
@@ -55,7 +55,7 @@ step s0start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NUL
data
BEGIN
-COMMIT
+COMMIT (empty with snapshot)
BEGIN
table public.do_write: INSERT: id[integer]:1 ts[timestamp with time zone]:null
COMMIT
diff --git a/contrib/test_decoding/expected/ondisk_startup.out b/contrib/test_decoding/expected/ondisk_startup.out
index 586b03d75db..d390d90fd26 100644
--- a/contrib/test_decoding/expected/ondisk_startup.out
+++ b/contrib/test_decoding/expected/ondisk_startup.out
@@ -43,7 +43,7 @@ BEGIN
table public.do_write: INSERT: id[integer]:2 addedbys2[integer]:null
COMMIT
BEGIN
-COMMIT
+COMMIT (empty with snapshot)
BEGIN
table public.do_write: INSERT: id[integer]:3 addedbys2[integer]:null addedbys1[integer]:null
COMMIT
diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out
index 46e915d4ffa..6656d132069 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -35,9 +35,6 @@ COMMIT PREPARED 'test_prepared#3';
-- make sure stuff still works
INSERT INTO test_prepared1 VALUES (8);
INSERT INTO test_prepared2 VALUES (9);
--- cleanup
-DROP TABLE test_prepared1;
-DROP TABLE test_prepared2;
-- show results
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
@@ -66,6 +63,36 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
COMMIT
(22 rows)
+-- test decoding of an empty two-phase transaction
+BEGIN;
+SELECT 1 FROM test_prepared1 FOR UPDATE LIMIT 1;
+ ?column?
+----------
+ 1
+(1 row)
+
+PREPARE TRANSACTION 'test_prepared#4';
+COMMIT PREPARED 'test_prepared#4';
+-- test decoding of an empty transaction for which the reorderbuffer
+-- had to create a snapshot
+BEGIN;
+SAVEPOINT barf;
+INSERT INTO test_prepared1 VALUES (10);
+ROLLBACK TO SAVEPOINT barf;
+PREPARE TRANSACTION 'test_prepared#5';
+COMMIT PREPARED 'test_prepared#5';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+ data
+------------------------------
+ BEGIN (w/o snapshot)
+ COMMIT (empty w/o snapshot)
+ BEGIN
+ COMMIT (empty with snapshot)
+(4 rows)
+
+-- cleanup
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
diff --git a/contrib/test_decoding/expected/xact.out b/contrib/test_decoding/expected/xact.out
index ec4745005d7..3b50f8d073b 100644
--- a/contrib/test_decoding/expected/xact.out
+++ b/contrib/test_decoding/expected/xact.out
@@ -55,6 +55,45 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
COMMIT
(3 rows)
+-- test decoding of an aborted transaction
+BEGIN;
+INSERT INTO xact_test VALUES ('aborted-txn');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+ data
+------
+(0 rows)
+
+-- test decoding of an empty but not aborted transaction
+BEGIN;
+SELECT 1 FROM xact_test FOR UPDATE LIMIT 1;
+ ?column?
+----------
+ 1
+(1 row)
+
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+ data
+-----------------------------
+ BEGIN (w/o snapshot)
+ COMMIT (empty w/o snapshot)
+(2 rows)
+
+-- test decoding of an empty transaction for which the reorderbuffer
+-- had to create a snapshot
+BEGIN;
+SAVEPOINT barf;
+INSERT INTO xact_test VALUES ('change-to-roll-back');
+ROLLBACK TO SAVEPOINT barf;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+ data
+------------------------------
+ BEGIN
+ COMMIT (empty with snapshot)
+(2 rows)
+
DROP TABLE xact_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
index e72639767ee..dcb18ecc80d 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -40,11 +40,28 @@ COMMIT PREPARED 'test_prepared#3';
INSERT INTO test_prepared1 VALUES (8);
INSERT INTO test_prepared2 VALUES (9);
+-- show results
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test decoding of an empty two-phase transaction
+BEGIN;
+SELECT 1 FROM test_prepared1 FOR UPDATE LIMIT 1;
+PREPARE TRANSACTION 'test_prepared#4';
+COMMIT PREPARED 'test_prepared#4';
+
+-- test decoding of an empty transaction for which the reorderbuffer
+-- had to create a snapshot
+BEGIN;
+SAVEPOINT barf;
+INSERT INTO test_prepared1 VALUES (10);
+ROLLBACK TO SAVEPOINT barf;
+PREPARE TRANSACTION 'test_prepared#5';
+COMMIT PREPARED 'test_prepared#5';
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+
-- cleanup
DROP TABLE test_prepared1;
DROP TABLE test_prepared2;
--- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-
SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/xact.sql b/contrib/test_decoding/sql/xact.sql
index aa555911e86..419b26f9b38 100644
--- a/contrib/test_decoding/sql/xact.sql
+++ b/contrib/test_decoding/sql/xact.sql
@@ -28,6 +28,27 @@ COMMIT;
-- and now show those changes
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+-- test decoding of an aborted transaction
+BEGIN;
+INSERT INTO xact_test VALUES ('aborted-txn');
+ROLLBACK;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+
+-- test decoding of an empty but not aborted transaction
+BEGIN;
+SELECT 1 FROM xact_test FOR UPDATE LIMIT 1;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+
+-- test decoding of an empty transaction for which the reorderbuffer
+-- had to create a snapshot
+BEGIN;
+SAVEPOINT barf;
+INSERT INTO xact_test VALUES ('change-to-roll-back');
+ROLLBACK TO SAVEPOINT barf;
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0');
+
DROP TABLE xact_test;
SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 929255eac74..c9dabaf60ae 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -324,6 +324,10 @@ pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBuff
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
else
appendStringInfoString(ctx->out, "BEGIN");
+
+ if (txn->base_snapshot == NULL)
+ appendStringInfo(ctx->out, " (w/o snapshot)");
+
OutputPluginWrite(ctx, last_write);
}
@@ -348,6 +352,14 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
else
appendStringInfoString(ctx->out, "COMMIT");
+ if (!xact_wrote_changes)
+ {
+ if (txn->base_snapshot == NULL)
+ appendStringInfo(ctx->out, " (empty w/o snapshot)");
+ else
+ appendStringInfo(ctx->out, " (empty with snapshot)");
+ }
+
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
timestamptz_to_str(txn->commit_time));
--
2.30.0