From 1a2ff45c9d10320f62b6fa9d3ed43fc3c6fe8b10 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Sun, 11 Dec 2022 19:16:34 +0800
Subject: [PATCH v79 4/4] Retry to apply streaming xact only in apply worker

When the subscription parameter is set streaming=parallel, the logic tries to
apply the streaming transaction using a parallel apply worker. If this
fails the parallel worker exits with an error.

In this case, retry applying the streaming transaction using the normal
streaming=on mode. This is done to avoid getting caught in a loop of the same
retry errors.

A new flag field "subretry" has been introduced to catalog "pg_subscription".
If there are any active parallel apply workers and the subscriber exits with an
error, this flag will be set true, and whenever the transaction is applied
successfully, this flag is reset false.
Now, when deciding how to apply a streaming transaction, the logic can know if
this transaction has previously failed or not (by checking the "subretry"
field).

Note: Since we add a new field 'subretry' to catalog 'pg_subscription' has been
expanded, we need bump catalog version.
---
 doc/src/sgml/catalogs.sgml                         |  10 ++
 doc/src/sgml/logical-replication.sgml              |  11 +-
 doc/src/sgml/ref/create_subscription.sgml          |   5 +
 src/backend/catalog/pg_subscription.c              |   1 +
 src/backend/catalog/system_views.sql               |   2 +-
 src/backend/commands/subscriptioncmds.c            |   1 +
 .../replication/logical/applyparallelworker.c      |  30 ++++
 src/backend/replication/logical/worker.c           | 182 +++++++++++++++------
 src/bin/pg_dump/pg_dump.c                          |   5 +-
 src/include/catalog/pg_subscription.h              |   7 +
 src/include/replication/worker_internal.h          |   2 +
 src/test/subscription/t/015_stream.pl              |  63 +++++++
 12 files changed, 263 insertions(+), 56 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048..d918327 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7950,6 +7950,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subretry</structfield> <type>bool</type>
+      </para>
+      <para>
+       True if previous change failed to be applied while there were any
+       active parallel apply workers, necessitating a retry.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
       </para>
       <para>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index f4b4e64..57e938f 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1504,12 +1504,11 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
 
   <para>
    When the streaming mode is <literal>parallel</literal>, the finish LSN of
-   failed transactions may not be logged. In that case, it may be necessary to
-   change the streaming mode to <literal>on</literal> or <literal>off</literal> and
-   cause the same conflicts again so the finish LSN of the failed transaction will
-   be written to the server log. For the usage of finish LSN, please refer to <link
-   linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ...
-   SKIP</command></link>.
+   failed transactions may not be logged. In that case, the failed transaction
+   will be retried in <literal>streaming = on</literal> mode. If it fails
+   again, the finish LSN of the failed transaction will be written to the
+   server log. For the usage of finish LSN, please refer to
+   <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... SKIP</command></link>.
   </para>
  </sect1>
 
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index eba72c6..decd554 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -251,6 +251,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           transaction is committed. Note that if an error happens in a
           parallel apply worker, the finish LSN of the remote transaction
           might not be reported in the server log.
+          When applying streaming transactions, if a deadlock is detected, the
+          parallel apply worker will exit with an error. The
+          <literal>parallel</literal> mode is disregarded when retrying;
+          instead the transaction will be applied using <literal>on</literal>
+          mode.
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae31..0eb5ffb 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
+	sub->retry = subform->subretry;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ff63405..97b145a 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1302,7 +1302,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subretry, subslotname, subsynccommit, subpublications, suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index baff00d..b2053f7 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -636,6 +636,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
+	values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(false);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 777926c..fc351d7 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -315,6 +315,17 @@ pa_can_start(void)
 	if (!AllTablesyncsReady())
 		return false;
 
+	/*
+	 * Don't use parallel apply workers for retries, because it is possible
+	 * that a deadlock was detected the last time we tried to apply a
+	 * transaction using a parallel apply worker.
+	 */
+	if (MySubscription->retry)
+	{
+		elog(DEBUG1, "parallel apply workers are not used for retries");
+		return false;
+	}
+
 	return true;
 }
 
@@ -1678,3 +1689,22 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 
 	pa_free_worker(winfo);
 }
