On Wed, Apr 28, 2021 at 8:59 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Tue, Apr 27, 2021 at 11:02 AM vignesh C <vignes...@gmail.com> wrote:
> >
> > On Tue, Apr 27, 2021 at 9:48 AM vignesh C <vignes...@gmail.com> wrote:
> > >
> >
> > Attached patch has the changes to update statistics during
> > spill/stream which prevents the statistics from being lost during
> > interrupt.
> >
>
>  void
> -UpdateDecodingStats(LogicalDecodingContext *ctx)
> +UpdateDecodingStats(ReorderBuffer *rb)
>
> I don't think you need to change this interface because
> reorderbuffer->private_data points to LogicalDecodingContext. See
> StartupDecodingContext. Other than that there is a comment in the code
> "Update the decoding stats at transaction prepare/commit/abort...".
> This patch should extend that comment by saying something like
> "Additionally we send the stats when we spill or stream the changes to
> avoid losing them in case the decoding is interrupted."

Thanks for the comments. Please find attached the v4 patch, which
addresses them.
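
For anyone following along, here is a minimal stand-alone sketch of the
back-pointer idea discussed above: the reorder buffer keeps a
private_data pointer to its owning context, so the spill path can call
the stats function without changing its signature. This is not the
PostgreSQL code. All Demo*/demo_* names are hypothetical, the structs
are heavily simplified, and resetting the counters after reporting is an
assumption of the sketch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins; NOT the real PostgreSQL structs. */
typedef struct DemoReorderBuffer
{
	void	   *private_data;	/* back-pointer to the owning context */
	long		spillTxns;		/* spills counted since the last report */
} DemoReorderBuffer;

typedef struct DemoDecodingContext
{
	DemoReorderBuffer *reorder;
	long		reportedSpillTxns;	/* stand-in for the stats destination */
} DemoDecodingContext;

/* Wire both directions, as the startup code is described as doing. */
void
demo_startup(DemoDecodingContext *ctx, DemoReorderBuffer *rb)
{
	rb->private_data = ctx;
	rb->spillTxns = 0;
	ctx->reorder = rb;
	ctx->reportedSpillTxns = 0;
}

/* Keeps the context-based signature; report, then reset (assumption). */
void
demo_update_stats(DemoDecodingContext *ctx)
{
	DemoReorderBuffer *rb;

	assert(ctx != NULL);
	rb = ctx->reorder;
	ctx->reportedSpillTxns += rb->spillTxns;
	rb->spillTxns = 0;
}

/* The spill path only has the buffer, so it goes through private_data. */
void
demo_spill(DemoReorderBuffer *rb)
{
	rb->spillTxns += 1;
	demo_update_stats(rb->private_data);
}
```

Calling demo_spill() after demo_startup() moves the count to the context
immediately, so an interruption before commit would not lose it.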

Regards,
Vignesh
From 533c45c4cbd4545350d464bbf7a2df91ee668a75 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Tue, 27 Apr 2021 10:56:02 +0530
Subject: [PATCH v4] Update replication statistics after every stream/spill.

Currently, replication slot statistics are updated at prepare, commit, and
rollback. If decoding is interrupted before the transaction reaches one of
these points, the stats might not get updated. Fix this by also updating
replication statistics after every stream/spill.
---
 src/backend/replication/logical/decode.c        | 14 ++++++++------
 src/backend/replication/logical/reorderbuffer.c |  6 ++++++
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7924581cdc..888e064ec0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -746,9 +746,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	}
 
 	/*
-	 * Update the decoding stats at transaction prepare/commit/abort. It is
-	 * not clear that sending more or less frequently than this would be
-	 * better.
+	 * Update the decoding stats at transaction prepare/commit/abort.
+	 * Additionally we send the stats when we spill or stream the changes to
+	 * avoid losing them in case the decoding is interrupted. It is not clear
+	 * that sending more or less frequently than this would be better.
 	 */
 	UpdateDecodingStats(ctx);
 }
@@ -828,9 +829,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
 
 	/*
-	 * Update the decoding stats at transaction prepare/commit/abort. It is
-	 * not clear that sending more or less frequently than this would be
-	 * better.
+	 * Update the decoding stats at transaction prepare/commit/abort.
+	 * Additionally we send the stats when we spill or stream the changes to
+	 * avoid losing them in case the decoding is interrupted. It is not clear
+	 * that sending more or less frequently than this would be better.
 	 */
 	UpdateDecodingStats(ctx);
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c27f710053..ceb83bcbf9 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -3551,6 +3551,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 		/* don't consider already serialized transactions */
 		rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+
+		/* update the decoding stats */
+		UpdateDecodingStats(rb->private_data);
 	}
 
 	Assert(spilled == txn->nentries_mem);
@@ -3920,6 +3923,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	/* Don't consider already streamed transaction. */
 	rb->streamTxns += (txn_is_streamed) ? 0 : 1;
 
+	/* update the decoding stats */
+	UpdateDecodingStats(rb->private_data);
+
 	Assert(dlist_is_empty(&txn->changes));
 	Assert(txn->nentries == 0);
 	Assert(txn->nentries_mem == 0);
-- 
2.25.1
