From 1073207ed7838061a648b9b157f9422a08e010aa Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Wed, 11 Jun 2025 11:42:26 +0800
Subject: [PATCH vPOC] Apply non-streaming transactions in parallel apply
 worker

---
 src/backend/executor/execReplication.c        |  39 +-
 .../replication/logical/applyparallelworker.c | 427 +++++++++-
 src/backend/replication/logical/proto.c       |  35 +
 src/backend/replication/logical/relation.c    |  36 +-
 src/backend/replication/logical/worker.c      | 747 ++++++++++++++++--
 src/include/executor/executor.h               |   9 +-
 src/include/replication/logicalproto.h        |   3 +
 src/include/replication/logicalrelation.h     |   3 +
 src/include/replication/worker_internal.h     |  28 +-
 src/test/subscription/t/010_truncate.pl       |   2 +-
 src/test/subscription/t/026_stats.pl          |   1 +
 src/test/subscription/t/027_nosuperuser.pl    |   1 +
 12 files changed, 1203 insertions(+), 128 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..fdf73ba264f 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -139,6 +139,12 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
 	{
 		case TM_Ok:
 			break;
+		case TM_SelfModified:
+			ereport(LOG,
+					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+					 errmsg("tuple to be updated was already modified in the current transaction")));
+			refetch = true;
+			break;
 		case TM_Updated:
 			/* XXX: Improve handling here */
 			if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
@@ -179,7 +185,8 @@ bool
 RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 							 LockTupleMode lockmode,
 							 TupleTableSlot *searchslot,
-							 TupleTableSlot *outslot)
+							 TupleTableSlot *outslot,
+							 bool locktuple)
 {
 	ScanKeyData skey[INDEX_MAX_KEYS];
 	int			skey_attoff;
@@ -246,7 +253,7 @@ retry:
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
-	if (found)
+	if (found && locktuple)
 	{
 		TM_FailureData tmfd;
 		TM_Result	res;
@@ -353,7 +360,8 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
  */
 bool
 RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
-						 TupleTableSlot *searchslot, TupleTableSlot *outslot)
+						 TupleTableSlot *searchslot, TupleTableSlot *outslot,
+						 bool locktuple)
 {
 	TupleTableSlot *scanslot;
 	TableScanDesc scan;
@@ -404,7 +412,7 @@ retry:
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
-	if (found)
+	if (found && locktuple)
 	{
 		TM_FailureData tmfd;
 		TM_Result	res;
@@ -431,6 +439,29 @@ retry:
 	return found;
 }
 
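+/*
+ * Lock the tuple referenced by outslot->tts_tid in the given lock mode, using
+ * the latest snapshot.
+ *
+ * Returns true if the tuple was locked and does not need to be re-fetched,
+ * false otherwise.
+ */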
+bool
+RelationLockTuple(Relation rel, LockTupleMode lockmode,
+				  TupleTableSlot *searchslot, TupleTableSlot *outslot,
+				  CommandId cid)
+{
+	TM_FailureData tmfd;
+	TM_Result	res;
+
+	PushActiveSnapshot(GetLatestSnapshot());
+
+	res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
+						   outslot,
+						   cid,
+						   lockmode,
+						   LockWaitBlock,
+						   0 /* don't follow updates */ ,
+						   &tmfd);
+
+	PopActiveSnapshot();
+
+	return !should_refetch_tuple(res, &tmfd);
+}
+
 /*
  * Build additional index information necessary for conflict detection.
  */
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..0dd001e3623 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -209,6 +209,8 @@
 #define PARALLEL_APPLY_LOCK_STREAM	0
 #define PARALLEL_APPLY_LOCK_XACT	1
 
+#define PARALLEL_APPLY_INIT_RELATION	'r'
+
 /*
  * Hash table entry to map xid to the parallel apply worker state.
  */
@@ -216,8 +218,14 @@ typedef struct ParallelApplyWorkerEntry
 {
 	TransactionId xid;			/* Hash key -- must be first */
 	ParallelApplyWorkerInfo *winfo;
+	XLogRecPtr	local_end;
 } ParallelApplyWorkerEntry;
 
+typedef struct ParallelApplyCommitSeq
+{
+	pg_atomic_uint64 committable_seq_num;
+} ParallelApplyCommitSeq;
+
 /*
  * A hash table used to cache the state of streaming transactions being applied
  * by the parallel apply workers.
@@ -254,9 +262,18 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
 /* A list to maintain subtransactions, if any. */
 static List *subxactlist = NIL;
 
+static dsm_segment *commit_seq_seg = NULL;
+static ParallelApplyCommitSeq	*pa_commit_seq = NULL;
+
+static TransactionId last_parallelized_xid = InvalidTransactionId;
+static uint64 last_parallelized_commit_seq = 0;
+
+static dsm_handle pa_get_dsm_for_commit_seq(void);
 static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
+static bool pa_transaction_committed(uint64 commit_seq_num);
 static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
 static PartialFileSetState pa_get_fileset_state(void);
+static void pa_init_relmap_cache(StringInfo s);
 
 /*
  * Returns true if it is OK to start a parallel apply worker, false otherwise.
@@ -334,6 +351,12 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
 	shm_mq	   *mq;
 	Size		queue_size = DSM_QUEUE_SIZE;
 	Size		error_queue_size = DSM_ERROR_QUEUE_SIZE;
+	dsm_handle	commit_seq_handle;
+
+	commit_seq_handle = pa_get_dsm_for_commit_seq();
+
+	if (commit_seq_handle == DSM_HANDLE_INVALID)
+		return false;
 
 	/*
 	 * Estimate how much shared memory we need.
@@ -364,11 +387,12 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
 	/* Set up the header region. */
 	shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
 	SpinLockInit(&shared->mutex);
-
+	shared->xid = InvalidTransactionId;
 	shared->xact_state = PARALLEL_TRANS_UNKNOWN;
 	pg_atomic_init_u32(&(shared->pending_stream_count), 0);
 	shared->last_commit_end = InvalidXLogRecPtr;
 	shared->fileset_state = FS_EMPTY;
+	shared->commit_seq_handle = commit_seq_handle;
 
 	shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
 
@@ -396,6 +420,24 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
 	return true;
 }
 
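+/*
+ * Create (or return the handle of the existing) dynamic shared memory segment
+ * that holds the committable sequence number shared by the leader and all
+ * parallel apply workers.
+ *
+ * Returns DSM_HANDLE_INVALID if the segment cannot be created.
+ */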
+static dsm_handle
+pa_get_dsm_for_commit_seq(void)
+{
+	if (commit_seq_seg)
+		return dsm_segment_handle(commit_seq_seg);
+
+	/* Create the shared memory segment for the commit sequence counter. */
+	commit_seq_seg = dsm_create(sizeof(ParallelApplyCommitSeq), 0);
+	if (!commit_seq_seg)
+		return DSM_HANDLE_INVALID;
+
+	pa_commit_seq = (ParallelApplyCommitSeq *) dsm_segment_address(commit_seq_seg);
+
+	pg_atomic_init_u64(&(pa_commit_seq->committable_seq_num), 0);
+
+	return dsm_segment_handle(commit_seq_seg);
+}
+
 /*
  * Try to get a parallel apply worker from the pool. If none is available then
  * start a new one.
@@ -413,10 +455,29 @@ pa_launch_parallel_worker(void)
 	{
 		winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
 
-		if (!winfo->in_use)
+		if (!winfo->stream_txn &&
+			pa_transaction_committed(winfo->shared->commit_seq_num))
+		{
+			/*
+			 * Save the local commit lsn of the last transaction that was
+			 * applied by this worker. We need to collect this info to
+			 * determine the flush position to reply to the publisher (See
+			 * get_flush_position()).
+			 */
+			(void) pa_get_last_commit_end(winfo->shared->xid, false, NULL);
+			return winfo;
+		}
+
+		if (winfo->stream_txn && !winfo->in_use)
 			return winfo;
 	}
 
+	pa_get_dsm_for_commit_seq();
+
+	if (list_length(ParallelApplyWorkerPool) ==
+		max_parallel_apply_workers_per_subscription)
+		return NULL;
+
 	/*
 	 * Start a new parallel apply worker.
 	 *
@@ -445,10 +506,23 @@ pa_launch_parallel_worker(void)
 
 	if (launched)
 	{
+		StringInfoData	out;
+
+		elog(DEBUG1, "started new pa worker");
 		ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
+
+		initStringInfo(&out);
+		appendStringInfoChar(&out, PARALLEL_APPLY_INIT_RELATION);
+		logicalrep_write_all_rels(&out);
+
+		if (out.len > 1)
+			pa_send_data(winfo, out.len, out.data);
+
+		pfree(out.data);
 	}
 	else
 	{
+		elog(DEBUG1, "failed to start pa worker");
 		pa_free_worker_info(winfo);
 		winfo = NULL;
 	}
@@ -467,7 +541,7 @@ pa_launch_parallel_worker(void)
  * streaming changes.
  */
 void
-pa_allocate_worker(TransactionId xid)
+pa_allocate_worker(TransactionId xid, bool stream_txn)
 {
 	bool		found;
 	ParallelApplyWorkerInfo *winfo = NULL;
@@ -504,11 +578,21 @@ pa_allocate_worker(TransactionId xid)
 	SpinLockAcquire(&winfo->shared->mutex);
 	winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
 	winfo->shared->xid = xid;
+	winfo->shared->commit_seq_num = last_parallelized_commit_seq;
+	winfo->shared->wait_for_xid = last_parallelized_xid;
 	SpinLockRelease(&winfo->shared->mutex);
 
 	winfo->in_use = true;
 	winfo->serialize_changes = false;
+	winfo->stream_txn = stream_txn;
 	entry->winfo = winfo;
+	entry->local_end = InvalidXLogRecPtr;
+
+	if (!stream_txn)
+	{
+		last_parallelized_xid = xid;
+		last_parallelized_commit_seq++;
+	}
 }
 
 /*
@@ -542,6 +626,57 @@ pa_find_worker(TransactionId xid)
 	return NULL;
 }
 
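+/*
+ * Return the local commit end LSN of the given remote transaction, collecting
+ * it from the shared memory of the parallel apply worker that applied it if
+ * it has not been collected yet.
+ *
+ * If skipped_write is provided, it is set to true when the collected LSN is
+ * invalid, i.e. the transaction wrote nothing locally. If delete_entry is
+ * true, the transaction's hash table entry is removed after collecting the
+ * LSN. Returns InvalidXLogRecPtr if the transaction has not committed yet.
+ */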
+XLogRecPtr
+pa_get_last_commit_end(TransactionId xid, bool delete_entry, bool *skipped_write)
+{
+	bool		found;
+	ParallelApplyWorkerEntry *entry;
+	ParallelApplyWorkerInfo *winfo;
+
+	Assert(TransactionIdIsValid(xid));
+
+	if (skipped_write)
+		*skipped_write = false;
+
+	/* Find an entry for the requested transaction. */
+	entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
+
+	if (!found)
+		return InvalidXLogRecPtr;
+
+	winfo = entry->winfo;
+
+	if (winfo == NULL)
+	{
+		if (delete_entry &&
+			!hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL))
+			elog(ERROR, "hash table corrupted");
+
+		if (skipped_write)
+			*skipped_write = XLogRecPtrIsInvalid(entry->local_end);
+
+		return entry->local_end;
+	}
+
+	if (!pa_transaction_committed(winfo->shared->commit_seq_num))
+		return InvalidXLogRecPtr;
+
+	entry->local_end = winfo->shared->last_commit_end;
+	entry->winfo = NULL;
+
+	if (skipped_write)
+		*skipped_write = XLogRecPtrIsInvalid(entry->local_end);
+
+	elog(DEBUG1, "stored local commit end %X/%X for transaction %u",
+		 LSN_FORMAT_ARGS(entry->local_end), xid);
+
+	if (delete_entry &&
+		!hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL))
+		elog(ERROR, "hash table corrupted");
+
+	return entry->local_end;
+}
+
 /*
  * Makes the worker available for reuse.
  *
@@ -557,7 +692,8 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
 {
 	Assert(!am_parallel_apply_worker());
 	Assert(winfo->in_use);
-	Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
+	Assert(!winfo->stream_txn ||
+		   pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
 
 	if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
 		elog(ERROR, "hash table corrupted");
@@ -573,9 +709,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
 	 * been serialized and then letting the parallel apply worker deal with
 	 * the spurious message, we stop the worker.
 	 */
-	if (winfo->serialize_changes ||
-		list_length(ParallelApplyWorkerPool) >
-		(max_parallel_apply_workers_per_subscription / 2))
+	if (winfo->serialize_changes)
 	{
 		logicalrep_pa_worker_stop(winfo);
 		pa_free_worker_info(winfo);
@@ -705,6 +839,106 @@ pa_process_spooled_messages_if_required(void)
 	return true;
 }
 
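+/*
+ * Advance the shared committable sequence number so that the transaction
+ * assigned the next sequence number is allowed to commit.
+ */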
+void
+pa_advance_committable_seq_num(void)
+{
+	Assert(am_parallel_apply_worker());
+	pg_atomic_add_fetch_u64(&pa_commit_seq->committable_seq_num, 1);
+}
+
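+/*
+ * Return true if the transaction assigned the given sequence number is the
+ * one currently allowed to commit.
+ */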
+static bool
+pa_transaction_committable(uint64 commit_seq_num)
+{
+	uint64	shared_seq_num;
+
+	shared_seq_num = pg_atomic_read_u64(&pa_commit_seq->committable_seq_num);
+
+	Assert(shared_seq_num <= commit_seq_num);
+
+	return commit_seq_num == shared_seq_num;
+}
+
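+/*
+ * Return true if the transaction assigned the given sequence number has
+ * already committed.
+ */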
+static bool
+pa_transaction_committed(uint64 commit_seq_num)
+{
+	return commit_seq_num <
+		   pg_atomic_read_u64(&pa_commit_seq->committable_seq_num);
+}
+
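+/*
+ * Wait until the transaction assigned the given sequence number becomes
+ * committable, i.e. all transactions dispatched before it have committed.
+ * After spinning for a while, sleep on the transaction lock of wait_for_xid
+ * instead of busy-waiting.
+ */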
+static void
+pa_wait_for_transaction(TransactionId wait_for_xid, uint64 commit_seq_num)
+{
+	int		count = 0;
+
+	if (!TransactionIdIsValid(wait_for_xid) || commit_seq_num == 0)
+		return;
+
+	elog(DEBUG1, "waiting for commit seq %lld (remote_xid %u) to finish",
+		 (long long int) commit_seq_num, wait_for_xid);
+
+	for (;;)
+	{
+		if (pa_transaction_committable(commit_seq_num))
+			break;
+
+		if (count > 100000)
+		{
+			pa_lock_transaction(wait_for_xid, AccessShareLock);
+			pa_unlock_transaction(wait_for_xid, AccessShareLock);
+		}
+		else
+			count++;
+
+		/* An interrupt may have occurred while we were waiting. */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	elog(DEBUG1, "finished waiting for commit seq %lld (remote_xid %u)",
+		 (long long int) commit_seq_num, wait_for_xid);
+}
+
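+/*
+ * Wait until the current transaction becomes committable, i.e. until all
+ * transactions that were dispatched before it have committed.
+ */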
+void
+pa_wait_until_committable(void)
+{
+	TransactionId	wait_for_xid = InvalidTransactionId;
+	uint64	commit_seq_num = 0;
+
+	if (am_parallel_apply_worker())
+	{
+		SpinLockAcquire(&MyParallelShared->mutex);
+		wait_for_xid = MyParallelShared->wait_for_xid;
+		commit_seq_num = MyParallelShared->commit_seq_num;
+		SpinLockRelease(&MyParallelShared->mutex);
+	}
+	else
+	{
+		wait_for_xid = last_parallelized_xid;
+		commit_seq_num = last_parallelized_commit_seq;
+	}
+
+	pa_wait_for_transaction(wait_for_xid, commit_seq_num);
+}
+
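+/*
+ * Return true if the current transaction is allowed to commit, i.e. all
+ * transactions dispatched before it have already committed.
+ */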
+bool
+pa_cur_transaction_committable(void)
+{
+	uint64	commit_seq_num = 0;
+
+	if (am_parallel_apply_worker())
+	{
+		SpinLockAcquire(&MyParallelShared->mutex);
+		commit_seq_num = MyParallelShared->commit_seq_num;
+		SpinLockRelease(&MyParallelShared->mutex);
+	}
+	else
+	{
+		commit_seq_num = last_parallelized_commit_seq;
+	}
+
+	return pa_transaction_committable(commit_seq_num);
+}
+
+
 /*
  * Interrupt handler for main loop of parallel apply worker.
  */
@@ -745,6 +979,10 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 												"ApplyMessageContext",
 												ALLOCSET_DEFAULT_SIZES);
 
+	ApplyBufferContext = AllocSetContextCreate(ApplyContext,
+											   "ApplyBufferContext",
+											   ALLOCSET_DEFAULT_SIZES);
+
 	/*
 	 * Push apply error context callback. Fields will be filled while applying
 	 * a change.
@@ -775,26 +1013,35 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 
 			initReadOnlyStringInfo(&s, data, len);
 
-			/*
-			 * The first byte of messages sent from leader apply worker to
-			 * parallel apply workers can only be 'w'.
-			 */
 			c = pq_getmsgbyte(&s);
-			if (c != 'w')
-				elog(ERROR, "unexpected message \"%c\"", c);
-
-			/*
-			 * Ignore statistics fields that have been updated by the leader
-			 * apply worker.
-			 *
-			 * XXX We can avoid sending the statistics fields from the leader
-			 * apply worker but for that, it needs to rebuild the entire
-			 * message by removing these fields which could be more work than
-			 * simply ignoring these fields in the parallel apply worker.
-			 */
-			s.cursor += SIZE_STATS_MESSAGE;
+			if (c == 'w')
+			{
+				/*
+				 * Ignore statistics fields that have been updated by the
+				 * leader apply worker.
+				 *
+				 * XXX We can avoid sending the statistics fields from the
+				 * leader apply worker but for that, it needs to rebuild the
+				 * entire message by removing these fields which could be more
+				 * work than simply ignoring these fields in the parallel apply
+				 * worker.
+				 */
+				s.cursor += SIZE_STATS_MESSAGE;
 
-			apply_dispatch(&s);
+				apply_dispatch(&s);
+			}
+			else if (c == PARALLEL_APPLY_INIT_RELATION)
+			{
+				pa_init_relmap_cache(&s);
+			}
+			else
+			{
+				/*
+				 * The first byte of messages sent from leader apply worker to
+				 * parallel apply workers can only be 'w' or 'r'.
+				 */
+				elog(ERROR, "unexpected message \"%c\"", c);
+			}
 		}
 		else if (shmq_res == SHM_MQ_WOULD_BLOCK)
 		{
@@ -811,6 +1058,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 
 				if (rc & WL_LATCH_SET)
 					ResetLatch(MyLatch);
+
+				if (!IsTransactionState())
+					pgstat_report_stat(true);
 			}
 		}
 		else
@@ -848,6 +1098,7 @@ pa_shutdown(int code, Datum arg)
 				   INVALID_PROC_NUMBER);
 
 	dsm_detach((dsm_segment *) DatumGetPointer(arg));
+	dsm_detach(commit_seq_seg);
 }
 
 /*
@@ -913,6 +1164,18 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 */
 	logicalrep_worker_attach(worker_slot);
 
+	/*
+	 * Attach to the dynamic shared memory segment for the parallel apply
+	 * commit sequence.
+	 */
+	commit_seq_seg = dsm_attach(MyParallelShared->commit_seq_handle);
+	if (!commit_seq_seg)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not map dynamic shared memory segment")));
+
+	pa_commit_seq = dsm_segment_address(commit_seq_seg);
+
 	/*
 	 * Register the shutdown callback after we are attached to the worker
 	 * slot. This is to ensure that MyLogicalRepWorker remains valid when this
@@ -1149,7 +1412,6 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	shm_mq_result result;
 	TimestampTz startTime = 0;
 
-	Assert(!IsTransactionState());
 	Assert(!winfo->serialize_changes);
 
 	/*
@@ -1201,6 +1463,51 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	}
 }
 
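+/*
+ * Send the updated remote relation information to the parallel apply workers
+ * in the pool so that their cached relation maps stay in sync with the
+ * leader's.
+ *
+ * If a worker cannot receive the message, wait for its current transaction to
+ * finish and then stop that worker.
+ */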
+void
+pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel)
+{
+	List *workers_stopped = NIL;
+	StringInfoData	out;
+
+	if (!ParallelApplyWorkerPool)
+		return;
+
+	initStringInfo(&out);
+	appendStringInfoChar(&out, PARALLEL_APPLY_INIT_RELATION);
+	logicalrep_write_remote_rel(&out, rel);
+
+	foreach_ptr(ParallelApplyWorkerInfo, winfo, ParallelApplyWorkerPool)
+	{
+		if (winfo == stream_apply_worker)
+			continue;
+
+		if (winfo->serialize_changes)
+			continue;
+
+		elog(DEBUG1, "distributing schema changes to pa workers");
+
+		if (pa_send_data(winfo, out.len, out.data))
+			continue;
+
+		elog(DEBUG1, "failed to distribute schema changes, stopping the worker instead");
+
+		/* Cannot send the message to this worker, so stop it. */
+		pa_wait_for_transaction(winfo->shared->xid,
+								winfo->shared->commit_seq_num + 1);
+
+		pa_get_last_commit_end(winfo->shared->xid, false, NULL);
+
+		logicalrep_pa_worker_stop(winfo);
+
+		workers_stopped = lappend(workers_stopped, winfo);
+	}
+
+	pfree(out.data);
+
+	foreach_ptr(ParallelApplyWorkerInfo, winfo, workers_stopped)
+		pa_free_worker_info(winfo);
+}
+
 /*
  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
  * that the current data and any subsequent data for this transaction will be
@@ -1282,9 +1589,9 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
 	pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
 
 	/*
-	 * Wait for the transaction lock to be released. This is required to
-	 * detect deadlock among leader and parallel apply workers. Refer to the
-	 * comments atop this file.
+	 * Wait for the transaction lock to be released. This is required to detect
+	 * deadlock among the leader and parallel apply workers. Refer to the
+	 * comments atop this file.
 	 */
 	pa_lock_transaction(winfo->shared->xid, AccessShareLock);
 	pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
@@ -1298,6 +1605,7 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("lost connection to the logical replication parallel apply worker")));
+
 }
 
 /*
@@ -1361,6 +1669,9 @@ pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
 void
 pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
 {
+	if (!TransactionIdIsValid(top_xid))
+		return;
+
 	if (current_xid != top_xid &&
 		!list_member_xid(subxactlist, current_xid))
 	{
@@ -1617,23 +1928,63 @@ pa_decr_and_wait_stream_block(void)
 void
 pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 {
-	Assert(am_leader_apply_worker());
+	XLogRecPtr	local_lsn = InvalidXLogRecPtr;
+	TransactionId	pa_remote_xid = winfo->shared->xid;
 
-	/*
-	 * Unlock the shared object lock so that parallel apply worker can
-	 * continue to receive and apply changes.
-	 */
-	pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
+	Assert(am_leader_apply_worker());
 
 	/*
+	 * For streamed transactions, unlock the stream lock so that the parallel
+	 * apply worker can continue to receive and apply changes.
+	 *
 	 * Wait for that worker to finish. This is necessary to maintain commit
 	 * order which avoids failures due to transaction dependencies and
 	 * deadlocks.
 	 */
-	pa_wait_for_xact_finish(winfo);
+	if (winfo->serialize_changes || winfo->stream_txn)
+	{
+		pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
+		pa_wait_for_xact_finish(winfo);
+
+		local_lsn = winfo->shared->last_commit_end;
+		pa_remote_xid = InvalidTransactionId;
+
+		pa_free_worker(winfo);
+	}
 
 	if (!XLogRecPtrIsInvalid(remote_lsn))
-		store_flush_position(remote_lsn, winfo->shared->last_commit_end);
+		store_flush_position(remote_lsn, local_lsn, pa_remote_xid);
+
+	pa_set_stream_apply_worker(NULL);
+}
+
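+/*
+ * Read the remote relation descriptions from the given message and use them
+ * to initialize or update the local relation map cache.
+ */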
+static void
+pa_init_relmap_cache(StringInfo s)
+{
+	for (;;)
+	{
+		LogicalRepRelation *rel = logicalrep_read_rel(s);
+
+		logicalrep_relmap_update(rel);
+
+		elog(DEBUG1, "pa worker initialized relmap entry for %s", rel->relname);
+
+		if (s->cursor == s->len)
+			break;
+	}
+}
+
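+/*
+ * Propagate the leader's latest commit sequence number, and the transaction
+ * the worker must wait for, into the parallel apply worker's shared memory.
+ */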
+void
+pa_update_commit_seq(ParallelApplyWorkerInfo *winfo)
+{
+	Assert(am_leader_apply_worker());
+	Assert(winfo->stream_txn);
+
+	SpinLockAcquire(&winfo->shared->mutex);
+	winfo->shared->commit_seq_num = last_parallelized_commit_seq;
+	winfo->shared->wait_for_xid = last_parallelized_xid;
+	SpinLockRelease(&winfo->shared->mutex);
 
-	pa_free_worker(winfo);
+	elog(DEBUG1, "updated commit sequence info for pa worker, seq: %lld, wait xid: %u",
+		 (long long int) last_parallelized_commit_seq, last_parallelized_xid);
 }
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1a352b542dc..c072c44795f 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -691,6 +691,41 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
 	logicalrep_write_attrs(out, rel, columns, include_gencols_type);
 }
 
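+/*
+ * Write the given LogicalRepRelation to the output stream, in a format that
+ * logicalrep_read_rel() can parse on the receiving side.
+ */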
+void
+logicalrep_write_remote_rel(StringInfo out, LogicalRepRelation *rel)
+{
+	pq_sendint32(out, rel->remoteid);
+
+	/* Write relation name */
+	pq_sendstring(out, rel->nspname);
+	pq_sendstring(out, rel->relname);
+
+	/* Write the replica identity. */
+	pq_sendbyte(out, rel->replident);
+
+	/* Write attribute description */
+	pq_sendint16(out, rel->natts);
+
+	for (int i = 0; i < rel->natts; i++)
+	{
+		uint8		flags = 0;
+
+		if (bms_is_member(i, rel->attkeys))
+			flags |= LOGICALREP_IS_REPLICA_IDENTITY;
+
+		pq_sendbyte(out, flags);
+
+		/* attribute name */
+		pq_sendstring(out, rel->attnames[i]);
+
+		/* attribute type id */
+		pq_sendint32(out, rel->atttyps[i]);
+
+		/* ignore attribute mode for now */
+		pq_sendint32(out, 0);
+	}
+}
+
 /*
  * Read the relation info from stream and return as LogicalRepRelation.
  */
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index f59046ad620..35392cf9aac 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -133,6 +133,9 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
 {
 	LogicalRepRelation *remoterel;
 
+	if (entry->refcount)
+		elog(ERROR, "cannot free the entry while it is still being used");
+
 	remoterel = &entry->remoterel;
 
 	pfree(remoterel->nspname);
@@ -366,6 +369,13 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 	remoterel = &entry->remoterel;
 
 	/* Ensure we don't leak a relcache refcount. */
+	if (entry->refcount > 0)
+	{
+		Assert(entry->localrel);
+		entry->refcount++;
+		return entry;
+	}
+
 	if (entry->localrel)
 		elog(ERROR, "remote relation ID %u is already open", remoteid);
 
@@ -494,6 +504,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 											   entry->localreloid,
 											   &entry->statelsn);
 
+	entry->refcount++;
+
 	return entry;
 }
 
@@ -503,8 +515,13 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 void
 logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
 {
-	table_close(rel->localrel, lockmode);
-	rel->localrel = NULL;
+	rel->refcount--;
+
+	if (!rel->refcount)
+	{
+		table_close(rel->localrel, lockmode);
+		rel->localrel = NULL;
+	}
 }
 
 /*
@@ -946,3 +963,18 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
 
 	return InvalidOid;
 }
+
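+/*
+ * Write every entry currently cached in LogicalRepRelMap to the given output
+ * buffer using logicalrep_write_remote_rel().
+ */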
+void
+logicalrep_write_all_rels(StringInfo out)
+{
+	LogicalRepRelMapEntry *entry;
+	HASH_SEQ_STATUS status;
+
+	if (LogicalRepRelMap == NULL)
+		return;
+
+	hash_seq_init(&status, LogicalRepRelMap);
+
+	while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+		logicalrep_write_remote_rel(out, &entry->remoterel);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c5fb627aa56..9130f893787 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -193,6 +193,7 @@ typedef struct FlushPosition
 	dlist_node	node;
 	XLogRecPtr	local_end;
 	XLogRecPtr	remote_end;
+	TransactionId pa_remote_xid;
 } FlushPosition;
 
 static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
@@ -209,6 +210,19 @@ typedef struct ApplyExecutionData
 	PartitionTupleRouting *proute;	/* partition routing info */
 } ApplyExecutionData;
 
+typedef struct ApplyBufferChange
+{
+	LogicalRepMsgType		action;
+	ApplyExecutionData	   *edata;
+	TupleTableSlot		   *old_slot;
+	TupleTableSlot		   *new_slot;
+	TupleTableSlot		   *local_slot;
+	LogicalRepTupleData	   *tuple;
+} ApplyBufferChange;
+
+static bool buffering_changes = false;
+static List *buffered_changes = NIL;
+
 /* Struct for saving and restoring apply errcontext information */
 typedef struct ApplyErrorCallbackArg
 {
@@ -283,6 +297,7 @@ ErrorContextCallback *apply_error_context_stack = NULL;
 
 MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
+MemoryContext ApplyBufferContext = NULL;
 
 /* per stream context for streaming transactions */
 static MemoryContext LogicalStreamingContext = NULL;
@@ -296,6 +311,7 @@ static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+static TransactionId remote_xid = InvalidTransactionId;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -364,11 +380,7 @@ static inline void cleanup_subxact_info(void);
 /*
  * Serialize and deserialize changes for a toplevel transaction.
  */
-static void stream_open_file(Oid subid, TransactionId xid,
-							 bool first_segment);
 static void stream_write_change(char action, StringInfo s);
-static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
-static void stream_close_file(void);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
@@ -380,11 +392,13 @@ static void apply_handle_update_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot,
 										 LogicalRepTupleData *newtup,
-										 Oid localindexoid);
+										 Oid localindexoid,
+										 TupleTableSlot *bufferedlocalslot);
 static void apply_handle_delete_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot,
-										 Oid localindexoid);
+										 Oid localindexoid,
+										 TupleTableSlot *bufferedlocalslot);
 static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 									LogicalRepRelation *remoterel,
 									Oid localidxoid,
@@ -394,6 +408,14 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
 									   TupleTableSlot *remoteslot,
 									   LogicalRepTupleData *newtup,
 									   CmdType operation);
+static void apply_buffered_changes(void);
+static LogicalRepTupleData *apply_buffer_copy_tuple(LogicalRepTupleData *tupleData);
+static void apply_buffer_add_change(LogicalRepMsgType action,
+									ApplyExecutionData *edata,
+									TupleTableSlot *old_slot,
+									TupleTableSlot *new_slot,
+									LogicalRepTupleData *tuple);
+static void apply_buffer_store_local_slot(TupleTableSlot *local_slot);
 
 /* Functions for skipping changes */
 static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
@@ -508,7 +530,8 @@ begin_replication_step(void)
 		maybe_reread_subscription();
 	}
 
-	PushActiveSnapshot(GetTransactionSnapshot());
+	if (!buffering_changes || !ActiveSnapshotSet())
+		PushActiveSnapshot(GetTransactionSnapshot());
 
 	MemoryContextSwitchTo(ApplyMessageContext);
 }
@@ -523,6 +546,10 @@ begin_replication_step(void)
 static void
 end_replication_step(void)
 {
+	if (buffering_changes)
+		return;
+
+	Assert(ActiveSnapshotSet());
 	PopActiveSnapshot();
 
 	CommandCounterIncrement();
@@ -556,14 +583,16 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 	TransApplyAction apply_action;
 	StringInfoData original_msg;
 
-	apply_action = get_transaction_apply_action(stream_xid, &winfo);
+	Assert(!in_streamed_transaction || TransactionIdIsValid(stream_xid));
+
+	apply_action = get_transaction_apply_action(in_streamed_transaction
+												? stream_xid : remote_xid,
+												&winfo);
 
 	/* not in streaming mode */
 	if (apply_action == TRANS_LEADER_APPLY)
 		return false;
 
-	Assert(TransactionIdIsValid(stream_xid));
-
 	/*
 	 * The parallel apply worker needs the xid in this message to decide
 	 * whether to define a savepoint, so save the original message that has
@@ -574,9 +603,12 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 
 	/*
 	 * We should have received XID of the subxact as the first part of the
-	 * message, so extract it.
+	 * message in streaming transactions, so extract it.
 	 */
-	current_xid = pq_getmsgint(s, 4);
+	if (in_streamed_transaction)
+		current_xid = pq_getmsgint(s, 4);
+	else
+		current_xid = remote_xid;
 
 	if (!TransactionIdIsValid(current_xid))
 		ereport(ERROR,
@@ -685,10 +717,17 @@ create_edata_for_relation(LogicalRepRelMapEntry *rel)
 	estate->es_opened_result_relations =
 		lappend(estate->es_opened_result_relations, resultRelInfo);
 
-	estate->es_output_cid = GetCurrentCommandId(true);
+	if (!buffering_changes)
+	{
+		estate->es_output_cid = GetCurrentCommandId(true);
 
-	/* Prepare to catch AFTER triggers. */
-	AfterTriggerBeginQuery();
+		/* Prepare to catch AFTER triggers. */
+		AfterTriggerBeginQuery();
+	}
+	else
+	{
+		estate->es_output_cid = GetCurrentCommandId(false);
+	}
 
 	/* other fields of edata remain NULL for now */
 
@@ -985,17 +1024,50 @@ static void
 apply_handle_begin(StringInfo s)
 {
 	LogicalRepBeginData begin_data;
+	ParallelApplyWorkerInfo *winfo;
+	TransApplyAction apply_action;
 
 	/* There must not be an active streaming transaction. */
 	Assert(!TransactionIdIsValid(stream_xid));
 
 	logicalrep_read_begin(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
+
+	remote_xid = begin_data.xid;
+
+	set_apply_error_context_xact(remote_xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
 
+	pa_allocate_worker(remote_xid, false);
+
+	apply_action = get_transaction_apply_action(remote_xid, &winfo);
+
+	elog(DEBUG1, "new remote_xid %u", remote_xid);
+	switch (apply_action)
+	{
+		case TRANS_LEADER_APPLY:
+			buffering_changes = !pa_cur_transaction_committable();
+			break;
+
+		case TRANS_LEADER_SEND_TO_PARALLEL:
+			Assert(winfo);
+			pa_send_data(winfo, s->len, s->data);
+			pa_set_stream_apply_worker(winfo);
+			break;
+
+		case TRANS_PARALLEL_APPLY:
+			/* Hold the lock until the end of the transaction. */
+			pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+			buffering_changes = !pa_cur_transaction_committable();
+			break;
+
+		default:
+			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+			break;
+	}
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -1010,6 +1082,11 @@ static void
 apply_handle_commit(StringInfo s)
 {
 	LogicalRepCommitData commit_data;
+	ParallelApplyWorkerInfo *winfo;
+	TransApplyAction apply_action;
+
+	/* Save the message before it is consumed. */
+	StringInfoData original_msg = *s;
 
 	logicalrep_read_commit(s, &commit_data);
 
@@ -1020,7 +1097,79 @@ apply_handle_commit(StringInfo s)
 								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
 								 LSN_FORMAT_ARGS(remote_final_lsn))));
 
-	apply_handle_commit_internal(&commit_data);
+	apply_action = get_transaction_apply_action(remote_xid, &winfo);
+
+	switch (apply_action)
+	{
+		case TRANS_LEADER_APPLY:
+			if (buffering_changes)
+			{
+				pa_wait_until_committable();
+				apply_buffered_changes();
+			}
+
+			apply_handle_commit_internal(&commit_data);
+			break;
+
+		case TRANS_LEADER_SEND_TO_PARALLEL:
+			Assert(winfo);
+
+			if (pa_send_data(winfo, s->len, s->data))
+			{
+				/* Finish processing the transaction. */
+				pa_xact_finish(winfo, commit_data.end_lsn);
+				break;
+			}
+
+			/*
+			 * Switch to serialize mode when we are not able to send the
+			 * change to parallel apply worker.
+			 */
+			pa_switch_to_partial_serialize(winfo, true);
+
+			/* fall through */
+		case TRANS_LEADER_PARTIAL_SERIALIZE:
+			Assert(winfo);
+
+			stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT,
+										 &original_msg);
+
+			pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+			/* Finish processing the transaction. */
+			pa_xact_finish(winfo, commit_data.end_lsn);
+			break;
+
+		case TRANS_PARALLEL_APPLY:
+
+			/*
+			 * If the parallel apply worker is applying spooled messages then
+			 * close the file before committing.
+			 */
+			if (stream_fd)
+				stream_close_file();
+
+			pa_wait_until_committable();
+			apply_buffered_changes();
+			apply_handle_commit_internal(&commit_data);
+
+			MyParallelShared->last_commit_end = XactLastCommitEnd;
+
+			pa_advance_committable_seq_num();
+			pa_unlock_transaction(remote_xid, AccessExclusiveLock);
+			break;
+
+		default:
+			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+			break;
+	}
+
+	remote_xid = InvalidTransactionId;
+	in_remote_transaction = false;
+
+	buffering_changes = false;
+
+	elog(DEBUG1, "reset remote_xid %u", remote_xid);
 
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
@@ -1140,7 +1289,8 @@ apply_handle_prepare(StringInfo s)
 	 * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
 	 * it.
 	 */
-	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
+	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+						 InvalidTransactionId);
 
 	in_remote_transaction = false;
 
@@ -1185,6 +1335,8 @@ apply_handle_commit_prepared(StringInfo s)
 	/* There is no transaction when COMMIT PREPARED is called */
 	begin_replication_step();
 
+	/* TODO wait for xid to finish */
+
 	/*
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
@@ -1197,7 +1349,8 @@ apply_handle_commit_prepared(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd,
+						 InvalidTransactionId);
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
@@ -1263,7 +1416,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	 * transaction because we always flush the WAL record for it. See
 	 * apply_handle_prepare.
 	 */
-	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
+	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr,
+						 InvalidTransactionId);
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
@@ -1322,7 +1476,8 @@ apply_handle_stream_prepare(StringInfo s)
 			 * It is okay not to set the local_end LSN for the prepare because
 			 * we always flush the prepare record. See apply_handle_prepare.
 			 */
-			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
+			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+								 InvalidTransactionId);
 
 			in_remote_transaction = false;
 
@@ -1501,6 +1656,8 @@ apply_handle_stream_start(StringInfo s)
 	/* notify handle methods we're processing a remote transaction */
 	in_streamed_transaction = true;
 
+	buffering_changes = false;
+
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
 
@@ -1513,7 +1670,7 @@ apply_handle_stream_start(StringInfo s)
 
 	/* Try to allocate a worker for the streaming transaction. */
 	if (first_segment)
-		pa_allocate_worker(stream_xid);
+		pa_allocate_worker(stream_xid, true);
 
 	apply_action = get_transaction_apply_action(stream_xid, &winfo);
 
@@ -1535,6 +1692,8 @@ apply_handle_stream_start(StringInfo s)
 		case TRANS_LEADER_SEND_TO_PARALLEL:
 			Assert(winfo);
 
+			pa_update_commit_seq(winfo);
+
 			/*
 			 * Once we start serializing the changes, the parallel apply
 			 * worker will wait for the leader to release the stream lock
@@ -1571,6 +1730,12 @@ apply_handle_stream_start(StringInfo s)
 		case TRANS_LEADER_PARTIAL_SERIALIZE:
 			Assert(winfo);
 
+			/*
+			 * TODO: the parallel apply worker could start waiting too early
+			 * when processing an old stream start message.
+			 */
+			pa_update_commit_seq(winfo);
+
 			/*
 			 * Open the spool file unless it was already opened when switching
 			 * to serialize mode. The transaction started in
@@ -1599,6 +1764,8 @@ apply_handle_stream_start(StringInfo s)
 				logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 			}
 
+			pa_wait_until_committable();
+
 			parallel_stream_nchanges = 0;
 			break;
 
@@ -2069,6 +2236,8 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 
 	end_replication_step();
 
+	Assert(!buffering_changes);
+
 	/*
 	 * Read the entries one by one and pass them through the same logic as in
 	 * apply_dispatch.
@@ -2294,7 +2463,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 
 		pgstat_report_stat(false);
 
-		store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
+		store_flush_position(commit_data->end_lsn, XactLastCommitEnd,
+							 InvalidTransactionId);
 	}
 	else
 	{
@@ -2318,15 +2488,31 @@ static void
 apply_handle_relation(StringInfo s)
 {
 	LogicalRepRelation *rel;
+	ParallelApplyWorkerInfo *winfo;
+	TransApplyAction apply_action;
 
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
 		return;
 
 	rel = logicalrep_read_rel(s);
+
+	apply_action = get_transaction_apply_action(in_streamed_transaction
+												? stream_xid : remote_xid,
+												&winfo);
+
+	if (apply_action == TRANS_LEADER_APPLY)
+	{
+		pa_wait_until_committable();
+		apply_buffered_changes();
+	}
+
 	logicalrep_relmap_update(rel);
 
 	/* Also reset all entries in the partition map that refer to remoterel. */
 	logicalrep_partmap_reset_relmap(rel);
+
+	if (am_leader_apply_worker())
+		pa_distribute_schema_changes_to_workers(rel);
 }
 
 /*
@@ -2394,6 +2580,7 @@ apply_handle_insert(StringInfo s)
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
+	MemoryContext applyctx;
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
@@ -2405,6 +2592,9 @@ apply_handle_insert(StringInfo s)
 		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
 		return;
 
+	if (buffering_changes && pa_cur_transaction_committable())
+		apply_buffered_changes();
+
 	begin_replication_step();
 
 	relid = logicalrep_read_insert(s, &newtup);
@@ -2431,6 +2621,9 @@ apply_handle_insert(StringInfo s)
 	/* Set relation for error callback */
 	apply_error_callback_arg.rel = rel;
 
+	if (buffering_changes)
+		applyctx = MemoryContextSwitchTo(ApplyBufferContext);
+
 	/* Initialize the executor state. */
 	edata = create_edata_for_relation(rel);
 	estate = edata->estate;
@@ -2444,20 +2637,84 @@ apply_handle_insert(StringInfo s)
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
+	if (buffering_changes)
+	{
+		apply_buffer_add_change(LOGICAL_REP_MSG_INSERT, edata, NULL,
+								remoteslot, NULL);
+		MemoryContextSwitchTo(applyctx);
+	}
+	else
+	{
+		/* For a partitioned table, insert the tuple into a partition. */
+		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+			apply_handle_tuple_routing(edata,
+									   remoteslot, NULL, CMD_INSERT);
+		else
+		{
+			ResultRelInfo *relinfo = edata->targetRelInfo;
+
+			ExecOpenIndices(relinfo, false);
+			apply_handle_insert_internal(edata, relinfo, remoteslot);
+			ExecCloseIndices(relinfo);
+		}
+
+		finish_edata(edata);
+
+		logicalrep_rel_close(rel, NoLock);
+	}
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	if (!run_as_owner)
+		RestoreUserContext(&ucxt);
+
+	end_replication_step();
+}
+
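+/*
+ * Apply an INSERT change that was buffered while waiting for the preceding
+ * transactions to commit.
+ */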
+static void
+apply_handle_buffered_insert(ApplyBufferChange *change)
+{
+	LogicalRepRelMapEntry *rel = change->edata->targetRel;
+	UserContext ucxt;
+	bool		run_as_owner;
+
+	Assert(!buffering_changes);
+
+	begin_replication_step();
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = rel;
+
+	/*
+	 * Make sure that any user-supplied code runs as the table owner, unless
+	 * the user has opted out of that behavior.
+	 */
+	run_as_owner = MySubscription->runasowner;
+	if (!run_as_owner)
+		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+	change->edata->estate->es_output_cid = GetCurrentCommandId(true);
+
+	/* Prepare to catch AFTER triggers. */
+	AfterTriggerBeginQuery();
+
 	/* For a partitioned table, insert the tuple into a partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-		apply_handle_tuple_routing(edata,
-								   remoteslot, NULL, CMD_INSERT);
+		apply_handle_tuple_routing(change->edata,
+								   change->new_slot, NULL, CMD_INSERT);
 	else
 	{
-		ResultRelInfo *relinfo = edata->targetRelInfo;
+		ResultRelInfo *relinfo = change->edata->targetRelInfo;
 
 		ExecOpenIndices(relinfo, false);
-		apply_handle_insert_internal(edata, relinfo, remoteslot);
+		apply_handle_insert_internal(change->edata, relinfo, change->new_slot);
 		ExecCloseIndices(relinfo);
 	}
 
-	finish_edata(edata);
+	finish_edata(change->edata);
+
+	logicalrep_rel_close(rel, NoLock);
 
 	/* Reset relation for error callback */
 	apply_error_callback_arg.rel = NULL;
@@ -2465,8 +2722,6 @@ apply_handle_insert(StringInfo s)
 	if (!run_as_owner)
 		RestoreUserContext(&ucxt);
 
-	logicalrep_rel_close(rel, NoLock);
-
 	end_replication_step();
 }
 
@@ -2554,6 +2809,7 @@ apply_handle_update(StringInfo s)
 	bool		has_oldtup;
 	TupleTableSlot *remoteslot;
 	RTEPermissionInfo *target_perminfo;
+	MemoryContext applyctx;
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
@@ -2565,6 +2821,9 @@ apply_handle_update(StringInfo s)
 		handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
 		return;
 
+	if (buffering_changes && pa_cur_transaction_committable())
+		apply_buffered_changes();
+
 	begin_replication_step();
 
 	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
@@ -2595,6 +2854,9 @@ apply_handle_update(StringInfo s)
 	if (!run_as_owner)
 		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
 
+	if (buffering_changes)
+		applyctx = MemoryContextSwitchTo(ApplyBufferContext);
+
 	/* Initialize the executor state. */
 	edata = create_edata_for_relation(rel);
 	estate = edata->estate;
@@ -2632,15 +2894,34 @@ apply_handle_update(StringInfo s)
 					has_oldtup ? &oldtup : &newtup);
 	MemoryContextSwitchTo(oldctx);
 
-	/* For a partitioned table, apply update to correct partition. */
-	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-		apply_handle_tuple_routing(edata,
-								   remoteslot, &newtup, CMD_UPDATE);
+	if (buffering_changes)
+	{
+		apply_buffer_add_change(LOGICAL_REP_MSG_UPDATE, edata,
+								has_oldtup ? remoteslot : NULL,
+								has_oldtup ? NULL : remoteslot,
+								apply_buffer_copy_tuple(&newtup));
+		MemoryContextSwitchTo(applyctx);
+
+		if (rel->localrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+			apply_handle_update_internal(edata, edata->targetRelInfo,
+										 remoteslot, &newtup,
+										 rel->localindexoid, NULL);
+	}
 	else
-		apply_handle_update_internal(edata, edata->targetRelInfo,
-									 remoteslot, &newtup, rel->localindexoid);
+	{
+		/* For a partitioned table, apply update to correct partition. */
+		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+			apply_handle_tuple_routing(edata,
+									   remoteslot, &newtup, CMD_UPDATE);
+		else
+			apply_handle_update_internal(edata, edata->targetRelInfo,
+										 remoteslot, &newtup, rel->localindexoid,
+										 NULL);
+
+		finish_edata(edata);
 
-	finish_edata(edata);
+		logicalrep_rel_close(rel, NoLock);
+	}
 
 	/* Reset relation for error callback */
 	apply_error_callback_arg.rel = NULL;
@@ -2648,8 +2929,68 @@ apply_handle_update(StringInfo s)
 	if (!run_as_owner)
 		RestoreUserContext(&ucxt);
 
+	end_replication_step();
+}
+
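+/*
+ * Apply a buffered UPDATE change. If the local tuple was already located
+ * while buffering the change, re-lock it before applying the update.
+ */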
+static void
+apply_handle_buffered_update(ApplyBufferChange *change)
+{
+	LogicalRepRelMapEntry *rel = change->edata->targetRel;
+	UserContext ucxt;
+	bool		run_as_owner;
+	CommandId	original_cid;
+
+	Assert(!buffering_changes);
+
+	begin_replication_step();
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = rel;
+
+	/*
+	 * Make sure that any user-supplied code runs as the table owner, unless
+	 * the user has opted out of that behavior.
+	 */
+	run_as_owner = MySubscription->runasowner;
+	if (!run_as_owner)
+		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+	original_cid = change->edata->estate->es_output_cid;
+	change->edata->estate->es_output_cid = GetCurrentCommandId(true);
+
+	/* Prepare to catch AFTER triggers. */
+	AfterTriggerBeginQuery();
+
+	/* For a partitioned table, apply update to correct partition. */
+	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		apply_handle_tuple_routing(change->edata,
+								   change->new_slot ? change->new_slot : change->old_slot,
+								   change->tuple, CMD_UPDATE);
+	else
+	{
+		bool	locked = false;
+
+		if (change->local_slot)
+			locked = RelationLockTuple(rel->localrel, LockTupleExclusive,
+									   change->new_slot ? change->new_slot : change->old_slot,
+									   change->local_slot, original_cid);
+
+		apply_handle_update_internal(change->edata, change->edata->targetRelInfo,
+									 change->new_slot ? change->new_slot : change->old_slot,
+									 change->tuple, rel->localindexoid,
+									 locked ? change->local_slot : NULL);
+	}
+
+	finish_edata(change->edata);
+
 	logicalrep_rel_close(rel, NoLock);
 
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	if (!run_as_owner)
+		RestoreUserContext(&ucxt);
+
 	end_replication_step();
 }
 
@@ -2663,25 +3004,35 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 							 ResultRelInfo *relinfo,
 							 TupleTableSlot *remoteslot,
 							 LogicalRepTupleData *newtup,
-							 Oid localindexoid)
+							 Oid localindexoid,
+							 TupleTableSlot *bufferedlocalslot)
 {
 	EState	   *estate = edata->estate;
 	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
 	Relation	localrel = relinfo->ri_RelationDesc;
 	EPQState	epqstate;
-	TupleTableSlot *localslot = NULL;
+	TupleTableSlot *localslot = bufferedlocalslot;
 	ConflictTupleInfo conflicttuple = {0};
-	bool		found;
+	bool		found = (bufferedlocalslot != NULL);
 	MemoryContext oldctx;
 
+	if (!found)
+	{
+		found = FindReplTupleInLocalRel(edata, localrel,
+										&relmapentry->remoterel,
+										localindexoid,
+										remoteslot, &localslot);
+	}
+
+	if (buffering_changes)
+	{
+		apply_buffer_store_local_slot(found ? localslot : NULL);
+		return;
+	}
+
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
 	ExecOpenIndices(relinfo, false);
 
-	found = FindReplTupleInLocalRel(edata, localrel,
-									&relmapentry->remoterel,
-									localindexoid,
-									remoteslot, &localslot);
-
 	/*
 	 * Tuple found.
 	 *
@@ -2759,6 +3110,7 @@ apply_handle_delete(StringInfo s)
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
+	MemoryContext applyctx;
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
@@ -2770,6 +3122,9 @@ apply_handle_delete(StringInfo s)
 		handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
 		return;
 
+	if (buffering_changes && pa_cur_transaction_committable())
+		apply_buffered_changes();
+
 	begin_replication_step();
 
 	relid = logicalrep_read_delete(s, &oldtup);
@@ -2799,6 +3154,9 @@ apply_handle_delete(StringInfo s)
 	if (!run_as_owner)
 		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
 
+	if (buffering_changes)
+		applyctx = MemoryContextSwitchTo(ApplyBufferContext);
+
 	/* Initialize the executor state. */
 	edata = create_edata_for_relation(rel);
 	estate = edata->estate;
@@ -2811,21 +3169,99 @@ apply_handle_delete(StringInfo s)
 	slot_store_data(remoteslot, rel, &oldtup);
 	MemoryContextSwitchTo(oldctx);
 
+	if (buffering_changes)
+	{
+		apply_buffer_add_change(LOGICAL_REP_MSG_DELETE, edata, remoteslot,
+								NULL, NULL);
+		MemoryContextSwitchTo(applyctx);
+
+		if (rel->localrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+			apply_handle_delete_internal(edata, edata->targetRelInfo,
+										 remoteslot, rel->localindexoid, NULL);
+	}
+	else
+	{
+		/* For a partitioned table, apply delete to correct partition. */
+		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+			apply_handle_tuple_routing(edata,
+									   remoteslot, NULL, CMD_DELETE);
+		else
+		{
+			ResultRelInfo *relinfo = edata->targetRelInfo;
+
+			ExecOpenIndices(relinfo, false);
+			apply_handle_delete_internal(edata, relinfo,
+										 remoteslot, rel->localindexoid, NULL);
+			ExecCloseIndices(relinfo);
+		}
+
+		finish_edata(edata);
+
+		logicalrep_rel_close(rel, NoLock);
+	}
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	if (!run_as_owner)
+		RestoreUserContext(&ucxt);
+
+	end_replication_step();
+}
+
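+/*
+ * Apply a buffered DELETE change. If the local tuple was already located
+ * while buffering the change, re-lock it before applying the delete.
+ */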
+static void
+apply_handle_buffered_delete(ApplyBufferChange *change)
+{
+	LogicalRepRelMapEntry *rel = change->edata->targetRel;
+	UserContext ucxt;
+	bool		run_as_owner;
+	CommandId	original_cid;
+
+	Assert(!buffering_changes);
+
+	begin_replication_step();
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = rel;
+
+	/*
+	 * Make sure that any user-supplied code runs as the table owner, unless
+	 * the user has opted out of that behavior.
+	 */
+	run_as_owner = MySubscription->runasowner;
+	if (!run_as_owner)
+		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+	original_cid = change->edata->estate->es_output_cid;
+	change->edata->estate->es_output_cid = GetCurrentCommandId(true);
+
+	/* Prepare to catch AFTER triggers. */
+	AfterTriggerBeginQuery();
+
 	/* For a partitioned table, apply delete to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-		apply_handle_tuple_routing(edata,
-								   remoteslot, NULL, CMD_DELETE);
+		apply_handle_tuple_routing(change->edata,
+								   change->old_slot, NULL, CMD_DELETE);
 	else
 	{
-		ResultRelInfo *relinfo = edata->targetRelInfo;
+		ResultRelInfo *relinfo = change->edata->targetRelInfo;
+		bool	locked = false;
+
+		if (change->local_slot)
+			locked = RelationLockTuple(rel->localrel, LockTupleExclusive,
+									   change->old_slot, change->local_slot,
+									   original_cid);
 
 		ExecOpenIndices(relinfo, false);
-		apply_handle_delete_internal(edata, relinfo,
-									 remoteslot, rel->localindexoid);
+		apply_handle_delete_internal(change->edata, relinfo,
+									 change->old_slot, rel->localindexoid,
+									 locked ? change->local_slot : NULL);
 		ExecCloseIndices(relinfo);
 	}
 
-	finish_edata(edata);
+	finish_edata(change->edata);
+
+	logicalrep_rel_close(rel, NoLock);
 
 	/* Reset relation for error callback */
 	apply_error_callback_arg.rel = NULL;
@@ -2833,8 +3269,6 @@ apply_handle_delete(StringInfo s)
 	if (!run_as_owner)
 		RestoreUserContext(&ucxt);
 
-	logicalrep_rel_close(rel, NoLock);
-
 	end_replication_step();
 }
 
@@ -2847,25 +3281,33 @@ static void
 apply_handle_delete_internal(ApplyExecutionData *edata,
 							 ResultRelInfo *relinfo,
 							 TupleTableSlot *remoteslot,
-							 Oid localindexoid)
+							 Oid localindexoid,
+							 TupleTableSlot *bufferedlocalslot)
 {
 	EState	   *estate = edata->estate;
 	Relation	localrel = relinfo->ri_RelationDesc;
 	LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
 	EPQState	epqstate;
-	TupleTableSlot *localslot;
+	TupleTableSlot *localslot = bufferedlocalslot;
 	ConflictTupleInfo conflicttuple = {0};
-	bool		found;
+	bool		found = (bufferedlocalslot != NULL);
 
-	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
+	if (!found)
+		found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
+										remoteslot, &localslot);
+
+	if (buffering_changes)
+	{
+		apply_buffer_store_local_slot(found ? localslot : NULL);
+		return;
+	}
 
 	/* Caller should have opened indexes already. */
 	Assert(relinfo->ri_IndexRelationDescs != NULL ||
 		   !localrel->rd_rel->relhasindex ||
 		   RelationGetIndexList(localrel) == NIL);
 
-	found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
-									remoteslot, &localslot);
+	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
 
 	/* If found delete it. */
 	if (found)
@@ -2919,6 +3361,7 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 						TupleTableSlot **localslot)
 {
 	EState	   *estate = edata->estate;
+	MemoryContext	oldctx;
 	bool		found;
 
 	/*
@@ -2927,8 +3370,14 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 	 */
 	TargetPrivilegesCheck(localrel, ACL_SELECT);
 
+	if (buffering_changes)
+		oldctx = MemoryContextSwitchTo(ApplyBufferContext);
+
 	*localslot = table_slot_create(localrel, &estate->es_tupleTable);
 
+	if (buffering_changes)
+		MemoryContextSwitchTo(oldctx);
+
 	Assert(OidIsValid(localidxoid) ||
 		   (remoterel->replident == REPLICA_IDENTITY_FULL));
 
@@ -2947,11 +3396,13 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 
 		found = RelationFindReplTupleByIndex(localrel, localidxoid,
 											 LockTupleExclusive,
-											 remoteslot, *localslot);
+											 remoteslot, *localslot,
+											 !buffering_changes);
 	}
 	else
 		found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
-										 remoteslot, *localslot);
+										 remoteslot, *localslot,
+										 !buffering_changes);
 
 	return found;
 }
@@ -3048,7 +3499,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 		case CMD_DELETE:
 			apply_handle_delete_internal(edata, partrelinfo,
 										 remoteslot_part,
-										 part_entry->localindexoid);
+										 part_entry->localindexoid,
+										 NULL);
 			break;
 
 		case CMD_UPDATE:
@@ -3250,6 +3702,8 @@ apply_handle_truncate(StringInfo s)
 	ListCell   *lc;
 	LOCKMODE	lockmode = AccessExclusiveLock;
 
+	elog(DEBUG1, "apply truncate");
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -3258,6 +3712,9 @@ apply_handle_truncate(StringInfo s)
 		handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
 		return;
 
+	pa_wait_until_committable();
+	apply_buffered_changes();
+
 	begin_replication_step();
 
 	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
@@ -3471,6 +3928,115 @@ apply_dispatch(StringInfo s)
 	apply_error_callback_arg.command = saved_command;
 }
 
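+/*
+ * Apply all changes that were buffered while waiting for the preceding
+ * transactions to commit, then release the buffer memory.
+ */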
+static void
+apply_buffered_changes(void)
+{
+	buffering_changes = false;
+	elog(DEBUG1, "started applying %d buffered changes", list_length(buffered_changes));
+
+	if (!buffered_changes)
+		return;
+
+	foreach_ptr(ApplyBufferChange, change, buffered_changes)
+	{
+		LogicalRepMsgType saved_command;
+
+		/*
+		 * Set the current command being applied. Since this function can be
+		 * called recursively when applying spooled changes, save the current
+		 * command.
+		 */
+		saved_command = apply_error_callback_arg.command;
+		apply_error_callback_arg.command = change->action;
+
+		switch (change->action)
+		{
+			case LOGICAL_REP_MSG_INSERT:
+				apply_handle_buffered_insert(change);
+				break;
+
+			case LOGICAL_REP_MSG_UPDATE:
+				apply_handle_buffered_update(change);
+				break;
+
+			case LOGICAL_REP_MSG_DELETE:
+				apply_handle_buffered_delete(change);
+				break;
+
+			default:
+				ereport(ERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("invalid logical replication message type \"??? (%d)\"", change->action)));
+		}
+
+		/* Reset the current command */
+		apply_error_callback_arg.command = saved_command;
+	}
+
+	Assert(ActiveSnapshotSet());
+	PopActiveSnapshot();
+	Assert(!ActiveSnapshotSet());
+	MemoryContextReset(ApplyBufferContext);
+	buffered_changes = NIL;
+}
+
+static LogicalRepTupleData *
+apply_buffer_copy_tuple(LogicalRepTupleData *tupleData)
+{
+	LogicalRepTupleData *res;
+	int		ncols = tupleData->ncols;
+
+	Assert(CurrentMemoryContext == ApplyBufferContext);
+
+	res = palloc0_object(LogicalRepTupleData);
+
+	/* Allocate per-column value and status arrays */
+	res->colvalues = (StringInfoData *) palloc0(ncols * sizeof(StringInfoData));
+	res->colstatus = (char *) palloc(ncols * sizeof(char));
+	res->ncols = ncols;
+
+	for (int i = 0; i < ncols; i++)
+	{
+		initStringInfo(&res->colvalues[i]);
+		appendBinaryStringInfo(&res->colvalues[i],
+							   tupleData->colvalues[i].data,
+							   tupleData->colvalues[i].len);
+		res->colstatus[i] = tupleData->colstatus[i];
+	}
+
+	return res;
+}
+
+static void
+apply_buffer_add_change(LogicalRepMsgType action, ApplyExecutionData *edata,
+						TupleTableSlot *old_slot, TupleTableSlot *new_slot,
+						LogicalRepTupleData *tuple)
+{
+	ApplyBufferChange *change;
+
+	Assert(CurrentMemoryContext == ApplyBufferContext);
+
+	change = palloc0_object(ApplyBufferChange);
+	change->action = action;
+	change->edata = edata;
+	change->new_slot = new_slot;
+	change->old_slot = old_slot;
+	change->tuple = tuple;
+
+	buffered_changes = lappend(buffered_changes, change);
+}
+
+static void
+apply_buffer_store_local_slot(TupleTableSlot *local_slot)
+{
+	ApplyBufferChange *change;
+
+	Assert(buffered_changes != NIL);
+
+	change = (ApplyBufferChange *) llast(buffered_changes);
+	change->local_slot = local_slot;
+}
+
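
Together these helpers queue a change in ApplyBufferContext instead of applying it immediately; apply_buffer_store_local_slot() then attaches the already-located local tuple to the most recent entry. A minimal usage sketch, assuming a hypothetical wrapper in the insert path and that edata and remoteslot are already allocated in ApplyBufferContext:

/*
 * Hypothetical usage sketch (the wrapper name is an assumption): queue an
 * insert for deferred apply instead of executing it immediately.  The
 * buffering helpers require CurrentMemoryContext == ApplyBufferContext,
 * so switch contexts around the calls.
 */
static void
buffer_insert_change(ApplyExecutionData *edata, TupleTableSlot *remoteslot,
					 LogicalRepTupleData *newtup)
{
	MemoryContext oldctx = MemoryContextSwitchTo(ApplyBufferContext);

	apply_buffer_add_change(LOGICAL_REP_MSG_INSERT, edata,
							NULL /* no old tuple */ , remoteslot,
							apply_buffer_copy_tuple(newtup));

	MemoryContextSwitchTo(oldctx);
}
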
 /*
  * Figure out which write/flush positions to report to the walsender process.
  *
@@ -3499,6 +4065,36 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
 		FlushPosition *pos =
 			dlist_container(FlushPosition, node, iter.cur);
 
+		/*
+		 * Retrieve the transaction's local commit lsn from the parallel
+		 * apply worker that applied it.
+		 */
+		if (TransactionIdIsValid(pos->pa_remote_xid) &&
+			XLogRecPtrIsInvalid(pos->local_end))
+		{
+			bool	skipped_write;
+
+			pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, true,
+													&skipped_write);
+
+			elog(DEBUG1,
+				"got commit end from parallel apply worker, "
+				"txn: %u, remote_end %X/%X, local_end %X/%X",
+				 pos->pa_remote_xid, LSN_FORMAT_ARGS(pos->remote_end),
+				 LSN_FORMAT_ARGS(pos->local_end));
+
+			/* Return if the worker has not finished applying */
+			if (!skipped_write && XLogRecPtrIsInvalid(pos->local_end))
+			{
+				*have_pending_txes = true;
+				return;
+			}
+		}
+
+		/*
+		 * The worker has finished applying, or the transaction was applied
+		 * by the leader apply worker itself.
+		 */
 		*write = pos->remote_end;
 
 		if (pos->local_end <= local_flush)
@@ -3507,19 +4103,6 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
 			dlist_delete(iter.cur);
 			pfree(pos);
 		}
-		else
-		{
-			/*
-			 * Don't want to uselessly iterate over the rest of the list which
-			 * could potentially be long. Instead get the last element and
-			 * grab the write position from there.
-			 */
-			pos = dlist_tail_element(FlushPosition, node,
-									 &lsn_mapping);
-			*write = pos->remote_end;
-			*have_pending_txes = true;
-			return;
-		}
 	}
 
 	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
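
get_flush_position() now stops at the first transaction whose parallel apply worker has not yet reported a local commit end (fetched via pa_get_last_commit_end()) instead of skipping ahead to the tail of the list. A hedged sketch of how the two apply paths might record their positions with the extended store_flush_position() signature; the wrapper and its flag are assumptions, only store_flush_position() and its new remote_xid parameter come from the patch:

/*
 * Hedged sketch: feed the flush-tracking list from the two apply paths.
 */
static void
record_flush_position(LogicalRepCommitData *commit_data,
					  TransactionId remote_xid, bool applied_by_pa_worker)
{
	if (applied_by_pa_worker)
		/* Local end not known yet; get_flush_position() asks the worker. */
		store_flush_position(commit_data->end_lsn, InvalidXLogRecPtr,
							 remote_xid);
	else
		/* Applied by the leader itself, so the local commit end is known. */
		store_flush_position(commit_data->end_lsn, XactLastCommitEnd,
							 InvalidTransactionId);
}
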
@@ -3529,7 +4112,8 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
  * Store current remote/local lsn pair in the tracking list.
  */
 void
-store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
+store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn,
+					 TransactionId remote_xid)
 {
 	FlushPosition *flushpos;
 
@@ -3547,6 +4131,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
 	flushpos->local_end = local_lsn;
 	flushpos->remote_end = remote_lsn;
+	flushpos->pa_remote_xid = remote_xid;
 
 	dlist_push_tail(&lsn_mapping, &flushpos->node);
 	MemoryContextSwitchTo(ApplyMessageContext);
@@ -3594,6 +4179,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 													"LogicalStreamingContext",
 													ALLOCSET_DEFAULT_SIZES);
 
+	ApplyBufferContext = AllocSetContextCreate(ApplyContext,
+											   "ApplyBufferContext",
+											   ALLOCSET_DEFAULT_SIZES);
+
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -4324,7 +4913,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
  * changes for this transaction, create the buffile, otherwise open the
  * previously created file.
  */
-static void
+void
 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 {
 	char		path[MAXPGPATH];
@@ -4369,7 +4958,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
  * stream_close_file
  *	  Close the currently open file with streamed changes.
  */
-static void
+void
 stream_close_file(void)
 {
 	Assert(stream_fd != NULL);
@@ -4417,7 +5006,7 @@ stream_write_change(char action, StringInfo s)
  * target file if not already before writing the message and close the file at
  * the end.
  */
-static void
+void
 stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
 {
 	Assert(!in_streamed_transaction);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544d..ed9ba8fdb9c 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -756,9 +756,14 @@ extern void check_exclusion_constraint(Relation heap, Relation index,
 extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 										 LockTupleMode lockmode,
 										 TupleTableSlot *searchslot,
-										 TupleTableSlot *outslot);
+										 TupleTableSlot *outslot,
+										 bool locktuple);
 extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
-									 TupleTableSlot *searchslot, TupleTableSlot *outslot);
+									 TupleTableSlot *searchslot, TupleTableSlot *outslot,
+									 bool locktuple);
+extern bool RelationLockTuple(Relation rel, LockTupleMode lockmode,
+							  TupleTableSlot *searchslot,
+							  TupleTableSlot *outslot, CommandId cid);
 
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 									 EState *estate, TupleTableSlot *slot);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..ce880f179dc 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -64,6 +64,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_DELETE = 'D',
 	LOGICAL_REP_MSG_TRUNCATE = 'T',
 	LOGICAL_REP_MSG_RELATION = 'R',
+	LOGICAL_REP_MSG_INTERNAL_RELATION = 'r',
 	LOGICAL_REP_MSG_TYPE = 'Y',
 	LOGICAL_REP_MSG_MESSAGE = 'M',
 	LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
@@ -251,6 +252,8 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel, Bitmapset *columns,
 								 PublishGencolsType include_gencols_type);
+extern void logicalrep_write_remote_rel(StringInfo out,
+										LogicalRepRelation *rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 7a561a8e8d8..d35bd00e93c 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,8 @@ typedef struct LogicalRepRelMapEntry
 	/* Sync state. */
 	char		state;
 	XLogRecPtr	statelsn;
+
+	int			refcount;
 } LogicalRepRelMapEntry;
 
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
@@ -50,5 +52,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
 extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap);
 extern Oid	GetRelationIdentityOrPK(Relation rel);
+extern void logicalrep_write_all_rels(StringInfo out);
 
 #endif							/* LOGICALRELATION_H */
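
The refcount field and logicalrep_write_all_rels() point to the leader sharing its relation map with parallel apply workers (see LOGICAL_REP_MSG_INTERNAL_RELATION and pa_distribute_schema_changes_to_workers() elsewhere in this patch). A hedged sketch of how such a hand-off might look on the leader side; only logicalrep_write_all_rels(), pa_send_data() and pa_switch_to_partial_serialize() are from the patch, everything else is assumed:

/*
 * Hypothetical sketch: serialize the leader's relation map and hand it to
 * a parallel apply worker through the existing shared-memory queue.
 */
static void
send_relation_map(ParallelApplyWorkerInfo *winfo)
{
	StringInfoData out;

	initStringInfo(&out);
	logicalrep_write_all_rels(&out);

	/* Fall back to partial serialization if the queue cannot take the data. */
	if (!pa_send_data(winfo, out.len, out.data))
		pa_switch_to_partial_serialize(winfo, false);

	pfree(out.data);
}
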
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..e0c20da7299 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared
 	 */
 	PartialFileSetState fileset_state;
 	FileSet		fileset;
+
+	dsm_handle	commit_seq_handle;
+
+	uint64	commit_seq_num;
+	TransactionId	wait_for_xid;
 } ParallelApplyWorkerShared;
 
 /*
@@ -214,7 +219,11 @@ typedef struct ParallelApplyWorkerInfo
 	 */
 	bool		in_use;
 
+	bool		stream_txn;
+
 	ParallelApplyWorkerShared *shared;
+
+	bool		collected_local_end;
 } ParallelApplyWorkerInfo;
 
 /* Main memory context for apply worker. Permanent during worker lifetime. */
@@ -222,6 +231,8 @@ extern PGDLLIMPORT MemoryContext ApplyContext;
 
 extern PGDLLIMPORT MemoryContext ApplyMessageContext;
 
+extern PGDLLIMPORT MemoryContext ApplyBufferContext;
+
 extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
 
 extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
@@ -275,6 +286,10 @@ extern void apply_dispatch(StringInfo s);
 extern void maybe_reread_subscription(void);
 
 extern void stream_cleanup_files(Oid subid, TransactionId xid);
+extern void stream_open_file(Oid subid, TransactionId xid, bool first_segment);
+extern void stream_close_file(void);
+extern void stream_open_and_write_change(TransactionId xid, char action,
+										 StringInfo s);
 
 extern void set_stream_options(WalRcvStreamOptions *options,
 							   char *slotname,
@@ -288,19 +303,27 @@ extern void SetupApplyOrSyncWorker(int worker_slot);
 
 extern void DisableSubscriptionAndExit(void);
 
-extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
+extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn,
+								 TransactionId remote_xid);
 
 /* Function for apply error callback */
 extern void apply_error_callback(void *arg);
 extern void set_apply_error_context_origin(char *originname);
 
 /* Parallel apply worker setup and interactions */
-extern void pa_allocate_worker(TransactionId xid);
+extern void pa_allocate_worker(TransactionId xid, bool stream_txn);
 extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
+extern XLogRecPtr pa_get_last_commit_end(TransactionId xid, bool delete_entry,
+										 bool *skipped_write);
 extern void pa_detach_all_error_mq(void);
 
+extern void pa_advance_committable_seq_num(void);
+extern void pa_wait_until_committable(void);
+extern bool pa_cur_transaction_committable(void);
+
 extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
 						 const void *data);
+extern void pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel);
 extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
 										   bool stream_locked);
 
@@ -325,6 +348,7 @@ extern void pa_decr_and_wait_stream_block(void);
 
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
+extern void pa_update_commit_seq(ParallelApplyWorkerInfo *winfo);
 
 #define isParallelApplyWorker(worker) ((worker)->in_use && \
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
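
The new shared fields (commit_seq_handle, commit_seq_num, wait_for_xid) and the pa_*committable* functions suggest a commit-ordering handshake between the leader and its parallel apply workers. A hedged sketch of how an order-sensitive operation could be fenced, mirroring the truncate path in worker.c; the function itself is illustrative only:

/*
 * Hypothetical sketch: fence an operation that must not overtake earlier
 * transactions, as the truncate handler does.
 */
static void
apply_order_sensitive_change(void)
{
	/* Wait for all previously dispatched transactions to become committable. */
	pa_wait_until_committable();

	/* Replay any changes still held in ApplyBufferContext. */
	apply_buffered_changes();

	/* The order-sensitive change itself would be applied after this point. */
}
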
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
index 3d16c2a800d..c2fba0b9a9c 100644
--- a/src/test/subscription/t/010_truncate.pl
+++ b/src/test/subscription/t/010_truncate.pl
@@ -17,7 +17,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_logical_replication_workers = 6));
+	qq(max_logical_replication_workers = 7));
 $node_subscriber->start;
 
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index 00a1c2fcd48..6842476c8b0 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,7 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10");
 $node_subscriber->start;
 
 
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index 36af1c16e7f..aec039d565b 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -87,6 +87,7 @@ $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_publisher->init(allows_streaming => 'logical');
 $node_subscriber->init;
 $node_publisher->start;
+$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10");
 $node_subscriber->start;
 $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 my %remainder_a = (
-- 
2.49.0.windows.1