+
+/* Check if any active parallel apply workers. */
+bool
+pa_have_active_worker(void)
+{
+	ListCell   *lc;
+
+	foreach(lc, ParallelApplyWorkerPool)
+	{
+		ParallelApplyWorkerInfo *tmp_winfo;
+
+		tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
+
+		if (tmp_winfo->in_use)
+			return true;
+	}
+
+	return false;
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5c8ce97..0c7bf67 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -394,7 +394,7 @@ static void stream_close_file(void);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
-static void DisableSubscriptionAndExit(void);
+static void DisableSubscriptionOnError(void);
 
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
@@ -431,6 +431,8 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+static void set_subscription_retry(bool retry);
+
 /*
  * Return the name of the logical replication worker.
  */
@@ -1046,6 +1048,9 @@ apply_handle_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1152,6 +1157,9 @@ apply_handle_prepare(StringInfo s)
 
 	in_remote_transaction = false;
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
@@ -1208,6 +1216,9 @@ apply_handle_commit_prepared(StringInfo s)
 	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
 	in_remote_transaction = false;
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
@@ -1269,6 +1280,9 @@ apply_handle_rollback_prepared(StringInfo s)
 	store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
 	in_remote_transaction = false;
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(rollback_data.rollback_end_lsn);
 
@@ -1394,6 +1408,9 @@ apply_handle_stream_prepare(StringInfo s)
 			break;
 	}
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	pgstat_report_stat(false);
 
 	/* Process any tables that are being synchronized in parallel. */
@@ -1970,6 +1987,10 @@ apply_handle_stream_abort(StringInfo s)
 			break;
 	}
 
+	/* Reset the retry flag. */
+	if (toplevel_xact)
+		set_subscription_retry(false);
+
 	reset_apply_error_context_info();
 }
 
@@ -2252,6 +2273,9 @@ apply_handle_stream_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	/* Reset the retry flag. */
+	set_subscription_retry(false);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -4348,20 +4372,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 	}
 	PG_CATCH();
 	{
+		/*
+		 * Emit the error message, and recover from the error state to an idle
+		 * state
+		 */
+		HOLD_INTERRUPTS();
+
+		EmitErrorReport();
+		AbortOutOfAnyTransaction();
+		FlushErrorState();
+
+		RESUME_INTERRUPTS();
+
+		/* Report the worker failed during table synchronization */
+		pgstat_report_subscription_error(MySubscription->oid, false);
+
 		if (MySubscription->disableonerr)
-			DisableSubscriptionAndExit();
-		else
-		{
-			/*
-			 * Report the worker failed during table synchronization. Abort
-			 * the current transaction so that the stats message is sent in an
-			 * idle state.
-			 */
-			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, false);
+			DisableSubscriptionOnError();
 
-			PG_RE_THROW();
-		}
+		/* Set the retry flag. */
+		set_subscription_retry(true);
+
+		proc_exit(0);
 	}
 	PG_END_TRY();
 
@@ -4386,20 +4418,27 @@ start_apply(XLogRecPtr origin_startpos)
 	}
 	PG_CATCH();
 	{
+		/*
+		 * Emit the error message, and recover from the error state to an idle
+		 * state
+		 */
+		HOLD_INTERRUPTS();
+
+		EmitErrorReport();
+		AbortOutOfAnyTransaction();
+		FlushErrorState();
+
+		RESUME_INTERRUPTS();
+
+		/* Report the worker failed while applying changes */
+		pgstat_report_subscription_error(MySubscription->oid,
+										 !am_tablesync_worker());
+
 		if (MySubscription->disableonerr)
-			DisableSubscriptionAndExit();
-		else
-		{
-			/*
-			 * Report the worker failed while applying changes. Abort the
-			 * current transaction so that the stats message is sent in an
-			 * idle state.
-			 */
-			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+			DisableSubscriptionOnError();
 
-			PG_RE_THROW();
-		}
+		/* Set the retry flag. */
+		set_subscription_retry(true);
 	}
 	PG_END_TRY();
 }
@@ -4675,39 +4714,20 @@ ApplyWorkerMain(Datum main_arg)
 }
 
 /*
- * After error recovery, disable the subscription in a new transaction
- * and exit cleanly.
+ * Disable the subscription in a new transaction.
  */
 static void
-DisableSubscriptionAndExit(void)
+DisableSubscriptionOnError(void)
 {
-	/*
-	 * Emit the error message, and recover from the error state to an idle
-	 * state
-	 */
-	HOLD_INTERRUPTS();
-
-	EmitErrorReport();
-	AbortOutOfAnyTransaction();
-	FlushErrorState();
-
-	RESUME_INTERRUPTS();
-
-	/* Report the worker failed during either table synchronization or apply */
-	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
-									 !am_tablesync_worker());
-
 	/* Disable the subscription */
 	StartTransactionCommand();
 	DisableSubscription(MySubscription->oid);
 	CommitTransactionCommand();
 
-	/* Notify the subscription has been disabled and exit */
+	/* Notify the subscription has been disabled */
 	ereport(LOG,
 			errmsg("subscription \"%s\" has been disabled because of an error",
 				   MySubscription->name));
