On Fri, Apr 30, 2021 at 5:55 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
>
> On Thu, Apr 29, 2021 at 9:44 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > On Thu, Apr 29, 2021 at 3:06 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Wed, Apr 28, 2021 at 5:01 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > > >
> > > > On Wed, Apr 28, 2021 at 4:51 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> > > > >
> > > > > On Wed, Apr 28, 2021 at 6:39 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > > >
> > > > @@ -1369,7 +1369,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
> > > >   * Update the total bytes processed before releasing the current set
> > > >   * of changes and restoring the new set of changes.
> > > >   */
> > > > - rb->totalBytes += rb->size;
> > > > + rb->totalBytes += entry->txn->total_size;
> > > >   if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
> > > >                                   &state->entries[off].segno))
> > > >
> > > > I have not tested this, but won't you need to check txn->toptxn
> > > > for subtxns in the above change?
> > > >
> > >
> > > Now, I am able to reproduce this issue:
> > > Create table t1(c1 int);
> > > select pg_create_logical_replication_slot('s1', 'test_decoding');
> > > Begin;
> > > insert into t1 values(1);
> > > savepoint s1;
> > > insert into t1 select generate_series(1, 100000);
> > > commit;
> > >
> > > postgres=# select count(*) from pg_logical_slot_peek_changes('s1', NULL, NULL);
> > >  count
> > > --------
> > >  100005
> > > (1 row)
> > >
> > > postgres=# select * from pg_stat_replication_slots;
> > >  slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes |           stats_reset
> > > -----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
> > >  s1        |          0 |           0 |           0 |           0 |            0 |            0 |          2 |    13200672 | 2021-04-29 14:33:55.156566+05:30
> > > (1 row)
> > >
> > > select * from pg_stat_reset_replication_slot('s1');
> > >
> > > Now reduce the logical decoding work mem to allow spilling.
> > > postgres=# set logical_decoding_work_mem='64kB';
> > > SET
> > > postgres=# select count(*) from pg_logical_slot_peek_changes('s1', NULL, NULL);
> > >  count
> > > --------
> > >  100005
> > > (1 row)
> > >
> > > postgres=# select * from pg_stat_replication_slots;
> > >  slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes |           stats_reset
> > > -----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
> > >  s1        |          1 |         202 |    13200000 |           0 |            0 |            0 |          2 |         672 | 2021-04-29 14:35:21.836613+05:30
> > > (1 row)
> > >
> > > You can notice that after we have allowed spilling, the 'total_bytes'
> > > stat shows a different value. The attached patch fixes the issue
> > > for me. Let me know what you think about this.
> >
> > I found one issue with the following scenario when testing with
> > logical_decoding_work_mem as 64kB:
> >
> > BEGIN;
> > INSERT INTO t1 values(generate_series(1,10000));
> > SAVEPOINT s1;
> > INSERT INTO t1 values(generate_series(1,10000));
> > COMMIT;
> > SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot1', NULL,
> > NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
> > select * from pg_stat_replication_slots;
> >     slot_name     | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes |           stats_reset
> > ------------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
> >  regression_slot1 |          6 |         154 |     9130176 |           0 |            0 |            0 |          1 |     4262016 | 2021-04-29 17:50:00.080663+05:30
> > (1 row)
> >
> > Same thing works fine with logical_decoding_work_mem as 64MB:
> > select * from pg_stat_replication_slots;
> >     slot_name     | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes |           stats_reset
> > ------------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
> >  regression_slot1 |          6 |         154 |     9130176 |           0 |            0 |            0 |          1 |     2640000 | 2021-04-29 17:50:00.080663+05:30
> > (1 row)
> >
> > The patch required one change:
> > - rb->totalBytes += rb->size;
> > + if (entry->txn->toptxn)
> > +     rb->totalBytes += entry->txn->toptxn->total_size;
> > + else
> > +     rb->totalBytes += entry->txn->total_size;
> >
> > The above should be changed to:
> > - rb->totalBytes += rb->size;
> > + if (entry->txn->toptxn)
> > +     rb->totalBytes += entry->txn->toptxn->total_size;
> > + else
> > +     rb->totalBytes += entry->txn->size;
> >
> > Attached patch fixes the issue.
> > Thoughts?
>
> After more thought, it seems to me that we should use txn->size here
> regardless of whether it is the top transaction or a subtransaction,
> since we're iterating changes associated with a transaction that is
> either the top transaction or a subtransaction. Otherwise, I think if
> some subtransactions are not serialized, we will end up adding bytes
> including those subtransactions while iterating other serialized
> subtransactions. Whereas in ReorderBufferProcessTXN() we should use
> txn->total_size since txn is always the top transaction. I've attached
> another patch to do this.
>
> BTW, to check how many bytes of changes are passed to the decoder
> plugin, I wrote and attached a simple decoder plugin that calculates
> the total amount of bytes for each change on the decoding plugin side.
> I think what we expect is that the amounts of change bytes shown on
> both sides match. You can build it in the same way as other
> third-party modules, and you need to create the decoder_stats extension.
>
> The basic usage is to execute pg_logical_slot_get/peek_changes() and
> mystats('slot_name') in the same process. While decoding the changes,
> the decoder_stats plugin accumulates the change bytes in local memory,
> and the mystats() SQL function, defined in the decoder_stats extension,
> shows those stats.
>
> I've done some tests with the v4 patch. For instance, with the following
> workload the output is as expected:
>
> BEGIN;
> INSERT INTO t1 values(generate_series(1,10000));
> SAVEPOINT s1;
> INSERT INTO t1 values(generate_series(1,10000));
> COMMIT;
>
> The mystats() function shows:
>
> =# select pg_logical_slot_get_changes('test_slot', null, null);
> =# select change_type, change_bytes, total_bytes from mystats('test_slot');
>  change_type | change_bytes | total_bytes
> -------------+--------------+-------------
>  INSERT      | 2578 kB      | 2578 kB
> (1 row)
>
> 'change_bytes' and 'total_bytes' are the total amount of changes
> calculated on the plugin side and core side, respectively. They
> match, which is expected. On the other hand, with the following
> workload they do not match:
>
> BEGIN;
> INSERT INTO t1 values(generate_series(1,10000));
> SAVEPOINT s1;
> INSERT INTO t1 values(generate_series(1,10000));
> SAVEPOINT s2;
> INSERT INTO t1 values(generate_series(1,10000));
> COMMIT;
>
> =# select pg_logical_slot_get_changes('test_slot', null, null);
> =# select change_type, change_bytes, total_bytes from mystats('test_slot');
>  change_type | change_bytes | total_bytes
> -------------+--------------+-------------
>  INSERT      | 3867 kB      | 5451 kB
> (1 row)
>
> This is fixed by the attached v5 patch.
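
To make the over-counting concern above concrete, here is a toy model in C.
It is only a sketch and not PostgreSQL code: ToyTXN, count_per_txn_size() and
count_with_toptxn_total() are made-up names, the struct keeps just the three
fields discussed in this thread, and it assumes each iterated (sub)transaction
contributes to the counter exactly once, which is simpler than what
ReorderBufferIterTXNNext() really does.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, stripped-down stand-in for ReorderBufferTXN. */
typedef struct ToyTXN
{
    struct ToyTXN *toptxn;      /* NULL for a top-level transaction */
    size_t      size;           /* bytes of changes in this (sub)xact only */
    size_t      total_size;     /* top-level only: own size plus all subxacts */
} ToyTXN;

/*
 * Accounting as proposed in the quoted mail: every iterated transaction,
 * top-level or subtransaction, adds only its own size.
 */
size_t
count_per_txn_size(const ToyTXN *const *txns, size_t n)
{
    size_t      total = 0;

    for (size_t i = 0; i < n; i++)
    {
        assert(txns[i] != NULL);
        total += txns[i]->size;
    }
    return total;
}

/*
 * Accounting as in the earlier patch version: a subtransaction adds its
 * top-level transaction's total_size, so that total is added once per
 * iterated subtransaction.
 */
size_t
count_with_toptxn_total(const ToyTXN *const *txns, size_t n)
{
    size_t      total = 0;

    for (size_t i = 0; i < n; i++)
    {
        const ToyTXN *txn = txns[i];

        assert(txn != NULL);
        if (txn->toptxn != NULL)
            total += txn->toptxn->total_size;
        else
            total += txn->size;
    }
    return total;
}
```

With a top-level transaction of 100 bytes and two subtransactions of 200 and
300 bytes (so total_size is 600), count_per_txn_size() returns 600 while
count_with_toptxn_total() returns 1300, because the top-level total is added
once for each subtransaction.
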

The changes look good to me, I don't have any comments.

Regards,
Vignesh