On Thu, Apr 29, 2021 at 3:06 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > On Wed, Apr 28, 2021 at 5:01 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > > > On Wed, Apr 28, 2021 at 4:51 PM Masahiko Sawada <sawada.m...@gmail.com> wrote: > > > > > > On Wed, Apr 28, 2021 at 6:39 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > > > @@ -1369,7 +1369,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, > > ReorderBufferIterTXNState *state) > > * Update the total bytes processed before releasing the current set > > * of changes and restoring the new set of changes. > > */ > > - rb->totalBytes += rb->size; > > + rb->totalBytes += entry->txn->total_size; > > if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, > > &state->entries[off].segno)) > > > > I have not tested this but won't in the above change you need to check > > txn->toptxn for subtxns? > > > > Now, I am able to reproduce this issue: > Create table t1(c1 int); > select pg_create_logical_replication_slot('s', 'test_decoding'); > Begin; > insert into t1 values(1); > savepoint s1; > insert into t1 select generate_series(1, 100000); > commit; > > postgres=# select count(*) from pg_logical_slot_peek_changes('s1', NULL, NULL); > count > -------- > 100005 > (1 row) > > postgres=# select * from pg_stat_replication_slots; > slot_name | spill_txns | spill_count | spill_bytes | stream_txns | > stream_count | stream_bytes | total_txns | total_bytes | > stats_reset > -----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+---------------------------------- > s1 | 0 | 0 | 0 | 0 | > 0 | 0 | 2 | 13200672 | 2021-04-29 > 14:33:55.156566+05:30 > (1 row) > > select * from pg_stat_reset_replication_slot('s1'); > > Now reduce the logical decoding work mem to allow spilling. > postgres=# set logical_decoding_work_mem='64kB'; > SET > postgres=# select count(*) from pg_logical_slot_peek_changes('s1', NULL, NULL); > count > -------- > 100005 > (1 row) > > postgres=# select * from pg_stat_replication_slots; > slot_name | spill_txns | spill_count | spill_bytes | stream_txns | > stream_count | stream_bytes | total_txns | total_bytes | > stats_reset > -----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+---------------------------------- > s1 | 1 | 202 | 13200000 | 0 | > 0 | 0 | 2 | 672 | 2021-04-29 > 14:35:21.836613+05:30 > (1 row) > > You can notice that after we have allowed spilling the 'total_bytes' > stats is showing a different value. The attached patch fixes the issue > for me. Let me know what do you think about this?
I found one issue with the following scenario when testing with logical_decoding_work_mem as 64kB: BEGIN; INSERT INTO t1 values(generate_series(1,10000)); SAVEPOINT s1; INSERT INTO t1 values(generate_series(1,10000)); COMMIT; SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); select * from pg_stat_replication_slots; slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset ------------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+---------------------------------- regression_slot1 | 6 | 154 | 9130176 | 0 | 0 | 0 | 1 | *4262016* | 2021-04-29 17:50:00.080663+05:30 (1 row) Same thing works fine with logical_decoding_work_mem as 64MB: select * from pg_stat_replication_slots; slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset ------------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+---------------------------------- regression_slot1 | 6 | 154 | 9130176 | 0 | 0 | 0 | 1 | *2640000* | 2021-04-29 17:50:00.080663+05:30 (1 row) The patch required one change: - rb->totalBytes += rb->size; + if (entry->txn->toptxn) + rb->totalBytes += entry->txn->toptxn->total_size; + else + rb->totalBytes += entry->txn->*total_size*; The above should be changed to: - rb->totalBytes += rb->size; + if (entry->txn->toptxn) + rb->totalBytes += entry->txn->toptxn->total_size; + else + rb->totalBytes += entry->txn->*size*; Attached patch fixes the issue. Thoughts? Regards, Vignesh
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c27f710053..cdf46a36af 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1369,7 +1369,10 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) * Update the total bytes processed before releasing the current set * of changes and restoring the new set of changes. */ - rb->totalBytes += rb->size; + if (entry->txn->toptxn) + rb->totalBytes += entry->txn->toptxn->total_size; + else + rb->totalBytes += entry->txn->size; if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, &state->entries[off].segno)) { @@ -2382,7 +2385,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, if (!rbtxn_is_streamed(txn)) rb->totalTxns++; - rb->totalBytes += rb->size; + rb->totalBytes += txn->total_size; /* * Done with current changes, send the last message for this set of @@ -3073,7 +3076,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, { Size sz; ReorderBufferTXN *txn; - ReorderBufferTXN *toptxn = NULL; + ReorderBufferTXN *toptxn; Assert(change->txn); @@ -3087,14 +3090,14 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, txn = change->txn; - /* If streaming supported, update the total size in top level as well. */ - if (ReorderBufferCanStream(rb)) - { - if (txn->toptxn != NULL) - toptxn = txn->toptxn; - else - toptxn = txn; - } + /* + * Update the total size in top level as well. This is later used to + * compute the decoding stats. + */ + if (txn->toptxn != NULL) + toptxn = txn->toptxn; + else + toptxn = txn; sz = ReorderBufferChangeSize(change); @@ -3104,8 +3107,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, rb->size += sz; /* Update the total size in the top transaction. */ - if (toptxn) - toptxn->total_size += sz; + toptxn->total_size += sz; } else { @@ -3114,8 +3116,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, rb->size -= sz; /* Update the total size in the top transaction. */ - if (toptxn) - toptxn->total_size -= sz; + toptxn->total_size -= sz; } Assert(txn->size <= rb->size);