If I understand you correctly, I have to make the following changes:

1. In walsender.c:
    static void
    WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
    {
        static TimestampTz sendTime = 0;
        TimestampTz now = GetCurrentTimestamp();

        /* Keep the worker process alive */
        WalSndKeepalive(true);

        /*
         * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
         * avoid flooding the lag tracker when we commit frequently.
         */
    #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
        if (!TimestampDifferenceExceeds(sendTime, now,
                                        WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
            return;

        LagTrackerWrite(lsn, now);
        sendTime = now;
    }

I set the requestReply parameter to true. Is that correct?

2. In pgoutput.c, add a call to WalSndUpdateProgress in pgoutput_change:

    /*
     * Sends the decoded DML over wire.
     *
     * This is called both in streaming and non-streaming modes.
     */
    static void
    pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                    Relation relation, ReorderBufferChange *change)
    {
        PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
        MemoryContext old;
        RelationSyncEntry *relentry;
        TransactionId xid = InvalidTransactionId;
        Relation    ancestor = NULL;

        WalSndUpdateProgress(ctx, txn->origin_lsn, change->txn->xid);

        if (!is_publishable_relation(relation))
            return;
        ...

For info, here is the log after reproducing the problem:
    2022-01-13 11:19:46.340 CET [82233] LOCATION: WalSndKeepaliveIfNecessary, walsender.c:3389
    2022-01-13 11:19:46.340 CET [82233] STATEMENT: START_REPLICATION SLOT "sub008_s012a00" LOGICAL 17/27240748 (proto_version '1', publication_names '"pub008_s012a00"')
    2022-01-13 11:19:46.340 CET [82233] LOG: 00000: attempt to send keep alive message
    2022-01-13 11:19:46.340 CET [82233] LOCATION: WalSndKeepaliveIfNecessary, walsender.c:3389
    2022-01-13 11:19:46.340 CET [82233] STATEMENT: START_REPLICATION SLOT "sub008_s012a00" LOGICAL 17/27240748 (proto_version '1', publication_names '"pub008_s012a00"')
    2022-01-13 11:19:46.340 CET [82233] LOG: 00000: attempt to send keep alive message
    2022-01-13 11:19:46.340 CET [82233] LOCATION: WalSndKeepaliveIfNecessary, walsender.c:3389
    2022-01-13 11:19:46.340 CET [82233] STATEMENT: START_REPLICATION SLOT "sub008_s012a00" LOGICAL 17/27240748 (proto_version '1', publication_names '"pub008_s012a00"')
    2022-01-13 11:20:46.418 CET [82232] ERROR: XX000: terminating logical replication worker due to timeout
    2022-01-13 11:20:46.418 CET [82232] LOCATION: LogicalRepApplyLoop, worker.c:1267
    2022-01-13 11:20:46.421 CET [82224] LOG: 00000: worker process: logical replication worker for subscription 26994 (PID 82232) exited with exit code 1
    2022-01-13 11:20:46.421 CET [82224] LOCATION: LogChildExit, postmaster.c:3625

Thanks a lot for your help.

Fabrice

On Thu, Jan 13, 2022 at 2:59 PM Amit Kapila <amit.kapil...@gmail.com> wrote:

> On Thu, Jan 13, 2022 at 3:43 PM Fabrice Chapuis <fabrice636...@gmail.com>
> wrote:
> >
> > first phase: postgres read WAL files and generate 1420 snap files.
> > second phase: I guess, but on this point maybe you can clarify, postgres
> > has to decode the snap files and remove them if no statement must be
> > applied on a replicated table.
> > It is from this point that the worker process exit after 1 minute
> > timeout.
> >
>
> Okay, I think the problem could be that because we are skipping all
> the changes of transaction there is no communication sent to the
> subscriber and it eventually timed out. Actually, we try to send
> keep-alive at transaction boundaries like when we call
> pgoutput_commit_txn. The pgoutput_commit_txn will call
> OutputPluginWrite->WalSndWriteData. I think to tackle the problem we
> need to try to send such keepalives via WalSndUpdateProgress and
> invoke that in pgoutput_change when we skip sending the change.
>
> --
> With Regards,
> Amit Kapila.
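A toy model of the failure mode described above and of the proposed fix, as a sketch under simplifying assumptions. It is not PostgreSQL code: all names (ToySender, toy_update_progress, toy_decode_skipped_txn, toy_receiver_timed_out) are hypothetical, time is simulated in milliseconds, and a single timeout stands in for both the sender and receiver timeouts (60 s is the default wal_receiver_timeout, which matches the one-minute gap in the log). A large transaction whose changes are all skipped sends nothing until its commit, so the receiver sees one long silence. If a progress hook runs for every skipped change but only sends a keepalive once half the timeout has passed since the last message (assumed here to be roughly the rule WalSndKeepaliveIfNecessary applies), the silence stays bounded and the keepalive count stays small. Note that the WalSndUpdateProgress change proposed above calls WalSndKeepalive(true) before its rate-limit check, i.e. on every call.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical toy state; not a PostgreSQL structure. */
typedef struct ToySender
{
    int64_t now_ms;          /* simulated clock */
    int64_t last_msg_ms;     /* when the receiver last heard from us */
    int64_t max_silence_ms;  /* longest gap the receiver observed */
    int     keepalives_sent;
} ToySender;

/* Record that a message (data or keepalive) reaches the receiver now. */
static void toy_send(ToySender *s)
{
    int64_t gap = s->now_ms - s->last_msg_ms;

    if (gap > s->max_silence_ms)
        s->max_silence_ms = gap;
    s->last_msg_ms = s->now_ms;
}

/*
 * Hypothetical progress hook, called once per skipped change.  It returns
 * early unless half of the timeout has elapsed since the last message, so
 * the receiver is not flooded with one keepalive per change.
 */
static void toy_update_progress(ToySender *s, int64_t timeout_ms)
{
    if (s->now_ms - s->last_msg_ms < timeout_ms / 2)
        return;
    toy_send(s);
    s->keepalives_sent++;
}

/*
 * Decode `nchanges` changes that are all skipped, each costing
 * `change_cost_ms`, then send the commit.  The receiver is assumed to have
 * heard from us at t = 0.
 */
static ToySender toy_decode_skipped_txn(int nchanges, int64_t change_cost_ms,
                                        int64_t timeout_ms, bool use_hook)
{
    ToySender s = {0, 0, 0, 0};

    assert(timeout_ms > 0 && change_cost_ms >= 0);
    for (int i = 0; i < nchanges; i++)
    {
        s.now_ms += change_cost_ms;      /* time spent decoding one change */
        if (use_hook)
            toy_update_progress(&s, timeout_ms);
    }
    toy_send(&s);                        /* commit: data finally goes out */
    return s;
}

/* True if the receiver went silent for longer than the timeout. */
static bool toy_receiver_timed_out(const ToySender *s, int64_t timeout_ms)
{
    return s->max_silence_ms > timeout_ms;
}
```

With 1000000 skipped changes at 1 ms each and a 60000 ms timeout, the run without the hook sees 1000000 ms of silence and times out, while the run with the hook sends 33 keepalives and its longest silence is 30000 ms.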