On Thu, Jun 18, 2026 at 9:33 AM shveta malik <[email protected]> wrote:
>
> On Tue, Jun 16, 2026 at 6:54 PM Dilip Kumar <[email protected]> wrote:
> >
> > IMHO we should just log WARNING and continue the apply worker on
> > conflict insertion failure, lets see what other thinks on this.
> >
>
> I have the same opinion. Allowing CLT to block the apply worker would
> be undesirable; CLT is a history/logs collection feature and should
> not interrupt core logical replication work.
>

I think the insert can fail in rare cases, such as the disk getting
full while writing WAL or some internal memory ERROR, and the ERROR
could be persistent, which means the LOG will be filled with the same
WARNING if there are many conflicts. Also, users may not like missing
out on conflict information. So, we can ERROR out and let users fix
the situation. Additionally, the nested try-catch to downgrade ERROR
to WARNING looks ugly and is a source of future bugs and maintenance
burden. The attached patch tries to fix this by ERRORing out on
insertion failure and attaching the required conflict info as the
context of the ERROR. The patch also improves the
ReportApplyConflict() non-ERROR paths by displaying the conflict
information in the server LOG before inserting it into the CLT, so
that if the insertion fails, the complete conflict info is still
present in the server LOG. See
v52-1-amit.Improve-error-handling-for-conflict-log-table-ins.
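
To make the intended ordering concrete, here is a minimal standalone
sketch of the sub-ERROR path: prepare the tuple, write the server-log
message, and only then insert, with a failed insert carrying the
conflict context. It does not use PostgreSQL's headers; ToyWorker, the
toy_* functions, and the "could not insert into conflict log table"
text are hypothetical names invented for illustration, and the ERROR is
modelled as a false return value rather than a longjmp.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the worker fields touched by the patch. */
typedef struct ToyWorker
{
    const char *pending_tuple;    /* prepared, not yet inserted; NULL if none */
    char        errcontext[128];  /* identifies the conflict being logged */
    char        server_log[512];  /* toy server log */
    char        last_error[256];  /* text of the ERROR raised, if any */
} ToyWorker;

/* Append one line to the toy server log. */
static void
toy_log(ToyWorker *w, const char *msg)
{
    size_t      used = strlen(w->server_log);

    snprintf(w->server_log + used, sizeof(w->server_log) - used, "%s\n", msg);
}

/* Prepare the tuple and stash a description of the conflict. */
static void
toy_prepare(ToyWorker *w, const char *type, const char *rel)
{
    w->pending_tuple = type;
    snprintf(w->errcontext, sizeof(w->errcontext),
             "while logging conflict \"%s\" detected on relation \"%s\"",
             type, rel);
}

/*
 * Insert the prepared tuple.  table_ok = false models a failing insert;
 * the function then records an error annotated with the conflict context
 * and returns false (standing in for raising an ERROR).
 */
static bool
toy_insert(ToyWorker *w, bool table_ok)
{
    assert(w->pending_tuple != NULL);

    if (!table_ok)
    {
        snprintf(w->last_error, sizeof(w->last_error),
                 "could not insert into conflict log table\nCONTEXT:  %s",
                 w->errcontext);
        w->pending_tuple = NULL;    /* discard so it is not retried later */
        return false;
    }

    w->pending_tuple = NULL;
    return true;
}

/* Sub-ERROR conflict: report to the server log first, then insert. */
static bool
toy_report_conflict(ToyWorker *w, const char *type, const char *rel,
                    bool table_ok)
{
    char        msg[160];

    toy_prepare(w, type, rel);
    snprintf(msg, sizeof(msg),
             "conflict detected on relation \"%s\": conflict=%s", rel, type);
    toy_log(w, msg);

    return toy_insert(w, table_ok);
}
```

With this ordering, a call such as toy_report_conflict(&w,
"update_missing", "public.t1", false) leaves the conflict in the toy
server log even though the insert failed, and the recorded error names
the conflict that was being logged.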

Additionally, there is another problem with 0003: when a parallel
apply worker hits an ERROR-level conflict, it logs the conflict to the
conflict log table in a new transaction in its error path, after
aborting the failed apply transaction. But the leader detects worker
failure in pa_wait_for_xact_finish() by waiting on the worker's
transaction lock, and AbortOutOfAnyTransaction() releases that lock:
the leader unblocks, sees a non-finished state, raises "lost
connection to the logical replication parallel apply worker", and
tears the worker down, which can SIGTERM it mid-insert and lose the
conflict log row, besides producing a misleading message. The attached
top-up patch v52-2-amit.fix_parallel_apply_logging fixes that by
introducing a PARALLEL_TRANS_ERROR state. If you are okay with this
patch, you can merge it into your 0003, but
v52-1-amit.Improve-error-handling-for-conflict-log-table-ins can be
reviewed before being merged into 0003.
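
The race and the fix can be modelled in a few lines. This is only a
single-threaded sketch under stated assumptions: the Toy* types and
toy_* functions are hypothetical, the lock is a plain boolean, and the
leader's behaviour is reduced to a decision taken once the lock has
been released.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy mirror of the transaction states; the names are illustrative only. */
typedef enum ToyTransState
{
    TOY_TRANS_UNKNOWN,
    TOY_TRANS_STARTED,
    TOY_TRANS_FINISHED,
    TOY_TRANS_ERROR             /* worker failed and will report the error */
} ToyTransState;

typedef enum ToyLeaderAction
{
    TOY_LEADER_CONTINUE,                /* transaction finished normally */
    TOY_LEADER_WAIT_FOR_WORKER_ERROR,   /* keep worker alive, await its error */
    TOY_LEADER_LOST_CONNECTION          /* fallback: worker died silently */
} ToyLeaderAction;

/* Shared state between leader and worker, reduced to what matters here. */
typedef struct ToyShared
{
    ToyTransState state;
    bool          lock_held;    /* models the worker's transaction lock */
} ToyShared;

/*
 * Worker error path.  set_state_first = true models the fixed ordering:
 * publish the ERROR state while the leader is still blocked on the lock,
 * then abort (which releases the lock).  The conflict log insert would
 * follow after this function returns.
 */
static void
toy_worker_error_path(ToyShared *s, bool set_state_first)
{
    assert(s->lock_held);

    if (set_state_first)
        s->state = TOY_TRANS_ERROR;

    s->lock_held = false;       /* abort releases the transaction lock */
}

/* What the leader does once it unblocks on the released lock. */
static ToyLeaderAction
toy_leader_after_lock_release(ToyTransState state)
{
    switch (state)
    {
        case TOY_TRANS_FINISHED:
            return TOY_LEADER_CONTINUE;
        case TOY_TRANS_ERROR:
            return TOY_LEADER_WAIT_FOR_WORKER_ERROR;
        default:
            return TOY_LEADER_LOST_CONNECTION;
    }
}
```

Without the state being set first, the leader sees TOY_TRANS_STARTED
after the lock is released and falls into the "lost connection" branch;
with it, the leader waits for the worker's own error report.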

I have taken help from AI to work on these top-up (atop v52 0003)
bug-fix patches and have done an initial self-review and test of
them, but more review and testing are required for this work.

-- 
With Regards,
Amit Kapila.
From a148d389d3ec3b87a45de4267aeea6e559a0c556 Mon Sep 17 00:00:00 2001
From: Amit Kapila <[email protected]>
Date: Fri, 19 Jun 2026 11:09:08 +0530
Subject: [PATCH v3] Improve error handling for conflict log table insertions.

When a subscription logs conflicts to its conflict log table (CLT), the
apply worker prepares the conflict tuple and inserts it either inline
(for sub-ERROR conflicts, where apply continues) or, for ERROR-level
conflicts, defers the insertion to a fresh transaction after the apply
transaction aborts (ProcessPendingConflictLogTuple), so the log row is
not rolled back with the failed change.

The previous error handling around that insertion had a few problems:

* ProcessPendingConflictLogTuple() wrapped the insert in its own
PG_TRY/PG_CATCH that, on failure, called FlushErrorState() and
downgraded the failure to a WARNING.  When called from start_apply()'s
PG_CATCH (which then does PG_RE_THROW()), that FlushErrorState() reset
the error stack and discarded the very error being re-thrown, so a
failure of the deferred insert could corrupt error reporting.  It also
silently swallowed genuine insertion failures.

* If the inline insert (sub-ERROR path) failed, the conflict was not
recorded anywhere: the apply transaction aborted before the conflict
was written to the server log.

* In the parallel apply worker, a failed deferred insert likewise lost
the original error.

Rework this so that a failure to write the CLT is treated as a normal
apply error, and so that the conflict is always identifiable:

* Drop the internal PG_TRY/PG_CATCH from ProcessPendingConflictLogTuple().
A failed insert now raises an ERROR like any other apply failure; such
failures (e.g. the CLT was dropped, or out-of-space) are expected to be
rare and persistent.

* Annotate insertion failures with the conflict being logged.
prepare_conflict_log_tuple() stashes a short description ("while logging
conflict <type> detected on relation <rel>") in a new
LogicalRepWorker.conflict_log_errcontext field (allocated, like
conflict_log_tuple, in ApplyContext), and InsertConflictLogTuple()
installs an error context callback around the heap_insert().  Any error
raised during the insert therefore carries enough context to identify
the conflict, even on the deferred path where the original conflict
error is not separately reported.

* Report the conflict to the server log before inserting it into the
table.  ReportApplyConflict() now prepares the tuple, emits the
server-log message (full details when the destination includes the log,
otherwise a short message pointing at the table), and only then inserts.
This guarantees the conflict is recorded even if the table insert fails.
The inline (sub-ERROR) insert is wrapped so that, on failure, the
prepared tuple is discarded before re-throwing, ensuring the deferred
path does not retry the same failing insert.

* Rework the worker error handlers to do the deferred insertion cleanly.
start_apply() and ParallelApplyWorkerMain() now copy the error into a
long-lived context, FlushErrorState(), reset error_context_stack (the
callbacks active at throw time belong to unwound frames, and the insert
installs its own), abort, run ProcessPendingConflictLogTuple(), and
finally ReThrowError() the original error.  On success the original
error is re-thrown; on a failed insert the insert error (carrying the
conflict context) propagates instead.  For the parallel apply worker
this means the leader always receives a real ErrorResponse rather than
a "lost connection to the parallel apply worker" message.

* In DisableSubscriptionAndExit(), perform the deferred insertion after
the subscription has been disabled and committed (and before the
dead-tuple retention check), so a failure to log the conflict cannot
prevent the subscription from being disabled or leave the worker
restarting and failing forever.
---
 .../replication/logical/applyparallelworker.c |  38 ++--
 src/backend/replication/logical/conflict.c    | 214 +++++++++++-------
 src/backend/replication/logical/worker.c      |  51 ++++-
 src/include/replication/worker_internal.h     |   7 +
 4 files changed, 211 insertions(+), 99 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d24c52d43e6..48cb5558367 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -993,32 +993,44 @@ ParallelApplyWorkerMain(Datum main_arg)
        PG_CATCH();
        {
                MemoryContext oldcontext;
-               ErrorData *edata;
+               ErrorData  *edata;
 
                /*
-                * Copy the ErrorData before doing any further work. The error may
-                * have been raised while running under ErrorContext, so switch to
-                * a safe context (TopMemoryContext) to avoid assertions and ensure
-                * the error data survives subsequent cleanup.
+                * Reset the origin state to prevent the advancement of origin
+                * progress if we fail to apply. Otherwise, this will result in
+                * transaction loss as that transaction won't be sent again by the
+                * server.
+                */
+               replorigin_xact_clear(true);
+
+               /*
+                * Copy the error and recover to an idle state so we can insert the
+                * deferred conflict log tuple (if any) before re-throwing.  Copy the
+                * error into a longer-lived context first, as it may have been raised
+                * under ErrorContext.  Also reset the error context stack: the
+                * callbacks in effect when the error was thrown belong to unwound stack
+                * frames, and the deferred insert installs its own.
                 */
                oldcontext = MemoryContextSwitchTo(TopMemoryContext);
                edata = CopyErrorData();
                MemoryContextSwitchTo(oldcontext);
 
                FlushErrorState();
+               error_context_stack = NULL;
+               AbortOutOfAnyTransaction();
 
                /*
-                * Reset the origin state to prevent the advancement of origin
-                * progress if we fail to apply. Otherwise, this will result in
-                * transaction loss as that transaction won't be sent again by the
-                * server.
+                * Insert the deferred conflict log tuple before re-throwing.
+                * Re-throwing is what reports the error to the leader (via the error
+                * queue set up above), so the insertion must happen first: otherwise
+                * the leader could start tearing down this worker while it is still
+                * writing the conflict log tuple.  If the insertion itself fails, that
+                * error (annotated with the conflict context, see InsertConflictLogTuple)
+                * propagates to the leader instead of the original.
                 */
-               replorigin_xact_clear(true);
-
-               AbortOutOfAnyTransaction();
                ProcessPendingConflictLogTuple();
 
-               /* Re-throw the original error. */
+               /* Re-throw the original error, which reports it to the leader. */
                ReThrowError(edata);
        }
        PG_END_TRY();
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 6bf3d6d5a44..c2c15f055e6 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -335,17 +335,16 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
        log_dest_table = CONFLICTS_LOGGED_TO_TABLE(dest);
        log_dest_logfile = CONFLICTS_LOGGED_TO_LOG(dest);
 
-       /* Insert to table if requested. */
+       /*
+        * Prepare the conflict log tuple first when the destination includes the
+        * table.  This must happen before the ereport() below, because for an
+        * ERROR-level conflict that ereport() raises the error and defers the
+        * actual insertion to ProcessPendingConflictLogTuple(), which relies on the
+        * tuple having been prepared.
+        */
        if (log_dest_table)
        {
                Assert(conflictlogrel != NULL);
-
-               /*
-                * Prepare the conflict log tuple. If the error level is below ERROR,
-                * insert it immediately. Otherwise, defer the insertion to a new
-                * transaction after the current one aborts, ensuring the insertion of
-                * the log tuple is not rolled back.
-                */
                prepare_conflict_log_tuple(estate,
                                                                   relinfo->ri_RelationDesc,
                                                                   conflictlogrel,
@@ -353,29 +352,16 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
                                                                   searchslot,
                                                                   conflicttuples,
                                                                   remoteslot);
-               if (elevel < ERROR)
-                       InsertConflictLogTuple(conflictlogrel);
-
-               if (!log_dest_logfile)
-               {
-                       /*
-                        * Not logging conflict details to the server log; Report the error
-                        * msg but omit raw tuple data from server logs since it's already
-                        * captured in the conflict log table.
-                        */
-                       ereport(elevel,
-                                       errcode_apply_conflict(type),
-                                       errmsg("conflict detected on relation \"%s\": conflict=%s",
-                                               RelationGetQualifiedRelationName(localrel),
-                                               ConflictTypeNames[type]),
-                                       errdetail("Conflict details are logged to the conflict log table: %s",
-                                                         RelationGetRelationName(conflictlogrel)));
-               }
-
-               table_close(conflictlogrel, RowExclusiveLock);
        }
 
-       /* Log into the server log if requested. */
+       /*
+        * Report the conflict to the server log before inserting it into the
+        * conflict log table.  Emitting it first guarantees the conflict is
+        * recorded even if the table insert below fails; it is also what raises the
+        * error for ERROR-level conflicts.  When the server log is one of the
+        * destinations we emit the full details, otherwise (table-only) we emit a
+        * shorter message since the details are captured in the table.
+        */
        if (log_dest_logfile)
        {
                StringInfoData  err_detail;
@@ -400,6 +386,64 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
                                           ConflictTypeNames[type]),
                                errdetail_internal("%s", err_detail.data));
        }
+       else if (log_dest_table)
+       {
+               /*
+                * Not logging conflict details to the server log; report the conflict
+                * but omit raw tuple data since it is captured in the conflict log
+                * table.
+                */
+               ereport(elevel,
+                               errcode_apply_conflict(type),
+                               errmsg("conflict detected on relation \"%s\": conflict=%s",
+                                       RelationGetQualifiedRelationName(localrel),
+                                       ConflictTypeNames[type]),
+                               errdetail("Conflict details are logged to the conflict log table: %s",
+                                                 RelationGetRelationName(conflictlogrel)));
+       }
+
+       /*
+        * Insert into the conflict log table if requested.  For conflicts below
+        * ERROR the apply transaction continues, so insert immediately; for
+        * ERROR-level conflicts the ereport() above already raised the error and
+        * the insertion is deferred to a new transaction
+        * (ProcessPendingConflictLogTuple) so that it is not rolled back.
+        */
+       if (log_dest_table)
+       {
+               if (elevel < ERROR)
+               {
+                       PG_TRY();
+                       {
+                               InsertConflictLogTuple(conflictlogrel);
+                       }
+                       PG_CATCH();
+                       {
+                               /*
+                                * The insert failed, so the apply transaction will abort and
+                                * the error will propagate to the worker's error handler.  The
+                                * conflict was already reported to the server log above, so it
+                                * is not lost.  Discard the prepared tuple so that the deferred
+                                * insertion path (ProcessPendingConflictLogTuple) does not retry
+                                * this same failing insert.
+                                */
+                               if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+                               {
+                                       heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+                                       MyLogicalRepWorker->conflict_log_tuple = NULL;
+                               }
+                               if (MyLogicalRepWorker->conflict_log_errcontext != NULL)
+                               {
+                                       pfree(MyLogicalRepWorker->conflict_log_errcontext);
+                                       MyLogicalRepWorker->conflict_log_errcontext = NULL;
+                               }
+                               PG_RE_THROW();
+                       }
+                       PG_END_TRY();
+               }
+
+               table_close(conflictlogrel, RowExclusiveLock);
+       }
 }
 
 /*
@@ -428,60 +472,28 @@ ProcessPendingConflictLogTuple(void)
        if (MyLogicalRepWorker->conflict_log_tuple == NULL)
                return;
 
-       PG_TRY();
-       {
-               StartTransactionCommand();
-               PushActiveSnapshot(GetTransactionSnapshot());
-
-               /* Open conflict log table and insert the tuple */
-               conflictlogrel = GetConflictLogDestAndTable(&dest);
-               Assert(conflictlogrel);
-
-               InsertConflictLogTuple(conflictlogrel);
-
-               table_close(conflictlogrel, RowExclusiveLock);
-
-               PopActiveSnapshot();
-               CommitTransactionCommand();
-       }
-       PG_CATCH();
-       {
-               ErrorData  *edata;
-               MemoryContext oldctx;
-
-               /* Save error info in our memory context */
-               oldctx = MemoryContextSwitchTo(TopMemoryContext);
-               edata = CopyErrorData();
-               MemoryContextSwitchTo(oldctx);
-
-               /* Clear the error state so we can continue */
-               FlushErrorState();
+       /*
+        * Insert the deferred conflict log tuple in its own transaction.  A
+        * failure here (e.g. the conflict log table was dropped, or an
+        * out-of-disk-space error) is treated like any other apply error and
+        * raises an ERROR; such failures are expected to be rare and persistent.
+        * Callers must therefore have already reported (and cleared) any
+        * in-progress apply error before calling this, so that this error does not
+        * mask the original one.
+        */
+       StartTransactionCommand();
+       PushActiveSnapshot(GetTransactionSnapshot());
 
-               /* Abort the transaction we started above */
-               AbortOutOfAnyTransaction();
+       /* Open conflict log table and insert the tuple */
+       conflictlogrel = GetConflictLogDestAndTable(&dest);
+       Assert(conflictlogrel);
 
-               /*
-                * Report the error as a warning. We use WARNING because we don't want
-                * this to be a fatal error for the worker, and we want to allow the
-                * caller's original error to remain primary.
-                */
-               ereport(WARNING,
-                               (errmsg("could not log conflict to table for subscription \"%s\": %s",
-                                               MySubscription->name, edata->message)));
+       InsertConflictLogTuple(conflictlogrel);
 
-               FreeErrorData(edata);
+       table_close(conflictlogrel, RowExclusiveLock);
 
-               /*
-                * Free the conflict log tuple and set it to NULL. This ensures we
-                * don't try to insert the same problematic tuple again.
-                */
-               if (MyLogicalRepWorker->conflict_log_tuple != NULL)
-               {
-                       heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
-                       MyLogicalRepWorker->conflict_log_tuple = NULL;
-               }
-       }
-       PG_END_TRY();
+       PopActiveSnapshot();
+       CommitTransactionCommand();
 }
 
 /*
@@ -544,6 +556,19 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
        return table_open(conflictlogrelid, RowExclusiveLock);
 }
 
+/*
+ * Error context callback for failures while inserting into the conflict log
+ * table.  Adds a line identifying the conflict that was being logged.
+ */
+static void
+conflict_log_insert_errcontext(void *arg)
+{
+       char       *ctx = (char *) arg;
+
+       if (ctx)
+               errcontext("%s", ctx);
+}
+
 /*
  * InsertConflictLogTuple
  *
@@ -554,15 +579,34 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
 void
 InsertConflictLogTuple(Relation conflictlogrel)
 {
+       ErrorContextCallback errcallback;
+
        /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
        Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
 
+       /*
+        * Set up an error context so that a failure to insert (e.g. the conflict
+        * log table was dropped, or an out-of-space error) carries information
+        * identifying the conflict we were trying to log.
+        */
+       errcallback.callback = conflict_log_insert_errcontext;
+       errcallback.arg = MyLogicalRepWorker->conflict_log_errcontext;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
+
        heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
                                GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL);
 
-       /* Free conflict log tuple. */
+       error_context_stack = errcallback.previous;
+
+       /* Free the conflict log tuple and its context string. */
        heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
        MyLogicalRepWorker->conflict_log_tuple = NULL;
+       if (MyLogicalRepWorker->conflict_log_errcontext)
+       {
+               pfree(MyLogicalRepWorker->conflict_log_errcontext);
+               MyLogicalRepWorker->conflict_log_errcontext = NULL;
+       }
 }
 
 /*
@@ -1381,5 +1425,15 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
        oldctx = MemoryContextSwitchTo(ApplyContext);
        MyLogicalRepWorker->conflict_log_tuple =
                heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+
+       /*
+        * Stash a context string describing this conflict, so that if inserting
+        * the tuple into the conflict log table fails, the resulting error carries
+        * enough context to identify the conflict (see InsertConflictLogTuple).
+        */
+       MyLogicalRepWorker->conflict_log_errcontext =
+               psprintf("while logging conflict \"%s\" detected on relation \"%s\"",
+                                ConflictTypeNames[conflict_type],
+                                RelationGetRelationName(rel));
        MemoryContextSwitchTo(oldctx);
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 05ba6d8b1ad..69480cbc886 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5658,6 +5658,9 @@ start_apply(XLogRecPtr origin_startpos)
        }
        PG_CATCH();
        {
+               MemoryContext oldcontext;
+               ErrorData  *edata;
+
                /*
                 * Reset the origin state to prevent the advancement of origin
                 * progress if we fail to apply. Otherwise, this will result in
@@ -5671,15 +5674,33 @@ start_apply(XLogRecPtr origin_startpos)
                else
                {
                        /*
-                        * Report the worker failed while applying changes. Abort the
-                        * current transaction so that the stats message is sent in an
-                        * idle state.
+                        * Save the error and recover to an idle state so we can insert the
+                        * deferred conflict log tuple (if any) before re-throwing.  Copy
+                        * the error into a long-lived context first, as it may have been
+                        * raised under ErrorContext.  Also reset the error context stack:
+                        * the callbacks in effect when the error was thrown belong to
+                        * unwound stack frames, and the deferred insert installs its own.
                         */
+                       oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+                       edata = CopyErrorData();
+                       MemoryContextSwitchTo(oldcontext);
+
+                       FlushErrorState();
+                       error_context_stack = NULL;
                        AbortOutOfAnyTransaction();
                        pgstat_report_subscription_error(MySubscription->oid);
+
+                       /*
+                        * Insert the deferred conflict log tuple in its own transaction.
+                        * If this fails, that error (annotated with the conflict context,
+                        * see InsertConflictLogTuple) propagates instead of the original;
+                        * such failures are expected to be rare and persistent (e.g. out of
+                        * disk space).
+                        */
                        ProcessPendingConflictLogTuple();
 
-                       PG_RE_THROW();
+                       /* Re-throw the original error. */
+                       ReThrowError(edata);
                }
        }
        PG_END_TRY();
@@ -6046,14 +6067,19 @@ DisableSubscriptionAndExit(void)
 
        RESUME_INTERRUPTS();
 
+       /*
+        * The error context callbacks in effect when the error was thrown belong
+        * to now-unwound stack frames; reset the stack before running further code
+        * (including the deferred conflict log insertion, which installs its own).
+        */
+       error_context_stack = NULL;
+
        /*
         * Report the worker failed during sequence synchronization, table
         * synchronization, or apply.
         */
        pgstat_report_subscription_error(MyLogicalRepWorker->subid);
 
-       ProcessPendingConflictLogTuple();
-
        /* Disable the subscription */
        StartTransactionCommand();
 
@@ -6076,6 +6102,19 @@ DisableSubscriptionAndExit(void)
                        errmsg("subscription \"%s\" has been disabled because of an error",
                                   MySubscription->name));
 
+       /*
+        * Insert the deferred conflict log tuple (if any) now that the
+        * subscription has been disabled and committed.  Doing it after the
+        * disable means a failure to log the conflict (treated as a hard error,
+        * see ProcessPendingConflictLogTuple) cannot prevent the subscription from
+        * being disabled and so cannot leave the worker restarting and failing
+        * forever.  Do it before the dead-tuple retention check below: that check
+        * only warns today, but it takes an elevel and could raise an error, which
+        * must not prevent the conflict from being recorded.  The original error
+        * was already reported above.
+        */
+       ProcessPendingConflictLogTuple();
+
        /*
         * Skip the track_commit_timestamp check when disabling the worker due to
         * an error, as verifying commit timestamps is unnecessary in this
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 6b6525dc2e2..c0059b1b810 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -103,6 +103,13 @@ typedef struct LogicalRepWorker
        /* A conflict log tuple that is prepared but not yet inserted. */
        HeapTuple       conflict_log_tuple;
 
+       /*
+        * Error-context string describing the conflict above, used to annotate any
+        * error raised while inserting conflict_log_tuple into the conflict log
+        * table.  Allocated, like conflict_log_tuple, in ApplyContext.
+        */
+       char       *conflict_log_errcontext;
+
        /* Stats. */
        XLogRecPtr      last_lsn;
        TimestampTz last_send_time;
-- 
2.54.0

From 8806a07656c7a90668132dc17f051174f2a62ae6 Mon Sep 17 00:00:00 2001
From: Amit Kapila <[email protected]>
Date: Fri, 19 Jun 2026 15:40:07 +0530
Subject: [PATCH v4] Don't tear down a parallel apply worker before it logs its
 conflict

When a parallel apply worker hits an ERROR-level conflict it logs the
conflict to the conflict log table in a new transaction in its error path,
after aborting the failed apply transaction.  But the leader detects worker
failure in pa_wait_for_xact_finish() by waiting on the worker's transaction
lock, and AbortOutOfAnyTransaction() releases that lock: the leader unblocks,
sees a non-finished state, raises "lost connection to the logical replication
parallel apply worker", and tears the worker down -- which can SIGTERM it
mid-insert and lose the conflict log row, besides being a misleading message.

Fix this by adding a PARALLEL_TRANS_ERROR state that the worker sets before
aborting (while the leader is still blocked on the lock, so it is visible once
the leader unblocks).  On seeing it, the leader waits for the worker to report
its actual error via the error queue -- which keeps the worker alive long
enough to finish writing the conflict log tuple and lets the leader report the
real error instead of "lost connection".  The original message remains as the
fallback for a worker that died without signalling; as a side effect,
non-conflict worker failures now also surface the real error.
---
 .../replication/logical/applyparallelworker.c | 35 +++++++++++++++++++
 src/include/replication/worker_internal.h     |  2 ++
 2 files changed, 37 insertions(+)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 48cb5558367..a3f5b9b122d 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -1017,6 +1017,17 @@ ParallelApplyWorkerMain(Datum main_arg)
 
                FlushErrorState();
                error_context_stack = NULL;
+
+               /*
+                * Tell the leader we failed and are about to report the error and log
+                * the conflict.  This must be set before AbortOutOfAnyTransaction()
+                * below releases the transaction lock that the leader waits on in
+                * pa_wait_for_xact_finish(); otherwise the leader would see a
+                * non-finished state, assume the connection was lost, and tear this
+                * worker down while it is still writing the conflict log tuple.
+                */
+               pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_ERROR);
+
                AbortOutOfAnyTransaction();
 
                /*
@@ -1361,9 +1372,33 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
         * released.
         */
        if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
+       {
+               /*
+                * If the worker signalled that it errored (PARALLEL_TRANS_ERROR), it is
+                * logging the conflict and will report the actual error via the error
+                * queue before exiting.  Wait for that rather than reporting a generic
+                * lost connection: CHECK_FOR_INTERRUPTS() drives
+                * ProcessParallelApplyMessages(), which raises the real error on the
+                * worker's ErrorResponse (or "lost connection" if the worker died
+                * without reporting).  Waiting here also keeps the worker alive long
+                * enough to finish writing the conflict log tuple.
+                */
+               while (pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_ERROR)
+               {
+                       CHECK_FOR_INTERRUPTS();
+
+                       (void) WaitLatch(MyLatch,
+                                                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+                                                        10L,
+                                                        WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
+
+                       ResetLatch(MyLatch);
+               }
+
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("lost connection to the logical replication parallel apply worker")));
+       }
 }
 
 /*
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index c0059b1b810..79c90dddd89 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -131,6 +131,8 @@ typedef enum ParallelTransState
        PARALLEL_TRANS_UNKNOWN,
        PARALLEL_TRANS_STARTED,
        PARALLEL_TRANS_FINISHED,
+       PARALLEL_TRANS_ERROR,           /* worker failed; it will report the error (and
+                                                                * log the conflict, if any) before exiting */
 } ParallelTransState;
 
 /*
-- 
2.54.0
