On Fri, Oct 3, 2025 at 9:26 AM Ashutosh Bapat
<[email protected]> wrote:
>
> On Fri, Oct 3, 2025 at 6:45 PM Bertrand Drouvot
> <[email protected]> wrote:
> >
> > Hi,
> >
> > On Fri, Oct 03, 2025 at 05:19:42PM +0530, Ashutosh Bapat wrote:
> > > +	bool		memory_limit_reached = (rb->size >= logical_decoding_work_mem * (Size) 1024);
> > > +
> > > +	if (memory_limit_reached)
> > > +		rb->memExceededCount += 1;
> >
> > Thanks for looking at it!
> >
> > > If the memory limit is hit but no transaction was serialized, the
> > > stats won't be updated since UpdateDecodingStats() won't be called. We
> > > need to call UpdateDecodingStats() in ReorderBufferCheckMemoryLimit()
> > > if no transaction was streamed or spilled.
> >
> > I did some testing and the stats are reported because UpdateDecodingStats() is
> > also called in DecodeCommit(), DecodeAbort() and DecodePrepare() (in addition
> > to ReorderBufferSerializeTXN() and ReorderBufferStreamTXN()). That's also why,
> > for example, total_txns is reported even if no transaction was streamed or
> > spilled.
>
> In a very pathological case, where all transactions happen to be
> aborted while decoding and yet the memory limit is hit many times,
> nothing will be reported until the first committed transaction is
> decoded, which may never happen. I didn't find a call stack whereby
> UpdateDecodingStats() could be reached from
> ReorderBufferCheckAndTruncateAbortedTXN().

The more frequently we report the statistics, the lower the chance of
losing them if logical decoding is interrupted, but the higher the
overhead of updating them. I personally prefer not to call
UpdateDecodingStats() frequently, since pgstat_report_replslot()
always flushes the statistics. If a transaction is serialized or
streamed, we can update memExceededCount together with other
statistics such as streamBytes and spillBytes. But if we can free
enough memory solely by truncating already-aborted transactions, we
have to rely on the next committed/aborted/prepared transaction to
update the statistics. So how about calling UpdateDecodingStats()
only in the case where we merely truncate aborted transactions and
the memory usage drops below the limit?
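
To make that concrete, here is a minimal standalone sketch of the rule
(my own simplified model for illustration, not the patch itself; the
stub names only loosely mimic the reorderbuffer functions):

#include <stdbool.h>
#include <stdio.h>

static bool stats_reported = false;

/*
 * Stand-in for ReorderBufferSerializeTXN()/ReorderBufferStreamTXN(),
 * which already report the slot statistics internally.
 */
static void
serialize_or_stream_largest_txn(size_t *size)
{
	*size /= 2;				/* pretend we evicted half of the memory */
	stats_reported = true;	/* UpdateDecodingStats() happens in here */
}

/*
 * Stand-in for truncating an already-aborted transaction: it frees
 * memory but reports nothing.
 */
static void
truncate_aborted_txn(size_t *size)
{
	*size /= 2;
}

static void
check_memory_limit(size_t size, size_t limit, bool all_aborted)
{
	bool		update_stats = true;

	if (size < limit)
		return;					/* under the limit, nothing to do */

	/* rb->memExceededCount += 1 would happen here */

	while (size >= limit)
	{
		if (all_aborted)
		{
			truncate_aborted_txn(&size);
			continue;			/* statistics update still owed */
		}
		serialize_or_stream_largest_txn(&size);
		update_stats = false;	/* already reported as a side effect */
	}

	/* Only the truncate-only path still owes an explicit report. */
	if (update_stats)
		stats_reported = true;	/* i.e., call UpdateDecodingStats() */
}

int
main(void)
{
	check_memory_limit(256, 64, true);	/* aborted-only, pathological case */
	printf("stats reported: %s\n", stats_reported ? "yes" : "no");
	return 0;
}

The point being that the serialize/stream path reports the statistics
as a side effect, so only the truncate-only path needs an explicit
UpdateDecodingStats() call.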

I've attached a patch that implements this idea, with a small
refactoring. It also includes the change to the regression test
results that we discussed.
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index 72fbb270334..13d2be24e18 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -165,10 +165,10 @@ SELECT pg_stat_force_next_flush();
  
 (1 row)
 
-SELECT slot_name, spill_txns, spill_count, mem_exceeded_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
+SELECT slot_name, spill_txns, spill_count, mem_exceeded_count > 0 as mem_exceeded_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
             slot_name            | spill_txns | spill_count | mem_exceeded_count 
 ---------------------------------+------------+-------------+--------------------
- regression_slot_stats4_twophase |          0 |           0 |                  1
+ regression_slot_stats4_twophase |          0 |           0 | t
 (1 row)
 
 DROP TABLE stats_test;
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 9964a8efb87..b9aae0c7b63 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -65,7 +65,7 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophas
 
 -- Verify that the decoding doesn't spill already-aborted transaction's changes.
 SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns, spill_count, mem_exceeded_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
+SELECT slot_name, spill_txns, spill_count, mem_exceeded_count > 0 as mem_exceeded_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
 
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot_stats1'),
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 6e72864804e..094b928cf35 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -3899,18 +3899,26 @@ static void
 ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 {
 	ReorderBufferTXN *txn;
-	bool		memory_limit_reached = (rb->size >= logical_decoding_work_mem * (Size) 1024);
+	bool		update_stats = true;
 
-	if (memory_limit_reached)
+	if (rb->size >= logical_decoding_work_mem * (Size) 1024)
+	{
+		/*
+		 * Update the statistics as the memory usage has reached the limit. We
+		 * report the statistics update later in this function since we can
+		 * update the slot statistics altogether while streaming or
+		 * serializing transactions in most cases.
+		 */
 		rb->memExceededCount += 1;
-
-	/*
-	 * Bail out if debug_logical_replication_streaming is buffered and we
-	 * haven't exceeded the memory limit.
-	 */
-	if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED &&
-		!memory_limit_reached)
+	}
+	else if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED)
+	{
+		/*
+		 * Bail out if debug_logical_replication_streaming is buffered and we
+		 * haven't exceeded the memory limit.
+		 */
 		return;
+	}
 
 	/*
 	 * If debug_logical_replication_streaming is immediate, loop until there's
@@ -3970,8 +3978,14 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 		 */
 		Assert(txn->size == 0);
 		Assert(txn->nentries_mem == 0);
+
+		/* We've reported memExceededCount update */
+		update_stats = false;
 	}
 
+	if (update_stats)
+		UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
+
 	/* We must be under the memory limit now. */
 	Assert(rb->size < logical_decoding_work_mem * (Size) 1024);
 }