-
-	proc_exit(0);
 }
 
 /*
@@ -5050,3 +5070,71 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 		return TRANS_LEADER_SEND_TO_PARALLEL;
 	}
 }
+
+/*
+ * Set subretry of pg_subscription catalog.
+ *
+ * If retry is true, subscriber is about to exit with an error. Otherwise, it
+ * means that the transaction was applied successfully.
+ */
+static void
+set_subscription_retry(bool retry)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		started_tx = false;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+
+	/* Fast path - if no state change then nothing to do */
+	if (MySubscription->retry == retry)
+		return;
+
+	/* Fast path - skip for parallel apply workers */
+	if (am_parallel_apply_worker())
+		return;
+
+	/* Fast path - skip set retry if no active parallel apply workers */
+	if (retry && !pa_have_active_worker())
+		return;
+
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		started_tx = true;
+	}
+
+	/* Look up the subscription in the catalog */
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+							  ObjectIdGetDatum(MySubscription->oid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+	LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+					 AccessShareLock);
+
+	/* Form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Set subretry */
+	values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry);
+	replaces[Anum_pg_subscription_subretry - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/* Cleanup. */
+	heap_freetuple(tup);
+	table_close(rel, NoLock);
+
+	if (started_tx)
+		CommitTransactionCommand();
+}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 2c0a969..1bb7695 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4560,8 +4560,9 @@ getSubscriptions(Archive *fout)
 	ntups = PQntuples(res);
 
 	/*
-	 * Get subscription fields. We don't include subskiplsn in the dump as
-	 * after restoring the dump this value may no longer be relevant.
+	 * Get subscription fields. We don't include subskiplsn and subretry in
+	 * the dump as after restoring the dump this value may no longer be
+	 * relevant.
 	 */
 	i_tableoid = PQfnumber(res, "tableoid");
 	i_oid = PQfnumber(res, "oid");
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a17..8edbbca 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -88,6 +88,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subdisableonerr;	/* True if a worker error should cause the
 									 * subscription to be disabled */
 
+	bool		subretry BKI_DEFAULT(f);	/* True if previous change failed
+											 * to be applied while there were
+											 * any active parallel apply
+											 * workers */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -131,6 +136,8 @@ typedef struct Subscription
 	bool		disableonerr;	/* Indicates if the subscription should be
 								 * automatically disabled if a worker error
 								 * occurs */
+	bool		retry;			/* Indicates if previous change failed to be
+								 * applied using a parallel apply worker */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7f6a9f9..2e466e0 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -308,6 +308,8 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
+extern bool pa_have_active_worker(void);
+
 #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
 
 static inline bool
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index 83d6956..4054a5f 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -74,8 +74,16 @@ sub test_streaming
 		'check extra columns contain local defaults');
 
 	# Test the streaming in binary mode
+	my $oldpid = $node_publisher->safe_psql('postgres',
+		"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
+	);
 	$node_subscriber->safe_psql('postgres',
 		"ALTER SUBSCRIPTION tap_sub SET (binary = on)");
+	$node_publisher->poll_query_until('postgres',
+		"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
+	  )
+	  or die
+	  "Timed out while waiting for apply to restart after changing SUBSCRIPTION";
 
 	# Check the subscriber log from now on.
 	$offset = -s $node_subscriber->logfile;
@@ -450,6 +458,61 @@ $result =
 is($result, qq(5000),
 	'data replicated to subscriber by serializing messages to disk');
 
+# Clean up test data from the environment.
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
+$node_publisher->wait_for_catchup($appname);
+
+# ============================================================================
+# Test re-apply a failed streaming transaction using the leader apply
+# worker and apply subsequent streaming transaction using the parallel apply
+# worker after this retry succeeds.
+# ============================================================================
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE UNIQUE INDEX idx_tab on test_tab_2(a)");
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+	'postgres', qq{
+BEGIN;
+INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
+INSERT INTO test_tab_2 values(1);
+COMMIT;});
+
+# Check if the parallel apply worker is not started because the above
+# transaction failed to be applied.
+$node_subscriber->wait_for_log(
+	qr/DEBUG: ( [A-Z0-9]+:)? parallel apply workers are not used for retries/,
+	$offset);
+
+# Drop the unique index on the subscriber, now it works.
+$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab");
+
+# Wait for this streaming transaction to be applied in the apply worker.
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(5001), 'data replicated to subscriber after dropping index');
+
+# After successfully retrying to apply a failed streaming transaction, apply
+# the following streaming transaction using the parallel apply worker.
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab_2 SELECT i FROM generate_series(5001, 10000) s(i)");
+
+check_parallel_log($node_subscriber, $offset, 1, 'COMMIT');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(10001),
+	'data replicated to subscriber using the parallel apply worker');
+
 $node_subscriber->stop;
 $node_publisher->stop;
 
-- 
2.7.2.windows.1

