Hi Tomas,

>>> Interesting. Any idea where the extra overhead in this particular
>>> case comes from? It's hard to deduce that from the single flame graph
>>> when I don't have anything to compare it with (i.e. the flame graph
>>> for the "normal" case).
>> I guess that the bottleneck is in disk operations. You can check the
>> logical_repl_worker_new_perf.svg flame graph: disk reads (~9%) and
>> writes (~26%) take around 35% of CPU time in total. For comparison,
>> please see the attached flame graph for the following transaction:
>>
>> INSERT INTO large_text
>> SELECT (SELECT string_agg('x', ',')
>> FROM generate_series(1, 2000)) FROM generate_series(1, 1000000);
>>
>> Execution Time: 44519.816 ms
>> Time: 98333,642 ms (01:38,334)
>>
>> where disk I/O is only ~7-8% in total. So we get very roughly the same
>> ~x4-5 performance drop here. JFYI, I am using a machine with an SSD
>> for the tests.
>>
>> Therefore, it may help to write changes on the receiver side in bigger
>> chunks, not each change separately.

> Possibly. I/O is certainly a possible culprit, although we should be
> using buffered I/O and there certainly are no fsyncs here. So I'm not
> sure why it would be cheaper to do the writes in batches.
>
> BTW, does this mean you see the overhead on the apply side? Or are you
> running this on a single machine, and it's difficult to tell?

I run this on a single machine, but the walsender and the apply worker
each utilize almost 100% of a CPU core all the time, and on the apply
side I/O syscalls take about a third of the CPU time. I am still not
sure, but to me this result links the performance drop to problems on
the receiver side.

Writing in batches was just a hypothesis. To validate it, I performed a
test with a large transaction consisting of a smaller number of wide
rows. That test does not exhibit any significant performance drop, even
though it was streamed too, so the hypothesis seems valid. Anyway, I do
not have other reasonable ideas besides that right now.
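
Just to make the batching idea concrete (a hypothetical sketch only, not
part of any patch; it keeps the on-disk format of stream_write_change(),
but buffers changes in memory and issues a single write() per streamed
chunk instead of three write() calls per change):

static StringInfoData batch_buf;	/* call initStringInfo() on first use */

static void
stream_buffer_change(char action, StringInfo s)
{
	/* total on-disk size, including the action type character */
	int			len = (s->len - s->cursor) + sizeof(char);

	appendBinaryStringInfo(&batch_buf, (char *) &len, sizeof(len));
	appendBinaryStringInfo(&batch_buf, &action, sizeof(action));
	appendBinaryStringInfo(&batch_buf, &s->data[s->cursor], s->len - s->cursor);
}

/* Called once at STREAM STOP instead of once per change. */
static void
stream_flush_batch(void)
{
	if (write(stream_fd, batch_buf.data, batch_buf.len) != batch_buf.len)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not serialize streamed changes to file: %m")));

	resetStringInfo(&batch_buf);
}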

I have recently revisited this patch and tried to improve its
performance. As a result, I have implemented a new POC version of the
applier (attached). Almost everything in the streaming logic stays
intact, but the apply worker is significantly different.

As I wrote earlier, I still claim that spilling changes to disk on the
applier side adds extra overhead, and that it is possible to get rid of
it. In my additional patch I do the following (a condensed sketch of the
apply-side flow is given after the list):

1) Maintain a pool of additional background workers (bgworkers) connected
to the main logical apply worker via shm_mq's. Each worker is dedicated to
processing one specific streamed transaction.

2) When we receive a streamed change for some transaction, we check
whether a dedicated bgworker already exists in a HTAB (xid -> bgworker);
if not, we take one from the idle list or spawn a new one.

3) We pass all changes (between STREAM START/STOP) to that bgworker via
shm_mq_send without any intermediate waiting. However, at STREAM STOP we
wait for the bgworker to apply the entire chunk of changes, since we do
not want transactions to be reordered.

4) When a transaction is committed/aborted, its worker is added to the
idle list, where it waits for a reassignment message.

5) I have reused the same apply_dispatch machinery in the bgworkers, since
most of the actions are practically identical.
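
In condensed form, the main apply worker's side of this flow looks as
follows (simplified from the attached patch; error handling, counters,
and pgstat calls are omitted):

static bool
handle_streamed_transaction(const char action, StringInfo s)
{
	WorkerState *entry;

	/* Only the main apply worker routes streamed changes. */
	if (!in_streamed_transaction || isLogicalApplyWorker)
		return false;

	/* Forward the raw change to the dedicated bgworker without waiting. */
	entry = find_or_start_worker(stream_xid, false);
	shm_mq_send(entry->mq_handle, s->len, s->data, false);

	return true;
}

static void
apply_handle_stream_stop(StringInfo s)
{
	WorkerState *entry;
	char		action = 'E';	/* single-byte end-of-chunk marker */

	/* Ask the bgworker to acknowledge the current chunk... */
	entry = find_or_start_worker(stream_xid, false);
	shm_mq_send(entry->mq_handle, 1, &action, false);

	/* ...and block until it has applied everything, to keep chunks ordered. */
	wait_for_worker(entry);

	in_streamed_transaction = false;
}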

Thus, we do not spill anything on the applier side, and transaction
changes are processed by bgworkers just as normal backends process them.
At the same time, change processing is strictly serial, which prevents
transaction reordering and possible conflicts/anomalies. Even though we
trade off some performance in favor of stability, the result is rather
impressive. I used a similar query for testing as before:

EXPLAIN (ANALYZE, BUFFERS) INSERT INTO large_test (num1, num2, num3)
    SELECT round(random()*10), random(), random()*142
    FROM generate_series(1, 1000000) s(i);

with 1kk (1,000,000), 3kk, and 5kk rows; logical_work_mem = 64MB and
synchronous_standby_names = 'FIRST 1 (large_sub)'. The table schema is
as follows:

CREATE TABLE large_test (
    id serial primary key,
    num1 bigint,
    num2 double precision,
    num3 double precision
);

Here are the results:

---------------------------------------------------------------------
|  N  | Time on master, sec | Total xact time, sec |     Ratio      |
---------------------------------------------------------------------
|                      On commit (master, v13)                      |
---------------------------------------------------------------------
| 1kk | 6.5                 | 17.6                 | x2.74          |
---------------------------------------------------------------------
| 3kk | 21                  | 55.4                 | x2.64          |
---------------------------------------------------------------------
| 5kk | 38.3                | 91.5                 | x2.39          |
---------------------------------------------------------------------
|                          Stream + spill                           |
---------------------------------------------------------------------
| 1kk | 5.9                 | 18                   | x3             |
---------------------------------------------------------------------
| 3kk | 19.5                | 52.4                 | x2.7           |
---------------------------------------------------------------------
| 5kk | 33.3                | 86.7                 | x2.86          |
---------------------------------------------------------------------
|                         Stream + BGW pool                         |
---------------------------------------------------------------------
| 1kk | 6                   | 12                   | x2             |
---------------------------------------------------------------------
| 3kk | 18.5                | 30.5                 | x1.65          |
---------------------------------------------------------------------
| 5kk | 35.6                | 53.9                 | x1.51          |
---------------------------------------------------------------------

It seems that the overhead added by the synchronous replica is 2-3 times
lower than with current Postgres master or with streaming + spilling. In
other words, the original patch eliminated the sender-side delay before
processing of a large transaction starts, while this additional patch
speeds up the applier side.

Although the overall speed-up is certainly measurable, there is still
room for improvement:

1) Currently, bgworkers are spawned only on demand, without any initial
pool, and are never stopped. Maybe we should create a small pool on
replication start and shut down some idle bgworkers if their number
exceeds a limit?

2) Perhaps we could somehow detect that an incoming change conflicts with
one of the xacts currently being processed, so that we wait for specific
bgworkers only in that case?

3) Since the communication between the main logical apply worker and each
bgworker from the pool is a 'single producer --- single consumer' problem,
it should be possible to wait and to set/check flags without locks, using
just atomics.
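
For example (a hypothetical sketch only, not part of the patch; the
ParallelState field names follow the patch, pg_atomic_* is the existing
API from port/atomics.h, and a production version would also have to
consider memory ordering):

/* Flags become plain atomics instead of being guarded by the spinlock. */
typedef struct ParallelStateAtomic
{
	pg_atomic_uint32 ready;		/* current chunk fully applied */
	pg_atomic_uint32 finished;	/* whole xact committed/aborted */
	/* ...other ParallelState fields unchanged... */
} ParallelStateAtomic;

/* bgworker side, on receiving the 'E' (stream stop) message: */
pg_atomic_write_u32(&pst->ready, 1);
SetLatch(&registrant->procLatch);

/* main apply worker side, e.g. in wait_for_worker(), without SpinLockAcquire: */
while (pg_atomic_read_u32(&pst->ready) == 0)
{
	(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
					 -1L, WAIT_EVENT_LOGICAL_APPLY_WORKER_READY);
	ResetLatch(MyLatch);
}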

What do you think about this concept in general? Any concerns and criticism are welcome!


Regards

--
Alexey Kondratov

Postgres Professional https://www.postgrespro.com
Russian Postgres Company

P.S. This patch should be applicable on top of your last patch set. I
would rebase it against master, but it depends on the 2PC patch, which I
don't know well enough.

>From 11c7549d2732f2f983d4548a81cd509dd7e41ec4 Mon Sep 17 00:00:00 2001
From: Alexey Kondratov <kondratov.alek...@gmail.com>
Date: Wed, 28 Aug 2019 15:26:50 +0300
Subject: [PATCH 11/11] BGWorkers pool for streamed transactions apply without
 spilling on disk

---
 src/backend/postmaster/bgworker.c        |    3 +
 src/backend/postmaster/pgstat.c          |    3 +
 src/backend/replication/logical/proto.c  |   17 +-
 src/backend/replication/logical/worker.c | 1780 +++++++++++-----------
 src/include/pgstat.h                     |    1 +
 src/include/replication/logicalproto.h   |    4 +-
 src/include/replication/logicalworker.h  |    1 +
 7 files changed, 933 insertions(+), 876 deletions(-)

diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index f5db5a8c4a..6860df07ca 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -129,6 +129,9 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"LogicalApplyBgwMain", LogicalApplyBgwMain
 	}
 };
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e5a4d147a7..b32994784f 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3637,6 +3637,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING:
 			event_name = "Hash/GrowBuckets/Reinserting";
 			break;
+		case WAIT_EVENT_LOGICAL_APPLY_WORKER_READY:
+			event_name = "LogicalApplyWorkerReady";
+			break;
 		case WAIT_EVENT_LOGICAL_SYNC_DATA:
 			event_name = "LogicalSyncData";
 			break;
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 4bec9fe8b5..954ce7343a 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -789,14 +789,11 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
 	pq_sendint64(out, txn->commit_time);
 }
 
-TransactionId
+void
 logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
 {
-	TransactionId	xid;
 	uint8			flags;
 
-	xid = pq_getmsgint(in, 4);
-
 	/* read flags (unused for now) */
 	flags = pq_getmsgbyte(in);
 
@@ -807,8 +804,6 @@ logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
 	commit_data->commit_lsn = pq_getmsgint64(in);
 	commit_data->end_lsn = pq_getmsgint64(in);
 	commit_data->committime = pq_getmsgint64(in);
-
-	return xid;
 }
 
 void
@@ -823,13 +818,3 @@ logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
 	pq_sendint32(out, xid);
 	pq_sendint32(out, subxid);
 }
-
-void
-logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
-							 TransactionId *subxid)
-{
-	Assert(xid && subxid);
-
-	*xid = pq_getmsgint(in, 4);
-	*subxid = pq_getmsgint(in, 4);
-}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ca632b7dc4..dc6c895fca 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -92,11 +92,16 @@
 #include "rewrite/rewriteHandler.h"
 
 #include "storage/bufmgr.h"
+// #include "storage/condition_variable.h"
+#include "storage/dsm.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "storage/spin.h"
 
 #include "tcop/tcopprot.h"
 
@@ -115,6 +120,54 @@
 #include "utils/syscache.h"
 
 #define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
+#define PG_LOGICAL_APPLY_SHM_MAGIC 0x79fb2447	/* TODO: consider changing */
+
+typedef struct ParallelState
+{
+	slock_t	mutex;
+	// ConditionVariable cv;
+	bool	attached;
+	bool	ready;
+	bool	finished;
+	Oid		database_id;
+	Oid		authenticated_user_id;
+	Oid		subid;
+	Oid		stream_xid;
+	uint32	n;
+} ParallelState;
+
+typedef struct WorkerState
+{
+	TransactionId			 xid;
+	BackgroundWorkerHandle	*handle;
+	shm_mq_handle			*mq_handle;
+	dsm_segment				*dsm_seg;
+	ParallelState volatile	*pstate;
+} WorkerState;
+
+/* Apply workers hash table (initialized on first use) */
+static HTAB *ApplyWorkersHash = NULL;
+static WorkerState **ApplyWorkersIdleList = NULL;
+static uint32 pool_size = 10; /* MaxConnections default? */
+static uint32 nworkers = 0;
+static uint32 nfreeworkers = 0;
+
+/* Fields valid only for apply background workers */
+bool isLogicalApplyWorker = false;
+volatile ParallelState *MyParallelState = NULL;
+
+/* Worker setup and interactions */
+static void setup_dsm(WorkerState *wstate);
+static void setup_background_worker(WorkerState *wstate);
+static void cleanup_background_worker(dsm_segment *seg, Datum arg);
+static void handle_sigterm(SIGNAL_ARGS);
+
+static bool check_worker_status(WorkerState *wstate);
+static void wait_for_worker(WorkerState *wstate);
+static void wait_for_worker_to_finish(WorkerState *wstate);
+
+static WorkerState * find_or_start_worker(TransactionId xid, bool start);
+static void stop_worker(WorkerState *wstate);
 
 typedef struct FlushPosition
 {
@@ -143,47 +196,13 @@ bool		MySubscriptionValid = false;
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
-/* fields valid only when processing streamed transaction */
+/* Fields valid only when processing streamed transaction */
 bool		in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
-static int	stream_fd = -1;
-
-typedef struct SubXactInfo
-{
-	TransactionId xid;			/* XID of the subxact */
-	off_t		offset;			/* offset in the file */
-}			SubXactInfo;
-
-static uint32 nsubxacts = 0;
-static uint32 nsubxacts_max = 0;
-static SubXactInfo * subxacts = NULL;
-static TransactionId subxact_last = InvalidTransactionId;
-
-static void subxact_filename(char *path, Oid subid, TransactionId xid);
-static void changes_filename(char *path, Oid subid, TransactionId xid);
-
-/*
- * Information about subtransactions of a given toplevel transaction.
- */
-static void subxact_info_write(Oid subid, TransactionId xid);
-static void subxact_info_read(Oid subid, TransactionId xid);
-static void subxact_info_add(TransactionId xid);
-
-/*
- * Serialize and deserialize changes for a toplevel transaction.
- */
-static void stream_cleanup_files(Oid subid, TransactionId xid);
-static void stream_open_file(Oid subid, TransactionId xid, bool first);
-static void stream_write_change(char action, StringInfo s);
-static void stream_close_file(void);
-
-/*
- * Array of serialized XIDs.
- */
-static int	nxids = 0;
-static int	maxnxids = 0;
-static TransactionId	*xids = NULL;
+static TransactionId current_xid = InvalidTransactionId;
+static TransactionId prev_xid = InvalidTransactionId;
+static uint32 nchanges = 0;
 
 static bool handle_streamed_transaction(const char action, StringInfo s);
 
@@ -199,6 +218,16 @@ static volatile sig_atomic_t got_SIGHUP = false;
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
+// /* Debug only */
+// static void
+// iter_sleep(int seconds)
+// {
+// 	for (int i = 0; i < seconds; i++)
+// 	{
+// 		pg_usleep(1 * 1000L * 1000L);
+// 	}
+// }
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -250,6 +279,107 @@ ensure_transaction(void)
 	return true;
 }
 
+/*
+ * Look up worker inside ApplyWorkersHash for requested xid.
+ * Throw error if not found or start a new one if start=true is passed.
+ */
+static WorkerState *
+find_or_start_worker(TransactionId xid, bool start)
+{
+	bool found;
+	WorkerState *entry = NULL;
+
+	Assert(TransactionIdIsValid(xid));
+
+	/* First time through, initialize apply workers hashtable */
+	if (ApplyWorkersHash == NULL)
+	{
+		HASHCTL		ctl;
+
+		MemSet(&ctl, 0, sizeof(ctl));
+		ctl.keysize = sizeof(TransactionId);
+		ctl.entrysize = sizeof(WorkerState);
+		ctl.hcxt = ApplyContext; /* Allocate ApplyWorkersHash in the ApplyContext */
+		ApplyWorkersHash = hash_create("logical apply workers hash", 8,
+									 &ctl,
+									 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
+
+	Assert(ApplyWorkersHash != NULL);
+
+	/*
+	 * Find entry for requested transaction.
+	 */
+	entry = hash_search(ApplyWorkersHash, &xid, HASH_FIND, &found);
+
+	if (!found && start)
+	{
+		/* If there is at least one worker in the idle list, then take one. */
+		if (nfreeworkers > 0)
+		{
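+			/* 'R' asks an idle bgworker to reassign itself to the new xid. */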
+			char action = 'R';
+
+			Assert(ApplyWorkersIdleList != NULL);
+
+			entry = ApplyWorkersIdleList[nfreeworkers - 1];
+			if (!hash_update_hash_key(ApplyWorkersHash,
+									  (void *) entry,
+									  (void *) &xid))
+				elog(ERROR, "could not reassign apply worker #%u entry from xid %u to xid %u",
+													entry->pstate->n, entry->xid, xid);
+
+			entry->xid = xid;
+			entry->pstate->finished = false;
+			entry->pstate->stream_xid = xid;
+			shm_mq_send(entry->mq_handle, 1, &action, false);
+
+			ApplyWorkersIdleList[--nfreeworkers] = NULL;
+		}
+		else
+		{
+			/* No entry in hash and no idle workers. Create a new one. */
+			entry = hash_search(ApplyWorkersHash, &xid, HASH_ENTER, &found);
+			entry->xid = xid;
+			setup_background_worker(entry);
+
+			if (nworkers == pool_size)
+			{
+				ApplyWorkersIdleList = repalloc(ApplyWorkersIdleList,
+												sizeof(WorkerState *) * (pool_size + 10));
+				pool_size += 10;
+			}
+		}
+	}
+	else if (!found && !start)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				errmsg("could not find logical apply worker for xid %u", xid)));
+	else
+		elog(DEBUG5, "there is an existing logical apply worker for xid %u", xid);
+
+	Assert(entry != NULL);
+
+	return entry;
+}
+
+/*
+ * Gracefully teardown apply worker.
+ */
+static void
+stop_worker(WorkerState *wstate)
+{
+	/*
+	 * Sending zero-length data to worker in order to stop it.
+	 */
+	shm_mq_send(wstate->mq_handle, 0, NULL, false);
+
+	elog(LOG, "detaching DSM of apply worker #%u for xid %u",
+									wstate->pstate->n, wstate->xid);
+	dsm_detach(wstate->dsm_seg);
+
+	/* Delete worker entry */
+	(void) hash_search(ApplyWorkersHash, &wstate->xid, HASH_REMOVE, NULL);
+}
+
 /*
  * Handle streamed transactions.
  *
@@ -262,12 +392,12 @@ static bool
 handle_streamed_transaction(const char action, StringInfo s)
 {
 	TransactionId xid;
+	WorkerState *entry;
 
 	/* not in streaming mode */
-	if (!in_streamed_transaction)
+	if (!in_streamed_transaction || isLogicalApplyWorker)
 		return false;
 
-	Assert(stream_fd != -1);
 	Assert(TransactionIdIsValid(stream_xid));
 
 	/*
@@ -278,11 +408,16 @@ handle_streamed_transaction(const char action, StringInfo s)
 
 	Assert(TransactionIdIsValid(xid));
 
-	/* Add the new subxact to the array (unless already there). */
-	subxact_info_add(xid);
+	/*
+	 * Find worker for requested xid.
+	 */
+	entry = find_or_start_worker(stream_xid, false);
 
-	/* write the change to the current file */
-	stream_write_change(action, s);
+	// elog(LOG, "sending message of length=%d and raw=%s, action=%s", s->len, s->data, (char *) &action);
+	shm_mq_send(entry->mq_handle, s->len, s->data, false);
+	nchanges += 1;
+
+	// iter_sleep(3600);
 
 	return true;
 }
@@ -643,7 +778,8 @@ apply_handle_origin(StringInfo s)
 static void
 apply_handle_stream_start(StringInfo s)
 {
-	bool		first_segment;
+	bool		 first_segment;
+	WorkerState *entry;
 
 	Assert(!in_streamed_transaction);
 
@@ -652,17 +788,16 @@ apply_handle_stream_start(StringInfo s)
 
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
+	nchanges = 0;
 
-	/* open the spool file for this transaction */
-	stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+	/* Find worker for requested xid */
+	entry = find_or_start_worker(stream_xid, true);
 
-	/*
-	 * if this is not the first segment, open existing file
-	 *
-	 * XXX Note that the cleanup is performed by stream_open_file.
-	 */
-	if (!first_segment)
-		subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
+	SpinLockAcquire(&entry->pstate->mutex);
+	entry->pstate->ready = false;
+	SpinLockRelease(&entry->pstate->mutex);
+
+	elog(LOG, "starting streaming of xid %u", stream_xid);
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
 }
@@ -673,16 +808,19 @@ apply_handle_stream_start(StringInfo s)
 static void
 apply_handle_stream_stop(StringInfo s)
 {
+	WorkerState *entry;
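+	/* 'E' marks the end of the current streamed chunk (STREAM STOP). */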
+	char action = 'E';
+
 	Assert(in_streamed_transaction);
 
-	/*
-	 * Close the file with serialized changes, and serialize information about
-	 * subxacts for the toplevel transaction.
-	 */
-	subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
-	stream_close_file();
+	/* Find worker for requested xid */
+	entry = find_or_start_worker(stream_xid, false);
+
+	shm_mq_send(entry->mq_handle, 1, &action, false);
+	wait_for_worker(entry);
 
 	in_streamed_transaction = false;
+	elog(LOG, "stopped streaming of xid %u, %u changes streamed", stream_xid, nchanges);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
 }
@@ -695,96 +833,67 @@ apply_handle_stream_abort(StringInfo s)
 {
 	TransactionId xid;
 	TransactionId subxid;
+	WorkerState *entry;
 
 	Assert(!in_streamed_transaction);
 
-	logicalrep_read_stream_abort(s, &xid, &subxid);
-
-	/*
-	 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
-	 * just delete the files with serialized info.
-	 */
-	if (xid == subxid)
+	if (isLogicalApplyWorker)
 	{
-		char		path[MAXPGPATH];
+		subxid = pq_getmsgint(s, 4);
 
-		/*
-		 * XXX Maybe this should be an error instead? Can we receive abort for
-		 * a toplevel transaction we haven't received?
-		 */
+		ereport(LOG,
+				(errcode_for_file_access(),
+				errmsg("[Apply BGW #%u] aborting current transaction xid=%u, subxid=%u",
+				MyParallelState->n, GetCurrentTransactionIdIfAny(), GetCurrentSubTransactionId())));
 
-		changes_filename(path, MyLogicalRepWorker->subid, xid);
+		if (subxid == stream_xid)
+			AbortCurrentTransaction();
+		else
+		{
+			char *spname = (char *) palloc(64 * sizeof(char));
+			sprintf(spname, "savepoint_for_xid_%u", subxid);
 
-		if (unlink(path) < 0)
-			ereport(ERROR,
+			ereport(LOG,
 					(errcode_for_file_access(),
-					 errmsg("could not remove file \"%s\": %m", path)));
+					errmsg("[Apply BGW #%u] rolling back to savepoint %s", MyParallelState->n, spname)));
 
-		subxact_filename(path, MyLogicalRepWorker->subid, xid);
-
-		if (unlink(path) < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not remove file \"%s\": %m", path)));
+			RollbackToSavepoint(spname);
+			CommitTransactionCommand();
+			// RollbackAndReleaseCurrentSubTransaction();
 
-		return;
+			pfree(spname);
+		}
 	}
 	else
 	{
-		/*
-		 * OK, so it's a subxact. We need to read the subxact file for the
-		 * toplevel transaction, determine the offset tracked for the subxact,
-		 * and truncate the file with changes. We also remove the subxacts
-		 * with higher offsets (or rather higher XIDs).
-		 *
-		 * We intentionally scan the array from the tail, because we're likely
-		 * aborting a change for the most recent subtransactions.
-		 *
-		 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
-		 * would allow us to use binary search here.
-		 *
-		 * XXX Or perhaps we can rely on the aborts to arrive in the reverse
-		 * order, i.e. from the inner-most subxact (when nested)? In which
-		 * case we could simply check the last element.
-		 */
+		xid = pq_getmsgint(s, 4);
+		subxid = pq_getmsgint(s, 4);
 
-		int64		i;
-		int64		subidx;
-		bool		found = false;
-		char		path[MAXPGPATH];
+		/* Find worker for requested xid */
+		entry = find_or_start_worker(stream_xid, false);
 
-		subidx = -1;
-		subxact_info_read(MyLogicalRepWorker->subid, xid);
+		elog(LOG, "processing abort request of streamed transaction xid %u, subxid %u",
+			xid, subxid);
+		shm_mq_send(entry->mq_handle, s->len, s->data, false);
 
-		/* FIXME optimize the search by bsearch on sorted data */
-		for (i = nsubxacts; i > 0; i--)
+		if (subxid == stream_xid)
 		{
-			if (subxacts[i - 1].xid == subxid)
-			{
-				subidx = (i - 1);
-				found = true;
-				break;
-			}
-		}
-
-		/* We should not receive aborts for unknown subtransactions. */
-		Assert(found);
+			char action = 'F';
+			shm_mq_send(entry->mq_handle, 1, &action, false);
+			// shm_mq_send(entry->mq_handle, 0, NULL, false);
 
-		/* OK, truncate the file at the right offset. */
-		Assert((subidx >= 0) && (subidx < nsubxacts));
+			wait_for_worker_to_finish(entry);
 
-		changes_filename(path, MyLogicalRepWorker->subid, xid);
+			elog(LOG, "adding finished apply worker #%u for xid %u to the idle list",
+												entry->pstate->n, entry->xid);
+			ApplyWorkersIdleList[nfreeworkers++] = entry;
 
-		if (truncate(path, subxacts[subidx].offset))
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not truncate file \"%s\": %m", path)));
+			// elog(LOG, "detaching DSM of apply worker for xid=%u\n", entry->xid);
+			// dsm_detach(entry->dsm_seg);
 
-		/* discard the subxacts added later */
-		nsubxacts = subidx;
-
-		/* write the updated subxact list */
-		subxact_info_write(MyLogicalRepWorker->subid, xid);
+			// /* Delete worker entry */
+			// (void) hash_search(ApplyWorkersHash, &xid, HASH_REMOVE, NULL);
+		}
 	}
 }
 
@@ -794,159 +903,56 @@ apply_handle_stream_abort(StringInfo s)
 static void
 apply_handle_stream_commit(StringInfo s)
 {
-	int			fd;
 	TransactionId xid;
-	StringInfoData s2;
-	int			nchanges;
-
-	char		path[MAXPGPATH];
-	char	   *buffer = NULL;
+	WorkerState *entry;
 	LogicalRepCommitData commit_data;
 
-	MemoryContext oldcxt;
-
-	Assert(!in_streamed_transaction);
-
-	xid = logicalrep_read_stream_commit(s, &commit_data);
-
-	elog(DEBUG1, "received commit for streamed transaction %u", xid);
-
-	/* open the spool file for the committed transaction */
-	changes_filename(path, MyLogicalRepWorker->subid, xid);
-
-	elog(DEBUG1, "replaying changes from file '%s'", path);
-
-	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
-	if (fd < 0)
+	if (isLogicalApplyWorker)
 	{
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not open file \"%s\": %m",
-						path)));
-	}
-
-	/* XXX Should this be allocated in another memory context? */
+		// logicalrep_read_stream_commit(s, &commit_data);
 
-	oldcxt = MemoryContextSwitchTo(TopMemoryContext);
-
-	buffer = palloc(8192);
-	initStringInfo(&s2);
-
-	MemoryContextSwitchTo(oldcxt);
-
-	ensure_transaction();
-
-	/*
-	 * Make sure the handle apply_dispatch methods are aware we're in a remote
-	 * transaction.
-	 */
-	in_remote_transaction = true;
-	pgstat_report_activity(STATE_RUNNING, NULL);
-
-	/*
-	 * Read the entries one by one and pass them through the same logic as in
-	 * apply_dispatch.
-	 */
-	nchanges = 0;
-	while (true)
+		CommitTransactionCommand();
+	}
+	else
 	{
-		int			nbytes;
-		int			len;
-
-		/* read length of the on-disk record */
-		pgstat_report_wait_start(WAIT_EVENT_LOGICAL_CHANGES_READ);
-		nbytes = read(fd, &len, sizeof(len));
-		pgstat_report_wait_end();
-
-		/* have we reached end of the file? */
-		if (nbytes == 0)
-			break;
-
-		/* do we have a correct length? */
-		if (nbytes != sizeof(len))
-		{
-			int			save_errno = errno;
-
-			CloseTransientFile(fd);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file: %m")));
-			return;
-		}
-
-		Assert(len > 0);
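+		/* 'F' tells the bgworker to finish the xact it is processing. */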
+		char action = 'F';
 
-		/* make sure we have sufficiently large buffer */
-		buffer = repalloc(buffer, len);
-
-		/* and finally read the data into the buffer */
-		pgstat_report_wait_start(WAIT_EVENT_LOGICAL_CHANGES_READ);
-		if (read(fd, buffer, len) != len)
-		{
-			int			save_errno = errno;
-
-			CloseTransientFile(fd);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file: %m")));
-			return;
-		}
-		pgstat_report_wait_end();
+		Assert(!in_streamed_transaction);
 
-		/* copy the buffer to the stringinfo and call apply_dispatch */
-		resetStringInfo(&s2);
-		appendBinaryStringInfo(&s2, buffer, len);
+		xid = pq_getmsgint(s, 4);
+		logicalrep_read_stream_commit(s, &commit_data);
 
-		/* Ensure we are reading the data into our memory context. */
-		oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
+		elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
-		apply_dispatch(&s2);
+		/* Find worker for requested xid */
+		entry = find_or_start_worker(xid, false);
 
-		MemoryContextReset(ApplyMessageContext);
+		/* Send commit message */
+		shm_mq_send(entry->mq_handle, s->len, s->data, false);
 
-		MemoryContextSwitchTo(oldcxt);
+		/* Notify worker, that we are done with this xact */
+		shm_mq_send(entry->mq_handle, 1, &action, false);
 
-		nchanges++;
+		wait_for_worker_to_finish(entry);
 
-		if (nchanges % 1000 == 0)
-			elog(DEBUG1, "replayed %d changes from file '%s'",
-				 nchanges, path);
+		elog(LOG, "adding finished apply worker #%u for xid %u to the idle list",
+											entry->pstate->n, entry->xid);
+		ApplyWorkersIdleList[nfreeworkers++] = entry;
 
 		/*
-		 * send feedback to upstream
-		 *
-		 * XXX Probably should send a valid LSN. But which one?
+		 * Update origin state so we can restart streaming from correct
+		 * position in case of crash.
 		 */
-		send_feedback(InvalidXLogRecPtr, false, false);
-	}
-
-	CloseTransientFile(fd);
-
-	/*
-	 * Update origin state so we can restart streaming from correct
-	 * position in case of crash.
-	 */
-	replorigin_session_origin_lsn = commit_data.end_lsn;
-	replorigin_session_origin_timestamp = commit_data.committime;
-
-	CommitTransactionCommand();
-	pgstat_report_stat(false);
-
-	store_flush_position(commit_data.end_lsn);
-
-	elog(DEBUG1, "replayed %d (all) changes from file '%s'",
-		 nchanges, path);
+		replorigin_session_origin_lsn = commit_data.end_lsn;
+		replorigin_session_origin_timestamp = commit_data.committime;
 
-	in_remote_transaction = false;
-	pgstat_report_activity(STATE_IDLE, NULL);
+		pgstat_report_stat(false);
 
-	/* unlink the files with serialized changes and subxact info */
-	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+		store_flush_position(commit_data.end_lsn);
 
-	pfree(buffer);
-	pfree(s2.data);
+		in_remote_transaction = false;
+		pgstat_report_activity(STATE_IDLE, NULL);
+	}
 }
 
 /*
@@ -965,6 +971,8 @@ apply_handle_relation(StringInfo s)
 	if (handle_streamed_transaction('R', s))
 		return;
 
+	// iter_sleep(3600);
+
 	rel = logicalrep_read_rel(s);
 	logicalrep_relmap_update(rel);
 }
@@ -1407,6 +1415,35 @@ apply_dispatch(StringInfo s)
 {
 	char		action = pq_getmsgbyte(s);
 
+	if (isLogicalApplyWorker)
+	{
+		/*
+		 * Inside a logical apply worker we can detect that a new
+		 * subtransaction has started when a change arrives with a different
+		 * xid. In that case we define a named savepoint, so that we can
+		 * commit or roll it back separately later.
+		 */
+		current_xid = pq_getmsgint(s, 4);
+
+		if (prev_xid == InvalidTransactionId)
+			prev_xid = current_xid;
+		else if (current_xid != prev_xid && current_xid != stream_xid)
+		{
+			char *spname = (char *) palloc(64 * sizeof(char));
+			sprintf(spname, "savepoint_for_xid_%u", current_xid);
+
+			elog(LOG, "[Apply BGW #%u] defining savepoint %s", MyParallelState->n, spname);
+
+			DefineSavepoint(spname);
+			CommitTransactionCommand();
+			// BeginInternalSubTransaction(NULL);
+		}
+
+		prev_xid = current_xid;
+	}
+	// else
+	// 	elog(LOG, "Logical worker: applying dispatch for action=%s", (char *) &action);
+
 	switch (action)
 	{
 			/* BEGIN */
@@ -1435,6 +1472,7 @@ apply_dispatch(StringInfo s)
 			break;
 			/* RELATION */
 		case 'R':
+			// elog(LOG, "%s worker: applying dispatch for action=R", isLogicalApplyWorker ? "Apply" : "Logical");
 			apply_handle_relation(s);
 			break;
 			/* TYPE */
@@ -1565,12 +1603,18 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 worker_onexit(int code, Datum arg)
 {
-	int	i;
+	HASH_SEQ_STATUS status;
+	WorkerState *entry;
 
-	elog(LOG, "cleanup files for %d transactions", nxids);
-
-	for (i = nxids-1; i >= 0; i--)
-		stream_cleanup_files(MyLogicalRepWorker->subid, xids[i]);
+	if (ApplyWorkersHash != NULL)
+	{
+		hash_seq_init(&status, ApplyWorkersHash);
+		while ((entry = (WorkerState *) hash_seq_search(&status)) != NULL)
+		{
+			stop_worker(entry);
+		}
+		hash_seq_term(&status);
+	}
 }
 
 /*
@@ -1593,6 +1637,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
 
+	ApplyWorkersIdleList = palloc(sizeof(WorkerState *) * pool_size);
+
 	for (;;)
 	{
 		pgsocket	fd = PGINVALID_SOCKET;
@@ -1904,8 +1950,9 @@ maybe_reread_subscription(void)
 	Subscription *newsub;
 	bool		started_tx = false;
 
+	/* TODO: probably we have to handle subscription reread in apply workers too. */
 	/* When cache state is valid there is nothing to do here. */
-	if (MySubscriptionValid)
+	if (MySubscriptionValid || isLogicalApplyWorker)
 		return;
 
 	/* This function might be called inside or outside of transaction. */
@@ -2039,608 +2086,50 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
 	MySubscriptionValid = false;
 }
 
-/*
- * subxact_info_write
- *	  Store information about subxacts for a toplevel transaction.
- *
- * For each subxact we store offset of it's first change in the main file.
- * The file is always over-written as a whole, and we also include CRC32C
- * checksum of the information.
- *
- * XXX We should only store subxacts that were not aborted yet.
- *
- * XXX Maybe we should only include the checksum when the cluster is
- * initialized with checksums?
- *
- * XXX Add calls to pgstat_report_wait_start/pgstat_report_wait_end.
- */
+/* SIGHUP: set flag to reload configuration at next convenient time */
 static void
-subxact_info_write(Oid subid, TransactionId xid)
+logicalrep_worker_sighup(SIGNAL_ARGS)
 {
-	int			fd;
-	char		path[MAXPGPATH];
-	uint32		checksum;
-	Size		len;
-
-	Assert(TransactionIdIsValid(xid));
-
-	subxact_filename(path, subid, xid);
-
-	fd = OpenTransientFile(path, O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
-	if (fd < 0)
-	{
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not create file \"%s\": %m",
-						path)));
-		return;
-	}
-
-	len = sizeof(SubXactInfo) * nsubxacts;
-
-	/* compute the checksum */
-	INIT_CRC32C(checksum);
-	COMP_CRC32C(checksum, (char *) &nsubxacts, sizeof(nsubxacts));
-	COMP_CRC32C(checksum, (char *) subxacts, len);
-	FIN_CRC32C(checksum);
-
-	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_WRITE);
-
-	if (write(fd, &checksum, sizeof(checksum)) != sizeof(checksum))
-	{
-		int			save_errno = errno;
+	int			save_errno = errno;
 
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write to file \"%s\": %m",
-						path)));
-		return;
-	}
+	got_SIGHUP = true;
 
-	if (write(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
-	{
-		int			save_errno = errno;
+	/* Waken anything waiting on the process latch */
+	SetLatch(MyLatch);
 
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write to file \"%s\": %m",
-						path)));
-		return;
-	}
+	errno = save_errno;
+}
 
-	if ((len > 0) && (write(fd, subxacts, len) != len))
-	{
-		int			save_errno = errno;
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+	MemoryContext oldctx;
+	char		originname[NAMEDATALEN];
+	XLogRecPtr	origin_startpos;
+	char	   *myslotname;
+	WalRcvStreamOptions options;
 
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write to file \"%s\": %m",
-						path)));
-		return;
-	}
+	/* Attach to slot */
+	logicalrep_worker_attach(worker_slot);
 
-	pgstat_report_wait_end();
+	/* Setup signal handling */
+	pqsignal(SIGHUP, logicalrep_worker_sighup);
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
 
 	/*
-	 * We don't need to fsync or anything, as we'll recreate the files after a
-	 * crash from scratch. So just close the file.
+	 * We don't currently need any ResourceOwner in a walreceiver process, but
+	 * if we did, we could call CreateAuxProcessResourceOwner here.
 	 */
-	CloseTransientFile(fd);
 
-	/*
-	 * But we free the memory allocated for subxact info. There might be one
-	 * exceptional transaction with many subxacts, and we don't want to keep
-	 * the memory allocated forewer.
-	 *
-	 */
-	if (subxacts)
-		pfree(subxacts);
+	/* Initialise stats to a sanish value */
+	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
 
-	subxacts = NULL;
-	subxact_last = InvalidTransactionId;
-	nsubxacts = 0;
-	nsubxacts_max = 0;
-}
-
-/*
- * subxact_info_read
- *	  Restore information about subxacts of a streamed transaction.
- *
- * Read information about subxacts into the global variables, and while
- * reading the information verify the checksum.
- *
- * XXX Add calls to pgstat_report_wait_start/pgstat_report_wait_end.
- *
- * XXX Do we need to allocate it in TopMemoryContext?
- */
-static void
-subxact_info_read(Oid subid, TransactionId xid)
-{
-	int			fd;
-	char		path[MAXPGPATH];
-	uint32		checksum;
-	uint32		checksum_new;
-	Size		len;
-	MemoryContext oldctx;
-
-	Assert(TransactionIdIsValid(xid));
-	Assert(!subxacts);
-	Assert(nsubxacts == 0);
-	Assert(nsubxacts_max == 0);
-
-	subxact_filename(path, subid, xid);
-
-	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
-	if (fd < 0)
-	{
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not open file \"%s\": %m",
-						path)));
-		return;
-	}
-
-	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_READ);
-
-	/* read the checksum */
-	if (read(fd, &checksum, sizeof(checksum)) != sizeof(checksum))
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not read file \"%s\": %m",
-						path)));
-		return;
-	}
-
-	/* read number of subxact items */
-	if (read(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not read file \"%s\": %m",
-						path)));
-		return;
-	}
-
-	pgstat_report_wait_end();
-
-	len = sizeof(SubXactInfo) * nsubxacts;
-
-	/* we keep the maximum as a power of 2 */
-	nsubxacts_max = 1 << my_log2(nsubxacts);
-
-	/* subxacts are long-lived */
-	oldctx = MemoryContextSwitchTo(TopMemoryContext);
-	subxacts = palloc(nsubxacts_max * sizeof(SubXactInfo));
-	MemoryContextSwitchTo(oldctx);
-
-	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_READ);
-
-	if ((len > 0) && ((read(fd, subxacts, len)) != len))
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not read file \"%s\": %m",
-						path)));
-		return;
-	}
-
-	pgstat_report_wait_end();
-
-	/* recompute the checksum */
-	INIT_CRC32C(checksum_new);
-	COMP_CRC32C(checksum_new, (char *) &nsubxacts, sizeof(nsubxacts));
-	COMP_CRC32C(checksum_new, (char *) subxacts, len);
-	FIN_CRC32C(checksum_new);
-
-	if (checksum_new != checksum)
-		ereport(ERROR,
-				(errmsg("checksum failure when reading subxacts")));
-
-	CloseTransientFile(fd);
-}
-
-/*
- * subxact_info_add
- *	  Add information about a subxact (offset in the main file).
- *
- * XXX Do we need to allocate it in TopMemoryContext?
- */
-static void
-subxact_info_add(TransactionId xid)
-{
-	int64		i;
-
-	/*
-	 * If the XID matches the toplevel transaction, we don't want to add it.
-	 */
-	if (stream_xid == xid)
-		return;
-
-	/*
-	 * In most cases we're checking the same subxact as we've already seen in
-	 * the last call, so make ure just ignore it (this change comes later).
-	 */
-	if (subxact_last == xid)
-		return;
-
-	/* OK, remember we're processing this XID. */
-	subxact_last = xid;
-
-	/*
-	 * Check if the transaction is already present in the array of subxact. We
-	 * intentionally scan the array from the tail, because we're likely adding
-	 * a change for the most recent subtransactions.
-	 *
-	 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
-	 * would allow us to use binary search here.
-	 */
-	for (i = nsubxacts; i > 0; i--)
-	{
-		/* found, so we're done */
-		if (subxacts[i - 1].xid == xid)
-			return;
-	}
-
-	/* This is a new subxact, so we need to add it to the array. */
-
-	if (nsubxacts == 0)
-	{
-		MemoryContext oldctx;
-
-		nsubxacts_max = 128;
-		oldctx = MemoryContextSwitchTo(TopMemoryContext);
-		subxacts = palloc(nsubxacts_max * sizeof(SubXactInfo));
-		MemoryContextSwitchTo(oldctx);
-	}
-	else if (nsubxacts == nsubxacts_max)
-	{
-		nsubxacts_max *= 2;
-		subxacts = repalloc(subxacts, nsubxacts_max * sizeof(SubXactInfo));
-	}
-
-	subxacts[nsubxacts].xid = xid;
-	subxacts[nsubxacts].offset = lseek(stream_fd, 0, SEEK_END);
-
-	nsubxacts++;
-}
-
-/* format filename for file containing the info about subxacts */
-static void
-subxact_filename(char *path, Oid subid, TransactionId xid)
-{
-	char		tempdirpath[MAXPGPATH];
-
-	TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);
-
-	/*
-	 * We might need to create the tablespace's tempfile directory, if no
-	 * one has yet done so.
-	 *
-	 * Don't check for error from mkdir; it could fail if the directory
-	 * already exists (maybe someone else just did the same thing).  If
-	 * it doesn't work then we'll bomb out when opening the file
-	 */
-	mkdir(tempdirpath, S_IRWXU);
-
-	snprintf(path, MAXPGPATH, "%s/logical-%u-%u.subxacts",
-			 tempdirpath, subid, xid);
-}
-
-/* format filename for file containing serialized changes */
-static void
-changes_filename(char *path, Oid subid, TransactionId xid)
-{
-	char		tempdirpath[MAXPGPATH];
-
-	TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);
-
-	/*
-	 * We might need to create the tablespace's tempfile directory, if no
-	 * one has yet done so.
-	 *
-	 * Don't check for error from mkdir; it could fail if the directory
-	 * already exists (maybe someone else just did the same thing).  If
-	 * it doesn't work then we'll bomb out when opening the file
-	 */
-	mkdir(tempdirpath, S_IRWXU);
-
-	snprintf(path, MAXPGPATH, "%s/logical-%u-%u.changes",
-			 tempdirpath, subid, xid);
-}
-
-/*
- * stream_cleanup_files
- *	  Cleanup files for a subscription / toplevel transaction.
- *
- * Remove files with serialized changes and subxact info for a particular
- * toplevel transaction. Each subscription has a separate set of files.
- *
- * Note: The files may not exists, so handle ENOENT as non-error.
- *
- * TODO: Add missing_ok flag to specify in which cases it's OK not to
- * find the files, and when it's an error.
- */
-static void
-stream_cleanup_files(Oid subid, TransactionId xid)
-{
-	int			i;
-	char		path[MAXPGPATH];
-	bool		found = false;
-
-	subxact_filename(path, subid, xid);
-
-	if ((unlink(path) < 0) && (errno != ENOENT))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not remove file \"%s\": %m", path)));
-
-	changes_filename(path, subid, xid);
-
-	if ((unlink(path) < 0) && (errno != ENOENT))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not remove file \"%s\": %m", path)));
-
-	/*
-	 * Cleanup the XID from the array - find the XID in the array and
-	 * remove it by shifting all the remaining elements. The array is
-	 * bound to be fairly small (maximum number of in-progress xacts,
-	 * so max_connections + max_prepared_transactions) so simply loop
-	 * through the array and find index of the XID. Then move the rest
-	 * of the array by one element to the left.
-	 *
-	 * Notice we also call this from stream_open_file for first segment
-	 * of each transaction, to deal with possible left-overs after a
-	 * crash, so it's entirely possible not to find the XID in the
-	 * array here. In that case we don't remove anything.
-	 *
-	 * XXX Perhaps it'd be better to handle this automatically after a
-	 * restart, instead of doing it over and over for each transaction.
-	 */
-	for (i = 0; i < nxids; i++)
-	{
-		if (xids[i] == xid)
-		{
-			found = true;
-			break;
-		}
-	}
-
-	if (!found)
-		return;
-
-	/*
-	 * Move the last entry from the array to the place. We don't keep
-	 * the streamed transactions sorted or anything - we only expect 
-	 * a few of them in progress (max_connections + max_prepared_xacts)
-	 * so linear search is just fine.
-	 */
-	xids[i] = xids[nxids-1];
-	nxids--;
-}
-
-/*
- * stream_open_file
- *	  Open file we'll use to serialize changes for a toplevel transaction.
- *
- * Open a file for streamed changes from a toplevel transaction identified
- * by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, perform cleanup by removing existing
- * files after a possible previous crash.
- *
- * This can only be called at the beginning of a "streaming" block, i.e.
- * between stream_start/stream_stop messages from the upstream.
- */
-static void
-stream_open_file(Oid subid, TransactionId xid, bool first_segment)
-{
-	char		path[MAXPGPATH];
-	int			flags;
-
-	Assert(in_streamed_transaction);
-	Assert(OidIsValid(subid));
-	Assert(TransactionIdIsValid(xid));
-	Assert(stream_fd == -1);
-
-	/*
-	 * If this is the first segment for this transaction, try removing
-	 * existing files (if there are any, possibly after a crash).
-	 */
-	if (first_segment)
-	{
-		MemoryContext	oldcxt;
-
-		/* XXX make sure there are no previous files for this transaction */
-		stream_cleanup_files(subid, xid);
-
-		oldcxt = MemoryContextSwitchTo(TopMemoryContext);
-
-		/*
-		 * We need to remember the XIDs we spilled to files, so that we can
-		 * remove them at worker exit (e.g. after DROP SUBSCRIPTION).
-		 *
-		 * The number of XIDs we may need to track is fairly small, because
-		 * we can only stream toplevel xacts (so limited by max_connections
-		 * and max_prepared_transactions), and we only stream the large ones.
-		 * So we simply keep the XIDs in an unsorted array. If the number of
-		 * xacts gets large for some reason (e.g. very high max_connections),
-		 * a more elaborate approach might be better - e.g. sorted array, to
-		 * speed-up the lookups.
-		 */
-		if (nxids == maxnxids)	/* array of XIDs is full */
-		{
-			if (!xids)
-			{
-				maxnxids = 64;
-				xids = palloc(maxnxids * sizeof(TransactionId));
-			}
-			else
-			{
-				maxnxids = 2 * maxnxids;
-				xids = repalloc(xids, maxnxids * sizeof(TransactionId));
-			}
-		}
-
-		xids[nxids++] = xid;
-
-		MemoryContextSwitchTo(oldcxt);
-	}
-
-	changes_filename(path, subid, xid);
-
-	elog(DEBUG1, "opening file '%s' for streamed changes", path);
-
-	/*
-	 * If this is the first streamed segment, the file must not exist, so
-	 * make sure we're the ones creating it. Otherwise just open the file
-	 * for writing, in append mode.
-	 */
-	if (first_segment)
-		flags = (O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
-	else
-		flags = (O_WRONLY | O_APPEND | PG_BINARY);
-
-	stream_fd = OpenTransientFile(path, flags);
-
-	if (stream_fd < 0)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not open file \"%s\": %m",
-						path)));
-}
-
-/*
- * stream_close_file
- *	  Close the currently open file with streamed changes.
- *
- * This can only be called at the beginning of a "streaming" block, i.e.
- * between stream_start/stream_stop messages from the upstream.
- */
-static void
-stream_close_file(void)
-{
-	Assert(in_streamed_transaction);
-	Assert(TransactionIdIsValid(stream_xid));
-	Assert(stream_fd != -1);
-
-	CloseTransientFile(stream_fd);
-
-	stream_xid = InvalidTransactionId;
-	stream_fd = -1;
-}
-
-/*
- * stream_write_change
- *	  Serialize a change to a file for the current toplevel transaction.
- *
- * The change is serialied in a simple format, with length (not including
- * the length), action code (identifying the message type) and message
- * contents (without the subxact TransactionId value).
- *
- * XXX The subxact file includes CRC32C of the contents. Maybe we should
- * include something like that here too, but doing so will not be as
- * straighforward, because we write the file in chunks.
- */
-static void
-stream_write_change(char action, StringInfo s)
-{
-	int			len;
-
-	Assert(in_streamed_transaction);
-	Assert(TransactionIdIsValid(stream_xid));
-	Assert(stream_fd != -1);
-
-	/* total on-disk size, including the action type character */
-	len = (s->len - s->cursor) + sizeof(char);
-
-	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_CHANGES_WRITE);
-
-	/* first write the size */
-	if (write(stream_fd, &len, sizeof(len)) != sizeof(len))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not serialize streamed change to file: %m")));
-
-	/* then the action */
-	if (write(stream_fd, &action, sizeof(action)) != sizeof(action))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not serialize streamed change to file: %m")));
-
-	/* and finally the remaining part of the buffer (after the XID) */
-	len = (s->len - s->cursor);
-
-	if (write(stream_fd, &s->data[s->cursor], len) != len)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not serialize streamed change to file: %m")));
-
-	pgstat_report_wait_end();
-}
-
-/* SIGHUP: set flag to reload configuration at next convenient time */
-static void
-logicalrep_worker_sighup(SIGNAL_ARGS)
-{
-	int			save_errno = errno;
-
-	got_SIGHUP = true;
-
-	/* Waken anything waiting on the process latch */
-	SetLatch(MyLatch);
-
-	errno = save_errno;
-}
-
-/* Logical Replication Apply worker entry point */
-void
-ApplyWorkerMain(Datum main_arg)
-{
-	int			worker_slot = DatumGetInt32(main_arg);
-	MemoryContext oldctx;
-	char		originname[NAMEDATALEN];
-	XLogRecPtr	origin_startpos;
-	char	   *myslotname;
-	WalRcvStreamOptions options;
-
-	/* Attach to slot */
-	logicalrep_worker_attach(worker_slot);
-
-	/* Setup signal handling */
-	pqsignal(SIGHUP, logicalrep_worker_sighup);
-	pqsignal(SIGTERM, die);
-	BackgroundWorkerUnblockSignals();
-
-	/*
-	 * We don't currently need any ResourceOwner in a walreceiver process, but
-	 * if we did, we could call CreateAuxProcessResourceOwner here.
-	 */
-
-	/* Initialise stats to a sanish value */
-	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
-		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
-
-	/* Load the libpq-specific functions */
-	load_file("libpqwalreceiver", false);
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
 
 	/* Run as replica session replication role. */
 	SetConfigOption("session_replication_role", "replica",
@@ -2798,3 +2287,580 @@ IsLogicalWorker(void)
 {
 	return MyLogicalRepWorker != NULL;
 }
+
+/*
+ * Apply Background Worker main loop.
+ */
+void
+LogicalApplyBgwMain(Datum main_arg)
+{
+	volatile ParallelState *pst;
+
+	dsm_segment			*seg;
+	shm_toc				*toc;
+	PGPROC				*registrant;
+	shm_mq				*mq;
+	shm_mq_handle		*mqh;
+	shm_mq_result		 shmq_res;
+	// ConditionVariable	 cv;
+	LogicalRepWorker	 lrw;
+	MemoryContext		 oldcontext;
+
+	MemoryContextSwitchTo(TopMemoryContext);
+
+	/* Load the subscription into persistent memory context. */
+	ApplyContext = AllocSetContextCreate(TopMemoryContext,
+										 "ApplyContext",
+										 ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+	/*
+	 * Init the ApplyMessageContext which we clean up after each replication
+	 * protocol message.
+	 */
+	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+												"ApplyMessageContext",
+												ALLOCSET_DEFAULT_SIZES);
+
+	isLogicalApplyWorker = true;
+
+	/*
+	 * Establish signal handlers.
+	 *
+	 * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
+	 * it would a normal user backend.  To make that happen, we establish a
+	 * signal handler that is a stripped-down version of die().
+	 */
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Connect to the dynamic shared memory segment.
+	 *
+	 * The backend that registered this worker passed us the ID of a shared
+	 * memory segment to which we must attach for further instructions.  In
+	 * order to attach to dynamic shared memory, we need a resource owner.
+	 * Once we've mapped the segment in our address space, attach to the table
+	 * of contents so we can locate the various data structures we'll need to
+	 * find within the segment.
+	 */
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "Logical apply worker");
+	seg = dsm_attach(DatumGetInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/*
+	 * Acquire a worker number.
+	 *
+	 * By convention, the process registering this background worker should
+	 * have stored the control structure at key 0.  We look up that key to
+	 * find it.  Our worker number gives our identity: there may be just one
+	 * worker involved in this parallel operation, or there may be many.
+	 */
+	pst = shm_toc_lookup(toc, 0, false);
+	MyParallelState = pst;
+
+	SpinLockAcquire(&pst->mutex);
+	pst->attached = true;
+	SpinLockRelease(&pst->mutex);
+
+	/*
+	 * Attach to the message queue.
+	 */
+	mq = shm_toc_lookup(toc, 1, false);
+	shm_mq_set_receiver(mq, MyProc);
+	mqh = shm_mq_attach(mq, seg, NULL);
+
+	/* Restore database connection. */
+	BackgroundWorkerInitializeConnectionByOid(pst->database_id,
+											  pst->authenticated_user_id, 0);
+
+	/*
+	 * Set the client encoding to the database encoding, since that is what
+	 * the leader will expect.
+	 */
+	SetClientEncoding(GetDatabaseEncoding());
+
+	lrw.subid = pst->subid;
+	MyLogicalRepWorker = &lrw;
+
+	stream_xid = pst->stream_xid;
+
+	StartTransactionCommand();
+	BeginTransactionBlock();
+	CommitTransactionCommand();
+	StartTransactionCommand();
+	// PushActiveSnapshot(GetTransactionSnapshot());
+
+	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+
+	/*
+	 * Indicate that we're fully initialized and ready to begin the main part
+	 * of the parallel operation.
+	 *
+	 * Once we signal that we're ready, the user backend is entitled to assume
+	 * that our on_dsm_detach callbacks will fire before we disconnect from
+	 * the shared memory segment and exit.  Generally, that means we must have
+	 * attached to all relevant dynamic shared memory data structures by now.
+	 */
+	SpinLockAcquire(&pst->mutex);
+	pst->ready = true;
+	// cv = pst->cv;
+	// if (pst->workers_ready == pst->workers_total)
+	// {
+	//	 registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
+	//	 if (registrant == NULL)
+	//	 {
+	//		 elog(DEBUG1, "registrant backend has exited prematurely");
+	//		 proc_exit(1);
+	//	 }
+	//	 SetLatch(&registrant->procLatch);
+	// }
+	SpinLockRelease(&pst->mutex);
+	elog(LOG, "[Apply BGW #%u] started", pst->n);
+
+	registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
+	SetLatch(&registrant->procLatch);
+
+	for (;;)
+	{
+		void *data;
+		Size  len;
+		StringInfoData s;
+		MemoryContext	oldctx;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Ensure we are reading the data into our memory context. */
+		oldctx = MemoryContextSwitchTo(ApplyMessageContext);
+
+		shmq_res = shm_mq_receive(mqh, &len, &data, false);
+
+		if (shmq_res != SHM_MQ_SUCCESS)
+			break;
+
+		if (len == 0)
+		{
+			elog(LOG, "[Apply BGW #%u] got zero-length message, stopping", pst->n);
+			break;
+		}
+		else
+		{
+			s.cursor = 0;
+			s.maxlen = -1;
+			s.data = (char *) data;
+			s.len = len;
+
+			/*
+			 * We use the first byte of the message for additional
+			 * communication between the main logical replication worker and
+			 * the apply bgworkers, so if it differs from 'w', process it first.
+			 */
+			switch (pq_getmsgbyte(&s))
+			{
+				/* Stream stop */
+				case 'E':
+				{
+					in_remote_transaction = false;
+
+					SpinLockAcquire(&pst->mutex);
+					pst->ready = true;
+					SpinLockRelease(&pst->mutex);
+					SetLatch(&registrant->procLatch);
+
+					elog(LOG, "[Apply BGW #%u] ended processing streaming chunk, waiting on shm_mq_receive", pst->n);
+
+					continue;
+				}
+				/* Reassign to the new transaction */
+				case 'R':
+				{
+					elog(LOG, "[Apply BGW #%u] switching from processing xid %u to xid %u",
+											pst->n, stream_xid, pst->stream_xid);
+					stream_xid = pst->stream_xid;
+
+					StartTransactionCommand();
+					BeginTransactionBlock();
+					CommitTransactionCommand();
+					StartTransactionCommand();
+
+					MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+
+					continue;
+				}
+				/* Finished processing xact */
+				case 'F':
+				{
+					elog(LOG, "[Apply BGW #%u] finished processing xact %u", pst->n, stream_xid);
+
+					MemoryContextSwitchTo(ApplyContext);
+
+					CommitTransactionCommand();
+					EndTransactionBlock();
+					CommitTransactionCommand();
+
+					SpinLockAcquire(&pst->mutex);
+					pst->finished = true;
+					SpinLockRelease(&pst->mutex);
+
+					continue;
+				}
+				default:
+					break;
+			}
+
+			/* Skip the 'w' message header: start LSN, end LSN, send timestamp. */
+			/* TODO: do we need to process it here again somehow? */
+			pq_getmsgint64(&s);
+			pq_getmsgint64(&s);
+			pq_getmsgint64(&s);
+
+			/*
+			 * Make sure the apply_dispatch handlers are aware that we're in
+			 * a remote transaction.
+			 */
+			in_remote_transaction = true;
+			pgstat_report_activity(STATE_RUNNING, NULL);
+
+			elog(DEBUG5, "[Apply BGW #%u] applying dispatch for action=%s",
+									pst->n, (char *) &s.data[s.cursor]);
+			apply_dispatch(&s);
+		}
+
+		MemoryContextSwitchTo(oldctx);
+		MemoryContextReset(ApplyMessageContext);
+	}
+
+	CommitTransactionCommand();
+	EndTransactionBlock();
+	CommitTransactionCommand();
+
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(ApplyContext);
+
+	SpinLockAcquire(&pst->mutex);
+	pst->finished = true;
+	SpinLockRelease(&pst->mutex);
+
+	elog(LOG, "[Apply BGW #%u] exiting", pst->n);
+
+	/* Signal the main process that we are done. */
+	SetLatch(&registrant->procLatch);
+
+	/*
+	 * We're done.  Explicitly detach the shared memory segment so that we
+	 * don't get a resource leak warning at commit time.  This will fire any
+	 * on_dsm_detach callbacks we've registered, as well.  Once that's done,
+	 * we can go ahead and exit.
+	 */
+	dsm_detach(seg);
+	proc_exit(0);
+}
+
+/*
+ * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
+ * like a normal backend.  The next CHECK_FOR_INTERRUPTS() will do the right
+ * thing.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int save_errno = errno;
+
+	SetLatch(MyLatch);
+
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ProcDiePending = true;
+	}
+
+	errno = save_errno;
+}
+
+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a control region that contains a ParallelState, plus one
+ * region for the message queue used to pass the streamed changes from
+ * the leader to the background worker.
+ */
+static void
+setup_dsm(WorkerState *wstate)
+{
+	shm_toc_estimator	 e;
+	int					 toc_key = 0;
+	Size				 segsize;
+	dsm_segment			*seg;
+	shm_toc				*toc;
+	ParallelState		*pst;
+	shm_mq				*mq;
+	int64				 queue_size = 160000000; /* 160 MB for now */
+
+	/* Ensure a valid queue size. */
+	if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("queue size must be at least %zu bytes",
+						shm_mq_minimum_size)));
+	if (queue_size != ((Size) queue_size))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("queue size overflows size_t")));
+
+	/*
+	 * Estimate how much shared memory we need.
+	 *
+	 * Because the TOC machinery may choose to insert padding of oddly-sized
+	 * requests, we must estimate each chunk separately.
+	 *
+	 * We need one key to register the location of the header, and one more
+	 * to track the location of the message queue.
+	 */
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(ParallelState));
+	shm_toc_estimate_chunk(&e, (Size) queue_size);
+
+	shm_toc_estimate_keys(&e, 1 + 1);
+	segsize = shm_toc_estimate(&e);
+
+	/* Create the shared memory segment and establish a table of contents. */
+	seg = dsm_create(shm_toc_estimate(&e), 0);
+	toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
+						 segsize);
+
+	/* Set up the header region. */
+	pst = shm_toc_allocate(toc, sizeof(ParallelState));
+	SpinLockInit(&pst->mutex);
+	pst->attached = false;
+	pst->ready = false;
+	pst->finished = false;
+	pst->database_id = MyDatabaseId;
+	pst->subid = MyLogicalRepWorker->subid;
+	pst->stream_xid = stream_xid;
+	pst->authenticated_user_id = GetAuthenticatedUserId();
+	pst->n = nworkers + 1;
+
+	shm_toc_insert(toc, toc_key++, pst);
+
+	/* Set up the message queue. */
+	mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
+						(Size) queue_size);
+	shm_toc_insert(toc, toc_key++, mq);
+	shm_mq_set_sender(mq, MyProc);
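+	/* The worker end of the queue is attached in LogicalApplyBgwMain(). */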
+
+	/* Attach to the queue as the sender. */
+	wstate->mq_handle = shm_mq_attach(mq, seg, wstate->handle);
+
+	/* Return results to caller. */
+	wstate->dsm_seg = seg;
+	wstate->pstate = pst;
+}
+
+/*
+ * Set up a dynamic shared memory segment and register a new apply
+ * background worker attached to it.
+ */
+static void
+setup_background_worker(WorkerState *wstate)
+{
+	MemoryContext		oldcontext;
+	BackgroundWorker	worker;
+
+	elog(LOG, "setting up apply worker #%u", nworkers + 1);
+
+	/*
+	 * TOCHECK: We need the worker_state object and the background worker
+	 * handle to which it points to be allocated in TopMemoryContext rather
+	 * than ApplyMessageContext; otherwise, they'll be destroyed before the
+	 * on_dsm_detach hooks run.
+	 */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+	setup_dsm(wstate);
+
+	/*
+	 * Arrange to kill the worker if we abort before it has finished hooking
+	 * itself up to the dynamic shared memory segment.
+	 *
+	 * If we die after the worker has attached to the segment, we'll mark the
+	 * queue to which we're directly connected as detached, and the worker
+	 * connected to it will exit in its turn.
+	 *
+	 * But suppose the worker exits due to some error before it actually
+	 * attaches to the queue.  We would have no way of knowing this; from our
+	 * perspective, we'd still be waiting for it to start, when in fact it
+	 * has already died.
+	 */
+	on_dsm_detach(wstate->dsm_seg, cleanup_background_worker,
+				  PointerGetDatum(wstate));
+
+	/* Configure a worker. */
+	MemSet(&worker, 0, sizeof(BackgroundWorker));
+
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+		BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_notify_pid = MyProcPid;
+	snprintf(worker.bgw_library_name, BGW_MAXLEN, "postgres");
+	snprintf(worker.bgw_function_name, BGW_MAXLEN, "LogicalApplyBgwMain");
+
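+	/*
+	 * Pass the DSM segment handle as the worker's main argument, so that it
+	 * can attach to the segment and look up the ParallelState and the queue.
+	 */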
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(wstate->dsm_seg));
+
+	/* Register the worker. */
+	snprintf(worker.bgw_name, BGW_MAXLEN,
+			"logical replication apply worker #%u for subscription %u",
+										nworkers + 1, MySubscription->oid);
+	if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					errmsg("could not register background process"),
+					errhint("You may need to increase max_worker_processes.")));
+
+	/* All done. */
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Wait for the worker to become ready. */
+	wait_for_worker(wstate);
+
+	/*
+	 * Once we reach this point, the worker is ready.  We no longer need to
+	 * kill it if we die; it will die on its own as the message queue shuts
+	 * down.
+	 */
+	cancel_on_dsm_detach(wstate->dsm_seg, cleanup_background_worker,
+						 PointerGetDatum(wstate));
+
+	nworkers += 1;
+}
+
+static void
+cleanup_background_worker(dsm_segment *seg, Datum arg)
+{
+	WorkerState *wstate = (WorkerState *) DatumGetPointer(arg);
+
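+	/* Ask the postmaster to SIGTERM the worker and then unregister it. */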
+	TerminateBackgroundWorker(wstate->handle);
+}
+
+static void
+wait_for_worker(WorkerState *wstate)
+{
+	bool result = false;
+
+	for (;;)
+	{
+		bool ready;
+
+		/* If the worker is ready, we have succeeded. */
+		SpinLockAcquire(&wstate->pstate->mutex);
+		ready = wstate->pstate->ready;
+		SpinLockRelease(&wstate->pstate->mutex);
+		if (ready)
+		{
+			result = true;
+			break;
+		}
+
+		/* If the worker (or the postmaster) has died, we have failed. */
+		if (!check_worker_status(wstate))
+		{
+			result = false;
+			break;
+		}
+
+		/* Wait for the worker to signal us. */
+		WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+							WAIT_EVENT_LOGICAL_APPLY_WORKER_READY);
+
+		/* Reset the latch so we don't spin. */
+		ResetLatch(MyLatch);
+
+		/* An interrupt may have occurred while we were waiting. */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	if (!result)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("one or more background workers failed to start")));
+}
+
+static bool
+check_worker_status(WorkerState *wstate)
+{
+	BgwHandleStatus status;
+	pid_t			pid;
+
+	status = GetBackgroundWorkerPid(wstate->handle, &pid);
+	if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
+		return false;
+
+	/* Otherwise, things still look OK. */
+	return true;
+}
+
+static void
+wait_for_worker_to_finish(WorkerState *wstate)
+{
+	elog(LOG, "waiting for apply worker #%u to finish processing xid %u",
+										wstate->pstate->n, wstate->xid);
+
+	for (;;)
+	{
+		bool finished;
+
+		/* If the worker is finished, we have succeeded. */
+		SpinLockAcquire(&wstate->pstate->mutex);
+		finished = wstate->pstate->finished;
+		SpinLockRelease(&wstate->pstate->mutex);
+		if (finished)
+		{
+			break;
+		}
+
+		/* Wait for the worker to signal us. */
+		WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+							WAIT_EVENT_LOGICAL_APPLY_WORKER_READY);
+
+		/* Reset the latch so we don't spin. */
+		ResetLatch(MyLatch);
+
+		/* An interrupt may have occurred while we were waiting. */
+		CHECK_FOR_INTERRUPTS();
+	}
+}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 3a89e23488..7c72db9e83 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -819,6 +819,7 @@ typedef enum
 	WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING,
 	WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING,
 	WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING,
+	WAIT_EVENT_LOGICAL_APPLY_WORKER_READY,
 	WAIT_EVENT_LOGICAL_SYNC_DATA,
 	WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,
 	WAIT_EVENT_MQ_INTERNAL,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 802275311d..afb15c2736 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -122,12 +122,10 @@ extern TransactionId logicalrep_read_stream_stop(StringInfo in);
 
 extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
 						XLogRecPtr commit_lsn);
-extern TransactionId logicalrep_read_stream_commit(StringInfo out,
+extern void logicalrep_read_stream_commit(StringInfo out,
 					   LogicalRepCommitData *commit_data);
 
 extern void logicalrep_write_stream_abort(StringInfo out,
 							  TransactionId xid, TransactionId subxid);
-extern void logicalrep_read_stream_abort(StringInfo in,
-							 TransactionId *xid, TransactionId *subxid);
 
 #endif							/* LOGICALREP_PROTO_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index e9524aefd9..30ad40247d 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -13,6 +13,7 @@
 #define LOGICALWORKER_H
 
 extern void ApplyWorkerMain(Datum main_arg);
+extern void LogicalApplyBgwMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
-- 
2.17.1
