On Fri, Feb 2, 2024 at 12:48 AM vignesh C <vignes...@gmail.com> wrote:
>
> On Tue, 3 Oct 2023 at 15:54, vignesh C <vignes...@gmail.com> wrote:
> >
> > On Mon, 3 Jul 2023 at 07:16, Masahiko Sawada <sawada.m...@gmail.com> wrote:
> > >
> > > On Fri, Jun 23, 2023 at 12:39 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
> > > >
> > > > On Fri, Jun 9, 2023 at 10:47 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > In logical decoding, we don't need to collect decoded changes of
> > > > > aborted transactions. While streaming changes, we can detect
> > > > > concurrent abort of the (sub)transaction but there is no mechanism to
> > > > > skip decoding changes of transactions that are known to already be
> > > > > aborted. With the attached WIP patch, we check CLOG when decoding the
> > > > > transaction for the first time. If it's already known to be aborted,
> > > > > we skip collecting decoded changes of such transactions. That way,
> > > > > when the logical replication is behind or restarts, we don't need to
> > > > > decode large transactions that already aborted, which helps improve
> > > > > the decoding performance.
> > > > >
> > > > +1 for the idea of checking the transaction status only when we need
> > > > to flush it to the disk or send it downstream (if streaming of
> > > > in-progress transactions is enabled). Although this check is costly,
> > > > since we plan to do it only for large transactions, it is worth it if
> > > > we can occasionally avoid disk or network I/O for aborted transactions.
> > > >
> > >
> > > Thanks.
> > >
> > > I've attached the updated patch. With this patch, we check the
> > > transaction status only for large transactions, at eviction time. For
> > > regression test purposes, I disable this transaction status check when
> > > logical_replication_mode is set to 'immediate'.
> >
> > Maybe some changes are missing from the patch, which gives the
> > following errors:
> > reorderbuffer.c: In function ‘ReorderBufferCheckTXNAbort’:
> > reorderbuffer.c:3584:22: error: ‘logical_replication_mode’ undeclared (first use in this function)
> >  3584 |         if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE))
> >       |                      ^~~~~~~~~~~~~~~~~~~~~~~~
>
> With no update to the thread and the compilation still failing I'm
> marking this as returned with feedback.  Please feel free to resubmit
> to the next CF when there is a new version of the patch.
>

I've resumed working on this item and attached a new version of the
patch.

I rebased the patch onto the current HEAD and updated the comments and
commit message. The patch is straightforward and I'm reasonably
satisfied with it, but I'm thinking of adding some tests for it.
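
For readers who want the gist before reading the diff, the status check
the patch performs at eviction time can be sketched as the standalone
model below. It is an illustration under stated assumptions, not the
patch code: ModelTxn, XactStatus, and model_check_txn_abort are made-up
names, the status field stands in for the TransactionIdIsInProgress()
and TransactionIdDidCommit() lookups, and zeroing size stands in for
truncating the buffered changes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the outcome of a status lookup. */
typedef enum
{
	XACT_IN_PROGRESS,
	XACT_COMMITTED,
	XACT_ABORTED
} XactStatus;

/* Simplified model of the per-transaction state involved in the check. */
typedef struct ModelTxn
{
	XactStatus	status;		/* what a status lookup would report */
	size_t		size;		/* bytes of buffered changes */
	bool		committed;	/* cached result: known to have committed */
	bool		aborted;	/* cached result: known to have aborted */
	int			lookups;	/* number of simulated status lookups */
} ModelTxn;

/*
 * Return true if the transaction is known to be aborted, in which case its
 * buffered changes have been discarded. Return false if the caller should
 * go on to stream or spill the transaction as usual.
 */
static bool
model_check_txn_abort(ModelTxn *txn)
{
	assert(txn != NULL);

	/* Use the cached result if a previous call already settled it. */
	if (txn->aborted)
		return true;
	if (txn->committed)
		return false;

	/* Simulated status lookup; this is the costly step in practice. */
	txn->lookups++;

	if (txn->status == XACT_IN_PROGRESS)
		return false;		/* nothing to cache, outcome still unknown */

	if (txn->status == XACT_COMMITTED)
	{
		txn->committed = true;	/* skip the lookup next time */
		return false;
	}

	/* Aborted: drop what was collected and ignore future changes. */
	txn->size = 0;
	txn->aborted = true;
	return true;
}
```

In this model an aborted or committed transaction costs one lookup in
total, while an in-progress transaction is looked up again each time it
is picked for eviction.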

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
From c5c78f5d53d375f7a79b2561c551f7bb3ff57717 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 3 Jul 2023 10:28:00 +0900
Subject: [PATCH v3] Skip logical decoding of already-aborted transactions.

If we detect a concurrent abort of a streaming transaction, we discard
all changes and skip decoding further changes of the transaction. This
commit adds a CLOG lookup that checks whether a (streaming or
non-streaming) transaction has already aborted, so that we can skip
decoding its further changes as well. This improves logical decoding
performance when a large transaction has already rolled back, since we
save disk or network I/O.

We perform this check only for large transactions, at eviction time,
since checking CLOG is costly and could cause a slowdown with lots of
small transactions, most of which commit.

Reviewed-by:
Discussion: https://postgr.es/m/CAD21AoDht9Pz_DFv_R2LqBTBbO4eGrpa9Vojmt5z5sEx3XwD7A@mail.gmail.com
---
 .../replication/logical/reorderbuffer.c       | 98 ++++++++++++++++---
 src/include/replication/reorderbuffer.h       | 13 ++-
 2 files changed, 94 insertions(+), 17 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index bbf0966182..f3284708bf 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -100,6 +100,7 @@
 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/procarray.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
@@ -256,7 +257,7 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *data);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
-									 bool txn_prepared);
+									 bool txn_prepared, bool streaming_txn);
 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
 										TransactionId xid, XLogSegNo segno);
@@ -777,11 +778,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
 	/*
-	 * While streaming the previous changes we have detected that the
-	 * transaction is aborted.  So there is no point in collecting further
-	 * changes for it.
+	 * If we have detected that the transaction is aborted while streaming the
+	 * previous changes or by checking its CLOG, there is no point in
+	 * collecting further changes for it.
 	 */
-	if (txn->concurrent_abort)
+	if (txn->aborted)
 	{
 		/*
 		 * We don't need to update memory accounting for this change as we
@@ -1600,9 +1601,12 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
  *
  * 'txn_prepared' indicates that we have decoded the transaction at prepare
  * time.
+ *
+ * 'streaming_txn' indicates that the given transaction is a streaming transaction.
  */
 static void
-ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared,
+						 bool streaming_txn)
 {
 	dlist_mutable_iter iter;
 
@@ -1621,7 +1625,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		Assert(rbtxn_is_known_subxact(subtxn));
 		Assert(subtxn->nsubtxns == 0);
 
-		ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
+		ReorderBufferTruncateTXN(rb, subtxn, txn_prepared, streaming_txn);
 	}
 
 	/* cleanup changes in the txn */
@@ -1655,7 +1659,8 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 	 * about the toplevel xact (we send the XID in all messages), but we never
 	 * stream XIDs of empty subxacts.
 	 */
-	if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
+	if (streaming_txn && (!txn_prepared) &&
+		(rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
 		txn->txn_flags |= RBTXN_IS_STREAMED;
 
 	if (txn_prepared)
@@ -1884,7 +1889,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		 * full cleanup will happen as part of the COMMIT PREPAREDs, so now
 		 * just truncate txn by removing changes and tuplecids.
 		 */
-		ReorderBufferTruncateTXN(rb, txn, true);
+		ReorderBufferTruncateTXN(rb, txn, true, true);
 		/* Reset the CheckXidAlive */
 		CheckXidAlive = InvalidTransactionId;
 	}
@@ -2027,7 +2032,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					  ReorderBufferChange *specinsert)
 {
 	/* Discard the changes that we just streamed */
-	ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+	ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), true);
 
 	/* Free all resources allocated for toast reconstruction */
 	ReorderBufferToastReset(rb, txn);
@@ -2552,7 +2557,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 */
 		if (streaming || rbtxn_prepared(txn))
 		{
-			ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+			ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), streaming);
 			/* Reset the CheckXidAlive */
 			CheckXidAlive = InvalidTransactionId;
 		}
@@ -2605,7 +2610,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			FlushErrorState();
 			FreeErrorData(errdata);
 			errdata = NULL;
-			curtxn->concurrent_abort = true;
+			curtxn->aborted = true;
 
 			/* Reset the TXN so that it is allowed to stream remaining data. */
 			ReorderBufferResetTXN(rb, txn, snapshot_now,
@@ -2789,10 +2794,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 	 * when rollback prepared is decoded and sent, the downstream should be
 	 * able to rollback such a xact. See comments atop DecodePrepare.
 	 *
-	 * Note, for the concurrent_abort + streaming case a stream_prepare was
-	 * already sent within the ReorderBufferReplay call above.
+	 * Note, for the abort + streaming case a stream_prepare was already sent
+	 * within the ReorderBufferReplay call above.
 	 */
-	if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
+	if (txn->aborted && !rbtxn_is_streamed(txn))
 		rb->prepare(rb, txn, txn->final_lsn);
 }
 
@@ -3558,6 +3563,63 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 	return largest;
 }
 
+/*
+ * Check the transaction status of the given transaction. If the transaction
+ * has already aborted, we discard all changes accumulated so far, ignore
+ * future changes, and return true. Otherwise, return false.
+ *
+ * If debug_logical_replication_streaming is set to "immediate", we disable
+ * this check for regression tests.
+ */
+static bool
+ReorderBufferCheckTXNAbort(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	/*
+	 * If debug_logical_replication_streaming is "immediate", we don't check
+	 * the transaction status, so the caller always processes the transaction.
+	 */
+	if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE)
+		return false;
+
+	/* Quick return if we already know the transaction status */
+	if (txn->aborted)
+		return true;
+
+	if (txn->committed)
+		return false;
+
+	/* Check the transaction status using CLOG lookup */
+	if (TransactionIdIsInProgress(txn->xid))
+		return false;
+
+	if (TransactionIdDidCommit(txn->xid))
+	{
+		/*
+		 * Remember the transaction is committed so that we can skip CLOG
+		 * check next time, avoiding the pressure on CLOG lookup.
+		 */
+		txn->committed = true;
+		return false;
+	}
+
+	/*
+	 * The transaction aborted. We discard the changes we've collected so far,
+	 * and free all resources allocated for toast reconstruction. The full
+	 * cleanup will happen as part of decoding ABORT record of this
+	 * transaction.
+	 */
+	ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), false);
+	ReorderBufferToastReset(rb, txn);
+
+	/*
+	 * Mark the transaction as aborted so we ignore future changes of this
+	 * transaction.
+	 */
+	txn->aborted = true;
+
+	return true;
+}
+
 /*
  * Check whether the logical_decoding_work_mem limit was reached, and if yes
  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
@@ -3610,6 +3672,9 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 			Assert(txn->total_size > 0);
 			Assert(rb->size >= txn->total_size);
 
+			if (ReorderBufferCheckTXNAbort(rb, txn))
+				continue;
+
 			ReorderBufferStreamTXN(rb, txn);
 		}
 		else
@@ -3625,6 +3690,9 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 			Assert(txn->size > 0);
 			Assert(rb->size >= txn->size);
 
+			if (ReorderBufferCheckTXNAbort(rb, txn))
+				continue;
+
 			ReorderBufferSerializeTXN(rb, txn);
 		}
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa..fe7874bc10 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -409,8 +409,17 @@ typedef struct ReorderBufferTXN
 	/* Size of top-transaction including sub-transactions. */
 	Size		total_size;
 
-	/* If we have detected concurrent abort then ignore future changes. */
-	bool		concurrent_abort;
+	/*
+	 * True if the transaction committed. Then we skip transaction status
+	 * check for this transaction.
+	 */
+	bool		committed;
+
+	/*
+	 * True if the transaction (concurrently) aborted. Then we ignore
+	 * future changes.
+	 */
+	bool		aborted;
 
 	/*
 	 * Private data pointer of the output plugin.
-- 
2.39.3
