From 9f2d1ff2a181136efe2d5db0e6ac43bec909a1f1 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Thu, 3 Dec 2020 14:18:19 +0530
Subject: [PATCH v1] Allow more than one transaction in tablesync worker.

---
 src/backend/replication/logical/tablesync.c |  9 ++++++++-
 src/backend/replication/logical/worker.c    | 19 +++++--------------
 2 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1904f34..886298e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -270,7 +270,8 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-	Assert(IsTransactionState());
+	if (!IsTransactionState())
+		StartTransactionCommand();
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
@@ -294,6 +295,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 	}
 	else
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+	if (IsTransactionState())
+		CommitTransactionCommand();
 }
 
 /*
@@ -943,6 +947,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	/* Make the copy visible. */
 	CommandCounterIncrement();
 
+	CommitTransactionCommand();
+	StartTransactionCommand();
+
 	/*
 	 * We are done with the initial data synchronization, update the state.
 	 */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8c7fad8..af6a98a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s)
 	/* We must be in a valid transaction state */
 	Assert(IsTransactionState());
 
-	/* The synchronization worker runs in single transaction. */
-	if (!am_tablesync_worker())
-	{
-		/* Commit the per-stream transaction */
-		CommitTransactionCommand();
-	}
+	/* Commit the per-stream transaction */
+	CommitTransactionCommand();
 
 	in_streamed_transaction = false;
 
@@ -888,10 +884,7 @@ apply_handle_stream_abort(StringInfo s)
 		{
 			/* Cleanup the subxact info */
 			cleanup_subxact_info();
-
-			/* The synchronization worker runs in single transaction */
-			if (!am_tablesync_worker())
-				CommitTransactionCommand();
+			CommitTransactionCommand();
 			return;
 		}
 
@@ -918,8 +911,7 @@ apply_handle_stream_abort(StringInfo s)
 		/* write the updated subxact list */
 		subxact_info_write(MyLogicalRepWorker->subid, xid);
 
-		if (!am_tablesync_worker())
-			CommitTransactionCommand();
+		CommitTransactionCommand();
 	}
 }
 
@@ -1062,8 +1054,7 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data)
 {
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * Update origin state so we can restart streaming from correct
-- 
1.8.3.1

