From 409d1f318a5c65785e81d9649955cdbab751766e Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Fri, 30 Apr 2021 10:12:39 +0530
Subject: [PATCH v6] Fix the computation of slot stats for 'total_bytes'.

Previously, we were using the size of all the changes present in
ReorderBuffer to compute total_bytes after decoding a transaction and that
can lead to counting some of the transactions' changes more than once. Fix
it by using the size of the changes decoded for a transaction to compute
'total_bytes'.

Author: Sawada Masahiko
Reviewed-by: Vignesh C, Amit Kapila
Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
---
 .../replication/logical/reorderbuffer.c       | 39 +++++++++----------
 1 file changed, 19 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c27f7100534..7caf08f39d4 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1366,10 +1366,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		dlist_push_tail(&state->old_change, &change->node);
 
 		/*
-		 * Update the total bytes processed before releasing the current set
-		 * of changes and restoring the new set of changes.
+		 * Update the total bytes processed by the txn for which we are
+		 * releasing the current set of changes and restoring the new set of
+		 * changes.
 		 */
-		rb->totalBytes += rb->size;
+		rb->totalBytes += entry->txn->size;
 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
 										&state->entries[off].segno))
 		{
@@ -2371,9 +2372,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		iterstate = NULL;
 
 		/*
-		 * Update total transaction count and total transaction bytes
-		 * processed. Ensure to not count the streamed transaction multiple
-		 * times.
+		 * Update total transaction count and total bytes processed by the
+		 * transaction and its subtransactions. Ensure to not count the
+		 * streamed transaction multiple times.
 		 *
 		 * Note that the statistics computation has to be done after
 		 * ReorderBufferIterTXNFinish as it releases the serialized change
@@ -2382,7 +2383,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		if (!rbtxn_is_streamed(txn))
 			rb->totalTxns++;
 
-		rb->totalBytes += rb->size;
+		rb->totalBytes += txn->total_size;
 
 		/*
 		 * Done with current changes, send the last message for this set of
@@ -3073,7 +3074,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 {
 	Size		sz;
 	ReorderBufferTXN *txn;
-	ReorderBufferTXN *toptxn = NULL;
+	ReorderBufferTXN *toptxn;
 
 	Assert(change->txn);
 
@@ -3087,14 +3088,14 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 	txn = change->txn;
 
-	/* If streaming supported, update the total size in top level as well. */
-	if (ReorderBufferCanStream(rb))
-	{
-		if (txn->toptxn != NULL)
-			toptxn = txn->toptxn;
-		else
-			toptxn = txn;
-	}
+	/*
+	 * Update the total size in top level as well. This is later used to
+	 * compute the decoding stats.
+	 */
+	if (txn->toptxn != NULL)
+		toptxn = txn->toptxn;
+	else
+		toptxn = txn;
 
 	sz = ReorderBufferChangeSize(change);
 
@@ -3104,8 +3105,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 		rb->size += sz;
 
 		/* Update the total size in the top transaction. */
-		if (toptxn)
-			toptxn->total_size += sz;
+		toptxn->total_size += sz;
 	}
 	else
 	{
@@ -3114,8 +3114,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 		rb->size -= sz;
 
 		/* Update the total size in the top transaction. */
-		if (toptxn)
-			toptxn->total_size -= sz;
+		toptxn->total_size -= sz;
 	}
 
 	Assert(txn->size <= rb->size);
-- 
2.28.0.windows.1

