On Sat, Mar 20, 2021 at 9:26 AM Amit Kapila <[email protected]> wrote:
>
> On Sat, Mar 20, 2021 at 12:22 AM Andres Freund <[email protected]> wrote:
> >
> > And then more generally about the feature:
> > - If a slot was used to stream out a large amount of changes (say an
> > initial data load), but then replication is interrupted before the
> > transaction is committed/aborted, stream_bytes will not reflect the
> > many gigabytes of data we may have sent.
> >
>
> We can probably update the stats each time we spilled or streamed the
> transaction data but it was not clear at that stage whether or how
> much it will be useful.
>
I felt we can update the replication slot statistics data each time we
spill/stream the transaction data instead of accumulating the
statistics and updating at the end. I have tried this in the attached
patch and the statistics data were getting updated.
Thoughts?
Regards,
Vignesh
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 99f2afc73c..9905e2b8ad 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -748,7 +748,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* not clear that sending more or less frequently than this would be
* better.
*/
- UpdateDecodingStats(ctx);
+ UpdateDecodingStats(ctx->reorder);
}
/*
@@ -830,7 +830,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* not clear that sending more or less frequently than this would be
* better.
*/
- UpdateDecodingStats(ctx);
+ UpdateDecodingStats(ctx->reorder);
}
@@ -887,7 +887,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
/* update the decoding stats */
- UpdateDecodingStats(ctx);
+ UpdateDecodingStats(ctx->reorder);
}
/*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 954dbb5554..00c481e1d2 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -219,6 +219,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ namestrcpy(&ctx->reorder->slotname, NameStr(ctx->slot->data.name));
/*
* To support streaming, we require start/stop/abort/commit/change
@@ -1791,15 +1792,14 @@ ResetLogicalStreamingState(void)
* Report stats for a slot.
*/
void
-UpdateDecodingStats(LogicalDecodingContext *ctx)
+UpdateDecodingStats(ReorderBuffer *rb)
{
- ReorderBuffer *rb = ctx->reorder;
-
/*
* Nothing to do if we haven't spilled or streamed anything since the last
* time the stats has been sent.
*/
- if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+ if (rb->spillBytes <= 0 && rb->spillCount <=0 &&
+ rb->streamBytes <= 0 && rb->streamCount <=0)
return;
elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
@@ -1811,7 +1811,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
(long long) rb->streamCount,
(long long) rb->streamBytes);
- pgstat_report_replslot(NameStr(ctx->slot->data.name),
+ pgstat_report_replslot(NameStr(rb->slotname),
rb->spillTxns, rb->spillCount, rb->spillBytes,
rb->streamTxns, rb->streamCount, rb->streamBytes);
rb->spillTxns = 0;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 92c7fa7993..b60a864004 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -3528,6 +3528,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* don't consider already serialized transactions */
rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+ UpdateDecodingStats(rb);
}
Assert(spilled == txn->nentries_mem);
@@ -3897,6 +3898,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Don't consider already streamed transaction. */
rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+ UpdateDecodingStats(rb);
+
Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 72f049b347..71d93058d2 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -139,6 +139,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
TransactionId xid, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void);
-extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void UpdateDecodingStats(ReorderBuffer *rb);
#endif
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 6c9f2c6c77..bcc4788480 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -618,6 +618,8 @@ struct ReorderBuffer
int64 streamTxns; /* number of transactions streamed */
int64 streamCount; /* streaming invocation counter */
int64 streamBytes; /* amount of data streamed */
+
+ NameData slotname; /* Slot name for updating statistics */
};