On Thu, Apr 29, 2021 at 3:06 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Wed, Apr 28, 2021 at 5:01 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > On Wed, Apr 28, 2021 at 4:51 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> > >
> > > On Wed, Apr 28, 2021 at 6:39 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > @@ -1369,7 +1369,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb,
> > ReorderBufferIterTXNState *state)
> >   * Update the total bytes processed before releasing the current set
> >   * of changes and restoring the new set of changes.
> >   */
> > - rb->totalBytes += rb->size;
> > + rb->totalBytes += entry->txn->total_size;
> >   if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
> >   &state->entries[off].segno))
> >
> > I have not tested this, but with the above change, won't you need to
> > check txn->toptxn for subtxns?
> >
>
> Now, I am able to reproduce this issue:
> Create table t1(c1 int);
> select pg_create_logical_replication_slot('s1', 'test_decoding');
> Begin;
> insert into t1 values(1);
> savepoint s1;
> insert into t1 select generate_series(1, 100000);
> commit;
>
> postgres=# select count(*) from pg_logical_slot_peek_changes('s1', NULL, NULL);
>  count
> --------
>  100005
> (1 row)
>
> postgres=# select * from pg_stat_replication_slots;
>  slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes |           stats_reset
> -----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
>  s1        |          0 |           0 |           0 |           0 |            0 |            0 |          2 |    13200672 | 2021-04-29 14:33:55.156566+05:30
> (1 row)
>
> select * from pg_stat_reset_replication_slot('s1');
>
> Now reduce logical_decoding_work_mem so that the transaction spills to disk.
> postgres=# set logical_decoding_work_mem='64kB';
> SET
> postgres=# select count(*) from pg_logical_slot_peek_changes('s1', NULL, NULL);
>  count
> --------
>  100005
> (1 row)
>
> postgres=# select * from pg_stat_replication_slots;
>  slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes |           stats_reset
> -----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
>  s1        |          1 |         202 |    13200000 |           0 |            0 |            0 |          2 |         672 | 2021-04-29 14:35:21.836613+05:30
> (1 row)
>
> Notice that once spilling is allowed, the 'total_bytes' stat shows a
> different value. The attached patch fixes the issue for me. Let me know
> what you think about this.

I found an issue with the following scenario when testing with
logical_decoding_work_mem set to 64kB:

BEGIN;
INSERT INTO t1 values(generate_series(1,10000));
SAVEPOINT s1;
INSERT INTO t1 values(generate_series(1,10000));
COMMIT;
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot1', NULL,
        NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
select * from pg_stat_replication_slots;
    slot_name     | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes |           stats_reset
------------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
 regression_slot1 |          6 |         154 |     9130176 |           0 |            0 |            0 |          1 |     *4262016* | 2021-04-29 17:50:00.080663+05:30
(1 row)

The same scenario gives the expected result with logical_decoding_work_mem set to 64MB:
select * from pg_stat_replication_slots;
    slot_name     | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes |           stats_reset
------------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
 regression_slot1 |          6 |         154 |     9130176 |           0 |            0 |            0 |          1 |     *2640000* | 2021-04-29 17:50:00.080663+05:30
(1 row)

The patch requires one change:
- rb->totalBytes += rb->size;
+ if (entry->txn->toptxn)
+ rb->totalBytes += entry->txn->toptxn->total_size;
+ else
+ rb->totalBytes += entry->txn->*total_size*;

The above should be changed to:
- rb->totalBytes += rb->size;
+ if (entry->txn->toptxn)
+ rb->totalBytes += entry->txn->toptxn->total_size;
+ else
+ rb->totalBytes += entry->txn->*size*;

The attached patch fixes the issue.
Thoughts?

Regards,
Vignesh
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c27f710053..cdf46a36af 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1369,7 +1369,10 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		 * Update the total bytes processed before releasing the current set
 		 * of changes and restoring the new set of changes.
 		 */
-		rb->totalBytes += rb->size;
+		if (entry->txn->toptxn)
+			rb->totalBytes += entry->txn->toptxn->total_size;
+		else
+			rb->totalBytes += entry->txn->size;
 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
 										&state->entries[off].segno))
 		{
@@ -2382,7 +2385,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		if (!rbtxn_is_streamed(txn))
 			rb->totalTxns++;
 
-		rb->totalBytes += rb->size;
+		rb->totalBytes += txn->total_size;
 
 		/*
 		 * Done with current changes, send the last message for this set of
@@ -3073,7 +3076,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 {
 	Size		sz;
 	ReorderBufferTXN *txn;
-	ReorderBufferTXN *toptxn = NULL;
+	ReorderBufferTXN *toptxn;
 
 	Assert(change->txn);
 
@@ -3087,14 +3090,14 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 	txn = change->txn;
 
-	/* If streaming supported, update the total size in top level as well. */
-	if (ReorderBufferCanStream(rb))
-	{
-		if (txn->toptxn != NULL)
-			toptxn = txn->toptxn;
-		else
-			toptxn = txn;
-	}
+	/*
+	 * Update the total size in top level as well. This is later used to
+	 * compute the decoding stats.
+	 */
+	if (txn->toptxn != NULL)
+		toptxn = txn->toptxn;
+	else
+		toptxn = txn;
 
 	sz = ReorderBufferChangeSize(change);
 
@@ -3104,8 +3107,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 		rb->size += sz;
 
 		/* Update the total size in the top transaction. */
-		if (toptxn)
-			toptxn->total_size += sz;
+		toptxn->total_size += sz;
 	}
 	else
 	{
@@ -3114,8 +3116,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 		rb->size -= sz;
 
 		/* Update the total size in the top transaction. */
-		if (toptxn)
-			toptxn->total_size -= sz;
+		toptxn->total_size -= sz;
 	}
 
 	Assert(txn->size <= rb->size);

Reply via email to