If I understand you correctly, I have to make the following changes:

1. In walsender.c:

static void
WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn,
                     TransactionId xid)
{
    static TimestampTz sendTime = 0;
    TimestampTz now = GetCurrentTimestamp();

    /* Keep the worker process alive */
    WalSndKeepalive(true);

    /*
     * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
     * avoid flooding the lag tracker when we commit frequently.
     */
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
    if (!TimestampDifferenceExceeds(sendTime, now,
                                    WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
        return;

    LagTrackerWrite(lsn, now);
    sendTime = now;
}

I set the *requestReply* parameter to true; is that correct?

2. In pgoutput.c:

/*
 * Sends the decoded DML over wire.
 *
 * This is called both in streaming and non-streaming modes.
 */
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                Relation relation, ReorderBufferChange *change)
{
    PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    MemoryContext old;
    RelationSyncEntry *relentry;
    TransactionId xid = InvalidTransactionId;
    Relation ancestor = NULL;

    WalSndUpdateProgress(ctx, txn->origin_lsn, change->txn->xid);

    if (!is_publishable_relation(relation))
        return;
    ...

Add a call to *WalSndUpdateProgress* in the function *pgoutput_change*.
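
To make the intent concrete, here is a minimal standalone sketch in plain C
(a toy model, not PostgreSQL code) of what I understand the suggestion to be:
when a change is skipped, still send a keepalive if nothing has gone to the
subscriber for a while. The names KeepaliveState, process_change,
longest_silence_ms and the 1000 ms interval are made up for illustration; only
the idea of sending keepalives while changes are skipped comes from this thread.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for sender-side state; not a PostgreSQL struct. */
typedef struct KeepaliveState
{
    int64_t last_send_ms;   /* time of the last message sent to the subscriber */
    int     keepalives;     /* number of keepalives sent so far */
} KeepaliveState;

/* Assumed interval for illustration only. */
#define KEEPALIVE_INTERVAL_MS 1000

/*
 * Handle one decoded change at time now_ms.  Returns true if anything
 * (the change itself or a keepalive) was sent to the subscriber.
 */
static bool
process_change(KeepaliveState *st, bool publishable, int64_t now_ms)
{
    if (publishable)
    {
        /* The change itself goes over the wire, so the link is not idle. */
        st->last_send_ms = now_ms;
        return true;
    }

    /* Skipped change: send a keepalive only if the link has been quiet. */
    if (now_ms - st->last_send_ms >= KEEPALIVE_INTERVAL_MS)
    {
        st->keepalives++;
        st->last_send_ms = now_ms;
        return true;
    }
    return false;
}

/*
 * Simulate decoding nchanges skipped changes, each taking step_ms, and
 * return the longest period during which the subscriber heard nothing.
 */
static int64_t
longest_silence_ms(int nchanges, int64_t step_ms, bool send_keepalives)
{
    KeepaliveState st = {0, 0};
    int64_t now_ms = 0;
    int64_t longest = 0;

    assert(step_ms > 0);

    for (int i = 0; i < nchanges; i++)
    {
        now_ms += step_ms;      /* time spent decoding this change */

        if (now_ms - st.last_send_ms > longest)
            longest = now_ms - st.last_send_ms;

        if (send_keepalives)
            process_change(&st, false, now_ms);
    }
    return longest;
}
```

In this model, 1200 skipped changes at 100 ms each leave the subscriber without
any message for 120000 ms when no keepalives are sent, which is longer than a
1 minute timeout; with keepalives on the skip path the longest silence stays at
1000 ms.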

For information, here is the log output after reproducing the problem:

2022-01-13 11:19:46.340 CET [82233] LOCATION:  WalSndKeepaliveIfNecessary, walsender.c:3389
2022-01-13 11:19:46.340 CET [82233] STATEMENT:  START_REPLICATION SLOT "sub008_s012a00" LOGICAL 17/27240748 (proto_version '1', publication_names '"pub008_s012a00"')
2022-01-13 11:19:46.340 CET [82233] LOG:  00000: attempt to send keep alive message
2022-01-13 11:19:46.340 CET [82233] LOCATION:  WalSndKeepaliveIfNecessary, walsender.c:3389
2022-01-13 11:19:46.340 CET [82233] STATEMENT:  START_REPLICATION SLOT "sub008_s012a00" LOGICAL 17/27240748 (proto_version '1', publication_names '"pub008_s012a00"')
2022-01-13 11:19:46.340 CET [82233] LOG:  00000: attempt to send keep alive message
2022-01-13 11:19:46.340 CET [82233] LOCATION:  WalSndKeepaliveIfNecessary, walsender.c:3389
2022-01-13 11:19:46.340 CET [82233] STATEMENT:  START_REPLICATION SLOT "sub008_s012a00" LOGICAL 17/27240748 (proto_version '1', publication_names '"pub008_s012a00"')
2022-01-13 11:20:46.418 CET [82232] ERROR:  XX000: terminating logical replication worker due to timeout
2022-01-13 11:20:46.418 CET [82232] LOCATION:  LogicalRepApplyLoop, worker.c:1267
2022-01-13 11:20:46.421 CET [82224] LOG:  00000: worker process: logical replication worker for subscription 26994 (PID 82232) exited with exit code 1
2022-01-13 11:20:46.421 CET [82224] LOCATION:  LogChildExit, postmaster.c:3625

Thanks a lot for your help.

Fabrice

On Thu, Jan 13, 2022 at 2:59 PM Amit Kapila <amit.kapil...@gmail.com> wrote:

> On Thu, Jan 13, 2022 at 3:43 PM Fabrice Chapuis <fabrice636...@gmail.com>
> wrote:
> >
> > first phase: postgres read WAL files and generate 1420 snap files.
> > second phase: I guess, but on this point maybe you can clarify, postgres
> > has to decode the snap files and remove them if no statement must be
> > applied on a replicated table.
> > It is from this point that the worker process exit after 1 minute
> > timeout.
> >
>
> Okay, I think the problem could be that because we are skipping all
> the changes of transaction there is no communication sent to the
> subscriber and it eventually timed out. Actually, we try to send
> keep-alive at transaction boundaries like when we call
> pgoutput_commit_txn. The pgoutput_commit_txn will call
> OutputPluginWrite->WalSndWriteData. I think to tackle the problem we
> need to try to send such keepalives via WalSndUpdateProgress and
> invoke that in pgoutput_change when we skip sending the change.
>
> --
> With Regards,
> Amit Kapila.
>
