On Tue, Oct 10, 2023 at 6:17 PM Amit Kapila <[email protected]> wrote:
>
> DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
> Oid txn_dbid, RepOriginId origin_id)
> {
> - return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
> - (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
> - ctx->fast_forward || FilterByOrigin(ctx, origin_id));
> + bool need_skip;
> +
> + need_skip = (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
> + (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
> + ctx->decoding_mode != DECODING_MODE_NORMAL ||
> + FilterByOrigin(ctx, origin_id));
> +
> + /* Set a flag if we are in the slient mode */
> + if (ctx->decoding_mode == DECODING_MODE_SILENT)
> + ctx->output_skipped = true;
> +
> + return need_skip;
>
> I think you need to set the new flag only when we are not skipping the
> transaction or in other words when we decide to process the
> transaction. Otherwise, how will you distinguish the case where the
> xact is already decoded and sent to client?
>
In the attached patch atop your v47*, I have changed it to show you
what I have in mind.
A few more comments:
=================
1.
+
+ /*
+ * Did the logical decoding context skip outputting any changes?
+ *
+ * This flag is used only when the context is in the silent mode.
+ */
+ bool output_skipped;
} LogicalDecodingContext;
This doesn't seem to convey the meaning to the caller. How about
processing_required? BTW, I have made this change as well in the
patch.
2.
@@ -295,7 +295,7 @@ xact_decode(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf)
*/
if (TransactionIdIsValid(xid))
{
- if (!ctx->fast_forward)
+ if (ctx->decoding_mode != DECODING_MODE_FAST_FORWARD)
ReorderBufferAddInvalidations(reorder, xid,
buf->origptr,
invals->nmsgs,
@@ -303,7 +303,7 @@ xact_decode(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf)
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
buf->origptr);
}
- else if ((!ctx->fast_forward))
+ else if (ctx->decoding_mode != DECODING_MODE_FAST_FORWARD)
ReorderBufferImmediateInvalidation(ctx->reorder,
invals->nmsgs,
invals->msgs);
We don't to execute the invalidations even in silent mode. Looking at
this and other changes in the patch related to silent mode, I wonder
whether we really need to introduce 'silent_mode'. Can't we simply set
processing_required when 'fast_forward' mode is true and then let the
caller decide whether it needs to further process the WAL?
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/decode.c
b/src/backend/replication/logical/decode.c
index 6de54153f7..f3c561d8ed 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -631,7 +631,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf)
if (ctx->decoding_mode == DECODING_MODE_SILENT &&
!message->transactional)
{
- ctx->output_skipped = true;
+ ctx->processing_required = true;
return;
}
@@ -1294,8 +1294,6 @@ DecodeXLogTuple(char *data, Size len,
ReorderBufferTupleBuf *tuple)
* 2) The transaction happened in another database.
* 3) The output plugin is not interested in the origin.
* 4) We are not in the normal decoding mode.
- *
- * Also, set output_skipped flag if we are in the slient mode.
*/
static bool
DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
@@ -1308,9 +1306,15 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf,
ctx->decoding_mode != DECODING_MODE_NORMAL ||
FilterByOrigin(ctx, origin_id));
- /* Set a flag if we are in the slient mode */
+ if (need_skip)
+ return true;
+
+ /*
+ * We don't need to process the transaction in silent mode. Indicate the
+ * same via LogicalDecodingContext, so that the caller can skip
processing.
+ */
if (ctx->decoding_mode == DECODING_MODE_SILENT)
- ctx->output_skipped = true;
+ ctx->processing_required = true;
- return need_skip;
+ return true;
}
diff --git a/src/backend/replication/logical/logical.c
b/src/backend/replication/logical/logical.c
index 0f4b1c6323..e47f2ebd7c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -2030,7 +2030,7 @@ DecodingContextHasdecodedItems(LogicalDecodingContext
*ctx,
InvalidateSystemCaches();
/* Loop until the end of WAL or some changes are processed */
- while (!ctx->output_skipped && ctx->reader->EndRecPtr < end_of_wal)
+ while (!ctx->processing_required && ctx->reader->EndRecPtr < end_of_wal)
{
XLogRecord *record;
char *errm = NULL;
@@ -2046,5 +2046,5 @@ DecodingContextHasdecodedItems(LogicalDecodingContext
*ctx,
CHECK_FOR_INTERRUPTS();
}
- return ctx->output_skipped;
+ return ctx->processing_required;
}
diff --git a/src/include/replication/logical.h
b/src/include/replication/logical.h
index d0f9dda6c5..94cc631a5b 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -128,12 +128,8 @@ typedef struct LogicalDecodingContext
/* Are we processing the end LSN of a transaction? */
bool end_xact;
- /*
- * Did the logical decoding context skip outputting any changes?
- *
- * This flag is used only when the context is in the silent mode.
- */
- bool output_skipped;
+ /* Do we need to process any change in silent decoding mode? */
+ bool processing_required;
} LogicalDecodingContext;