On Fri, Apr 30, 2021 at 5:55 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
>
> On Thu, Apr 29, 2021 at 9:44 PM vignesh C <vignes...@gmail.com> wrote:
> >
> >
> >
> > On Thu, Apr 29, 2021 at 3:06 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Wed, Apr 28, 2021 at 5:01 PM Amit Kapila <amit.kapil...@gmail.com> 
> > > wrote:
> > > >
> > > > On Wed, Apr 28, 2021 at 4:51 PM Masahiko Sawada <sawada.m...@gmail.com> 
> > > > wrote:
> > > > >
> > > > > On Wed, Apr 28, 2021 at 6:39 PM Amit Kapila <amit.kapil...@gmail.com> 
> > > > > wrote:
> > > >
> > > > @@ -1369,7 +1369,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb,
> > > > ReorderBufferIterTXNState *state)
> > > >   * Update the total bytes processed before releasing the current set
> > > >   * of changes and restoring the new set of changes.
> > > >   */
> > > > - rb->totalBytes += rb->size;
> > > > + rb->totalBytes += entry->txn->total_size;
> > > >   if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
> > > >   &state->entries[off].segno))
> > > >
> > > > I have not tested this but won't in the above change you need to check
> > > > txn->toptxn for subtxns?
> > > >
> > >
> > > Now, I am able to reproduce this issue:
> > > Create table t1(c1 int);
> > > select pg_create_logical_replication_slot('s', 'test_decoding');
> > > Begin;
> > > insert into t1 values(1);
> > > savepoint s1;
> > > insert into t1 select generate_series(1, 100000);
> > > commit;
> > >
> > > postgres=# select count(*) from pg_logical_slot_peek_changes('s1', NULL, 
> > > NULL);
> > >  count
> > > --------
> > >  100005
> > > (1 row)
> > >
> > > postgres=# select * from pg_stat_replication_slots;
> > >  slot_name | spill_txns | spill_count | spill_bytes | stream_txns |
> > > stream_count | stream_bytes | total_txns | total_bytes |
> > > stats_reset
> > > -----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
> > >  s1        |          0 |           0 |           0 |           0 |
> > >         0 |            0 |          2 |    13200672 | 2021-04-29
> > > 14:33:55.156566+05:30
> > > (1 row)
> > >
> > > select * from pg_stat_reset_replication_slot('s1');
> > >
> > > Now reduce the logical decoding work mem to allow spilling.
> > > postgres=# set logical_decoding_work_mem='64kB';
> > > SET
> > > postgres=# select count(*) from pg_logical_slot_peek_changes('s1', NULL, 
> > > NULL);
> > >  count
> > > --------
> > >  100005
> > > (1 row)
> > >
> > > postgres=# select * from pg_stat_replication_slots;
> > >  slot_name | spill_txns | spill_count | spill_bytes | stream_txns |
> > > stream_count | stream_bytes | total_txns | total_bytes |
> > > stats_reset
> > > -----------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
> > >  s1        |          1 |         202 |    13200000 |           0 |
> > >         0 |            0 |          2 |         672 | 2021-04-29
> > > 14:35:21.836613+05:30
> > > (1 row)
> > >
> > > You can notice that after we have allowed spilling the 'total_bytes'
> > > stats is showing a different value. The attached patch fixes the issue
> > > for me. Let me know what do you think about this?
> >
> > I found one issue with the following scenario when testing with 
> > logical_decoding_work_mem as 64kB:
> >
> > BEGIN;
> > INSERT INTO t1 values(generate_series(1,10000));
> > SAVEPOINT s1;
> > INSERT INTO t1 values(generate_series(1,10000));
> > COMMIT;
> > SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot1', NULL,
> >         NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
> > select * from pg_stat_replication_slots;
> >     slot_name     | spill_txns | spill_count | spill_bytes | stream_txns | 
> > stream_count | stream_bytes | total_txns | total_bytes |           
> > stats_reset
> > ------------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
> >  regression_slot1 |          6 |         154 |     9130176 |           0 |  
> >           0 |            0 |          1 |     4262016 | 2021-04-29 
> > 17:50:00.080663+05:30
> > (1 row)
> >
> > Same thing works fine with logical_decoding_work_mem as 64MB:
> > select * from pg_stat_replication_slots;
> >    slot_name     | spill_txns | spill_count | spill_bytes | stream_txns | 
> > stream_count | stream_bytes | total_txns | total_bytes |           
> > stats_reset
> > ------------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+----------------------------------
> >  regression_slot1 |          6 |         154 |     9130176 |           0 |  
> >           0 |            0 |          1 |     2640000 | 2021-04-29 
> > 17:50:00.080663+05:30
> > (1 row)
> >
> > The patch required one change:
> > - rb->totalBytes += rb->size;
> > + if (entry->txn->toptxn)
> > + rb->totalBytes += entry->txn->toptxn->total_size;
> > + else
> > + rb->totalBytes += entry->txn->total_size;
> >
> > The above should be changed to:
> > - rb->totalBytes += rb->size;
> > + if (entry->txn->toptxn)
> > + rb->totalBytes += entry->txn->toptxn->total_size;
> > + else
> > + rb->totalBytes += entry->txn->size;
> >
> > Attached patch fixes the issue.
> > Thoughts?
>
> After more thought, it seems to me that we should use txn->size here
> regardless of the top transaction or subtransaction since we're
> iterating changes associated with a transaction that is either the top
> transaction or a subtransaction. Otherwise, I think if some
> subtransactions are not serialized, we will end up adding bytes
> including those subtransactions during iterating other serialized
> subtransactions. Whereas in ReorderBufferProcessTXN() we should use
> txn->total_txn since txn is always the top transaction. I've attached
> another patch to do this.
>
> BTW, to check how many bytes of changes are passed to the decoder
> plugin I wrote and attached a simple decoder plugin that calculates
> the total amount of bytes for each change on the decoding plugin side.
> I think what we expect is that the amounts of change bytes shown on
> both sides are matched. You can build it in the same way as other
> third-party modules and need to create decoder_stats extension.
>
> The basic usage is to execute pg_logical_slot_get/peek_changes() and
> mystats('slot_name') in the same process. During decoding the changes,
> decoder_stats plugin accumulates the change bytes in the local memory
> and mystats() SQL function, defined in decoder_stats extension, shows
> those stats.
>
> I've done some test with v4 patch. For instance, with the following
> workload the output is expected:
>
> BEGIN;
> INSERT INTO t1 values(generate_series(1,10000));
> SAVEPOINT s1;
> INSERT INTO t1 values(generate_series(1,10000));
> COMMIT;
>
> mystats() functions shows:
>
> =# select pg_logical_slot_get_changes('test_slot', null, null);
> =# select change_type, change_bytes, total_bytes from mystats('test_slot');
>  change_type | change_bytes | total_bytes
> -------------+--------------+-------------
>  INSERT      | 2578 kB      | 2578 kB
> (1 row)
>
> 'change_bytes' and 'total_bytes' are the total amount of changes
> calculated on the plugin side and core side, respectively. Those are
> matched, which is expected. On the other hand, with the following
> workload those are not matched:
>
> BEGIN;
> INSERT INTO t1 values(generate_series(1,10000));
> SAVEPOINT s1;
> INSERT INTO t1 values(generate_series(1,10000));
> SAVEPOINT s2;
> INSERT INTO t1 values(generate_series(1,10000));
> COMMIT;
>
> =# select pg_logical_slot_get_changes('test_slot', null, null);
> =# select change_type, change_bytes, total_bytes from mystats('test_slot');
>  change_type | change_bytes | total_bytes
> -------------+--------------+-------------
>  INSERT      | 3867 kB      | 5451 kB
> (1 row)
>
> This is fixed by the attached v5 patch.

The changes look good to me, I don't have any comments.

Regards,
Vignesh


Reply via email to