Hi,

I would like to propose using multi-inserts for logical replication apply
for PG20. The attached patch is a WIP and I'm looking for feedback on the
overall approach.

The logical replication apply worker currently inserts tuples one at a
time. Each row writes its own WAL record and goes through a page
lock/unlock cycle, so a query that inserts 100M rows on the publisher turns
into 100M single-row inserts on the subscriber. The patch buffers
consecutive inserts within a transaction on the subscriber and applies them
with the existing multi-insert table AM, the same way COPY does. When
several tuples fit on one page, that writes a single WAL record for all of
them and locks the page once instead of once per row.

I ran a few insert-only experiments to see how much this helps
(r7i.8xlarge, shared_buffers=16GB, streaming=parallel, checkpoints
disabled). I also measured the time spent in the insert call itself, apart
from total apply time:

WAL volume and WAL record counts:

row size    rows   WAL H    WAL P    WAL gain   WAL records H   WAL records P   records gain
----------  -----  -------  -------  ---------  --------------  --------------  ------------
50 B/row    100M   7.5GB    2.9GB    2.6x       101060526       1798770         56x
128 B/row   100M   14.6GB   10.1GB   1.4x       102643991       4512768         23x
1.4 KB/row  10M    13.2GB   12.8GB   1.03x      10001543        2164961         4.6x

Insert-call time and total apply time:

row size    rows   insert time H   insert time P   insert gain   apply time H   apply time P   apply gain
----------  -----  --------------  --------------  ------------  -------------  -------------  ----------
50 B/row    100M   61s             15s             4.0x          487s           469s           1.04x
128 B/row   100M   138s            92s             1.5x          508s           479s           1.06x
1.4 KB/row  10M    317s            309s            1.03x         334s           319s           1.05x

H = HEAD (single-insert), P = PATCHED (multi-insert). Gain = H / P.
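
As a quick check of the Gain column, and of roughly how many rows end up
sharing one WAL record, the arithmetic can be sketched in C. The helper
names below are made up for illustration and are not part of the patch. The
record counts in the table cover every WAL record written during the run,
not only heap inserts, so rows-per-record is only an approximation.

```c
#include <assert.h>

/* Gain as defined above: HEAD value divided by PATCHED value. */
double
gain(double head, double patched)
{
    assert(patched > 0.0);
    return head / patched;
}

/*
 * Approximate number of rows carried per WAL record. The measured record
 * counts include non-insert records, so treat this as a rough estimate.
 */
double
rows_per_record(double rows, double wal_records)
{
    assert(wal_records > 0.0);
    return rows / wal_records;
}
```

For the 50 B/row case, gain(487, 469) is about 1.04 for apply time, and
rows_per_record(100e6, 1798770) comes out near 56 rows per record with the
patch, against roughly 1 row per record on HEAD.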

The narrower the row, the more rows fit on a page and the bigger the
saving. The WAL reduction is the main benefit - less I/O, fewer WAL files,
faster crash recovery, and less WAL to ship for cascading replication. The
apply-time change is small (about 4-6% in these runs). My hypothesis is
that the other per-row work in the apply path outweighs the WAL write, so
fewer WAL records alone don't move it much. I'll also test with
streaming=off to see how that compares. (I also have small-scale results
from 1 to 100K rows showing the gain grows with row count; happy to share
if useful.)

For some background, I proposed multi-insert for CREATE TABLE AS and
materialized views earlier [1]. That ran into two problems. The slot type
there isn't known ahead of time because the top plan node decides it, and I
had also bundled in table AM interface changes, which widened the scope.
Logical replication apply avoids both - the slot type is always
TTSOpsVirtual, and the existing multi-insert table AM API can be used as-is.

Here are the main design decisions I'd like feedback on.

It reuses COPY's existing checks for when multi-insert isn't safe, falling
back to the single-row path in the same cases: before/instead-of row
triggers, volatile defaults, foreign tables/partitions, and so on.

The patch buffers consecutive inserts to the same relation, in both leader
and parallel apply workers. It flushes at 1000 tuples or 64KB (adjustable
or configurable later), on a switch to a different relation, or when any
non-insert message arrives, which keeps ordering correct in mixed
transactions. The first insert always takes the existing single-row path;
it switches to multi-insert only after consecutive inserts to the same
relation. Inserts into several relations in turn flush on every switch;
per-relation buffers could avoid that, but I've left that out for now.
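
The flush rules above can be condensed into a small standalone model. This
is only a sketch of the decision logic, not code from the patch: BatchState
and the three function names are hypothetical, and only the two limits
(1000 tuples, 65535 bytes) are taken from the patch's MAX_BUFFERED_TUPLES
and MAX_BUFFERED_BYTES.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_BUFFERED_TUPLES 1000
#define MAX_BUFFERED_BYTES  65535

/* Hypothetical stand-in for the per-worker pending buffer. */
typedef struct BatchState
{
    uint32_t relid;     /* relation the buffered rows belong to */
    int      ntuples;   /* rows currently buffered */
    size_t   bytes;     /* sum of received value lengths */
} BatchState;

/* Must the pending rows be flushed before handling the next message? */
bool
flush_before_message(const BatchState *b, bool is_insert, uint32_t relid)
{
    assert(b != NULL);
    if (b->ntuples == 0)
        return false;           /* nothing buffered */
    if (!is_insert)
        return true;            /* non-insert message: keep ordering */
    return relid != b->relid;   /* insert into a different relation */
}

/* Buffer one row; returns true once a size limit says "flush now". */
bool
batch_add_row(BatchState *b, uint32_t relid, size_t datasize)
{
    assert(b != NULL);
    if (b->ntuples == 0)
        b->relid = relid;
    b->ntuples++;
    b->bytes += datasize;
    return b->ntuples >= MAX_BUFFERED_TUPLES ||
           b->bytes >= MAX_BUFFERED_BYTES;
}

/* After a flush only the counters are reset; the buffer is reused. */
void
batch_reset(BatchState *b)
{
    assert(b != NULL);
    b->ntuples = 0;
    b->bytes = 0;
}
```

With this model, alternating inserts between two relations makes
flush_before_message() return true on every message, which is the
flush-on-every-switch behavior that per-relation buffers would avoid.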

Only the heap write goes through multi-insert. Constraint checks, index
maintenance, conflict detection, and after-row triggers still run per row,
exactly as the single-row path does, so visible behavior is unchanged.
Constraints are evaluated per row before the multi-insert. Pushing these
passes into the heap AM layer would avoid the extra loops but needs table
AM changes I'd rather not get into here.
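
To make the ordering concrete, here is a toy model of the three passes a
flush makes over the buffered rows. It is a sketch under simplifying
assumptions, not the patch's code: Row, FlushCounts, and flush_batch are
hypothetical, the "constraint" is just value >= 0, and a failure is
reported through the return value, whereas in the patch a failed check
raises an ERROR.

```c
#include <assert.h>
#include <stddef.h>

typedef struct Row
{
    int value;
} Row;

typedef struct FlushCounts
{
    int constraint_checks;   /* pass 1: one per row */
    int bulk_writes;         /* pass 2: one per batch */
    int per_row_followups;   /* pass 3: index/conflict/trigger work per row */
} FlushCounts;

/*
 * Returns -1 on success, or the index of the first row that fails the toy
 * constraint. Nothing is written if any row fails pass 1.
 */
int
flush_batch(const Row *rows, int nrows, FlushCounts *counts)
{
    assert(rows != NULL && counts != NULL);

    if (nrows <= 0)
        return -1;           /* empty buffer: nothing to do */

    /* Pass 1: per-row constraint checks, before anything is written. */
    for (int i = 0; i < nrows; i++)
    {
        counts->constraint_checks++;
        if (rows[i].value < 0)
            return i;
    }

    /* Pass 2: a single bulk write for the whole batch. */
    counts->bulk_writes++;

    /* Pass 3: per-row index entries, conflict report, after-row triggers. */
    for (int i = 0; i < nrows; i++)
        counts->per_row_followups++;

    return -1;
}
```

The model also shows why the extra loops exist: passes 1 and 3 each walk
the batch separately because only pass 2 is handled by the table AM.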

On conflict detection: today an insert conflict raises an ERROR, so a
conflict on a buffered row rolls back the whole batch. Per-row resolutions
(ON CONFLICT DO NOTHING/DO UPDATE style) would need more thought, since
rows land in the heap before the per-row check runs.

Currently the subscriber batches single inserts (this patch) and does a
multi-insert. Another idea is to add a new message type for multi-insert on
the publisher when decoding XLOG_HEAP2_MULTI_INSERT, with the subscriber
using multi-insert only on that path. That keeps things simpler, but it
only kicks in when the publisher used multi-insert itself (e.g. COPY FROM),
and it requires protocol changes.

This is still a WIP and I've likely missed some edge cases, so I appreciate
feedback on the overall approach. Thank you!

[1]
https://www.postgresql.org/message-id/flat/CALj2ACX9vVYHYkX8e6w058EuAs8JL5EsnzadTxGhpiE_Ep_ByA%40mail.gmail.com

-- 
Bharath Rupireddy
Amazon Web Services: https://aws.amazon.com
From d1445b532c5efd07de05c4529346df58e7c08741 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Mon, 22 Jun 2026 19:35:47 +0000
Subject: [PATCH v1] WIP Multi insert for logical replication apply

---
 src/backend/executor/execReplication.c   |   2 +-
 src/backend/replication/logical/proto.c  |   4 +
 src/backend/replication/logical/worker.c | 367 ++++++++++++++++++++++-
 src/include/replication/conflict.h       |   5 +
 src/include/replication/logicalproto.h   |   2 +
 5 files changed, 378 insertions(+), 2 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index b2ca5cbf117..bc0a6e0d578 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -766,7 +766,7 @@ retry:
  * Check all the unique indexes in 'recheckIndexes' for conflict with the
  * tuple in 'remoteslot' and report if found.
  */
-static void
+void
 CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
 					   ConflictType type, List *recheckIndexes,
 					   TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 86ad97cd937..61de078960a 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -873,6 +873,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 	tuple->colvalues = palloc0_array(StringInfoData, natts);
 	tuple->colstatus = palloc_array(char, natts);
 	tuple->ncols = natts;
+	tuple->datasize = 0;
 
 	/* Read the data */
 	for (i = 0; i < natts; i++)
@@ -897,6 +898,9 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 			case LOGICALREP_COLUMN_BINARY:
 				len = pq_getmsgint(in, 4);	/* read length */
 
+				/* accumulate received value length for buffering estimates */
+				tuple->datasize += len;
+
 				/* and data */
 				buff = palloc(len + 1);
 				pq_copymsgbytes(in, buff, len);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7799266c614..f78d472fd10 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -248,6 +248,7 @@
 #include <unistd.h>
 
 #include "access/genam.h"
+#include "access/heapam.h"
 #include "access/commit_ts.h"
 #include "access/table.h"
 #include "access/tableam.h"
@@ -303,6 +304,13 @@
 
 #define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
 
+/*
+ * Multi-insert buffer limits. Same as COPY FROM. MAX_BUFFERED_BYTES is compared
+ * against LogicalRepTupleData.datasize (real received value lengths).
+ */
+#define MAX_BUFFERED_TUPLES		1000
+#define MAX_BUFFERED_BYTES		(65535)
+
 typedef struct FlushPosition
 {
 	dlist_node	node;
@@ -489,6 +497,29 @@ static List *on_commit_wakeup_workers_subids = NIL;
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
+/*
+ * Multi-insert buffer state. Consecutive INSERTs to the same eligible relation
+ * are accumulated here and flushed via table_multi_insert(). Kept alive across
+ * flushes; torn down on a non-INSERT message or relation change.
+ */
+typedef struct ApplyMultiInsertBuffer
+{
+	LogicalRepRelId relid;
+	LogicalRepRelMapEntry *rel;
+	ApplyExecutionData *edata;
+	TupleTableSlot **slots;
+	int			nslots_allocated;
+	int			nused;
+	Size		bytes;
+	bool		run_as_owner;
+	UserContext ucxt;
+	BulkInsertState bistate;
+	bool		indices_open;
+	MemoryContext batch_context;	/* reset on each flush */
+} ApplyMultiInsertBuffer;
+
+static ApplyMultiInsertBuffer *pending_multi_insert = NULL;
+
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
 
@@ -636,6 +667,12 @@ static void set_wal_receiver_timeout(void);
 
 static void on_exit_clear_xact_state(int code, Datum arg);
 
+/* Multi-insert functions */
+static void apply_handle_insert(StringInfo s);
+static void apply_multi_insert_flush(void);
+static void apply_multi_insert_teardown(void);
+static bool rel_can_multi_insert(LogicalRepRelMapEntry *rel);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -2642,6 +2679,327 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
 						RelationGetRelationName(rel))));
 }
 
+/*
+ * Check whether a relation is eligible for multi-insert.
+ *
+ * Fall back to the single-row path for cases the batch path can't reproduce:
+ * partitioned tables, BEFORE/INSTEAD ROW triggers, and stored generated columns.
+ */
+static bool
+rel_can_multi_insert(LogicalRepRelMapEntry *rel)
+{
+	Relation	localrel = rel->localrel;
+
+	if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return false;
+
+	if (localrel->trigdesc &&
+		(localrel->trigdesc->trig_insert_before_row ||
+		 localrel->trigdesc->trig_insert_instead_row))
+		return false;
+
+	if (localrel->rd_att->constr &&
+		localrel->rd_att->constr->has_generated_stored)
+		return false;
+
+	return true;
+}
+
+/*
+ * Flush the pending multi-insert buffer via table_multi_insert(). The buffer
+ * stays alive for reuse - only the row count and batch_context are reset.
+ */
+static void
+apply_multi_insert_flush(void)
+{
+	ApplyMultiInsertBuffer *buffer = pending_multi_insert;
+	ResultRelInfo *relinfo;
+	EState	   *estate;
+	int			nused;
+	MemoryContext oldctx_mi;
+
+	if (buffer == NULL || buffer->nused == 0)
+		return;
+
+	nused = buffer->nused;
+	relinfo = buffer->edata->targetRelInfo;
+	estate = buffer->edata->estate;
+
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	/* Parity with ExecSimpleRelationInsert() (no-op for CMD_INSERT today) */
+	CheckCmdReplicaIdentity(relinfo->ri_RelationDesc, CMD_INSERT);
+
+	TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+
+	/* Open indices once, keep open across flushes */
+	if (!buffer->indices_open)
+	{
+		ExecOpenIndices(relinfo, false);
+		InitConflictIndexes(relinfo);
+		buffer->indices_open = true;
+	}
+
+	/* Constraint checks before the bulk insert (gen cols/BR triggers excluded) */
+	if (relinfo->ri_RelationDesc->rd_att->constr)
+	{
+		for (int i = 0; i < nused; i++)
+			ExecConstraints(relinfo, buffer->slots[i], estate);
+	}
+	if (relinfo->ri_RelationDesc->rd_rel->relispartition)
+	{
+		for (int i = 0; i < nused; i++)
+			ExecPartitionCheck(relinfo, buffer->slots[i], estate, true);
+	}
+
+	/* table_multi_insert leaks HeapTuples; run in batch_context (reset below) */
+	oldctx_mi = MemoryContextSwitchTo(buffer->batch_context);
+	table_multi_insert(relinfo->ri_RelationDesc,
+						buffer->slots,
+						nused,
+						GetCurrentCommandId(true),
+						0,
+						buffer->bistate);
+	MemoryContextSwitchTo(oldctx_mi);
+
+	/* Per row: index entries, conflict report, AFTER triggers, clear slot */
+	for (int i = 0; i < nused; i++)
+	{
+		TupleTableSlot *slot = buffer->slots[i];
+		List	   *recheckIndexes = NIL;
+		List	   *conflictindexes;
+		bool		conflict = false;
+
+		conflictindexes = relinfo->ri_onConflictArbiterIndexes;
+
+		if (relinfo->ri_NumIndices > 0)
+		{
+			uint32		flags = (conflictindexes != NIL) ? EIIT_NO_DUPE_ERROR : 0;
+
+			recheckIndexes = ExecInsertIndexTuples(relinfo, estate, flags,
+												   slot, conflictindexes,
+												   &conflict);
+		}
+
+		/* Report conflict post-insert (see ExecSimpleRelationInsert()) */
+		if (conflict)
+			CheckAndReportConflict(relinfo, estate, CT_INSERT_EXISTS,
+								   recheckIndexes, NULL, slot);
+
+		/* AFTER ROW INSERT Triggers */
+		ExecARInsertTriggers(estate, relinfo, slot, recheckIndexes, NULL);
+
+		list_free(recheckIndexes);
+
+		slot->tts_flags &= ~TTS_FLAG_SHOULDFREE;
+		slot->tts_flags |= TTS_FLAG_EMPTY;
+		slot->tts_nvalid = 0;
+		ItemPointerSetInvalid(&slot->tts_tid);
+	}
+
+	/* Bulk-free datums, materialized slot data and HeapTuples in one reset */
+	MemoryContextReset(buffer->batch_context);
+
+	/* Reset for next batch - keep buffer alive */
+	buffer->nused = 0;
+	buffer->bytes = 0;
+
+	PopActiveSnapshot();
+	CommandCounterIncrement();
+}
+
+/*
+ * Flush any remaining rows and free the multi-insert buffer completely. Called
+ * on relation switch, non-INSERT message, or transaction end.
+ */
+static void
+apply_multi_insert_teardown(void)
+{
+	ApplyMultiInsertBuffer *buffer = pending_multi_insert;
+
+	if (buffer == NULL)
+		return;
+
+	/* Flush any remaining rows first */
+	if (buffer->nused > 0)
+		apply_multi_insert_flush();
+
+	if (buffer->indices_open)
+		ExecCloseIndices(buffer->edata->targetRelInfo);
+
+	if (buffer->bistate)
+		FreeBulkInsertState(buffer->bistate);
+
+	if (buffer->batch_context)
+		MemoryContextDelete(buffer->batch_context);
+
+	finish_edata(buffer->edata);
+
+	apply_error_callback_arg.rel = NULL;
+
+	if (!buffer->run_as_owner)
+		RestoreUserContext(&buffer->ucxt);
+
+	logicalrep_rel_close(buffer->rel, NoLock);
+
+	for (int i = 0; i < buffer->nslots_allocated && buffer->slots[i] != NULL; i++)
+		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	pfree(buffer->slots);
+	pfree(buffer);
+	pending_multi_insert = NULL;
+}
+
+/*
+ * Handle INSERT via multi-insert path, accumulating consecutive INSERTs to the
+ * same relation into a buffer that is flushed via table_multi_insert().
+ */
+static void
+apply_handle_multi_insert(StringInfo s)
+{
+	LogicalRepTupleData newtup;
+	LogicalRepRelId relid;
+	LogicalRepRelMapEntry *rel;
+	ApplyMultiInsertBuffer *buffer;
+	ApplyExecutionData *edata;
+	EState	   *estate;
+	TupleTableSlot *remoteslot;
+	MemoryContext oldctx;
+	bool		run_as_owner;
+	UserContext ucxt;
+
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+		return;
+
+	relid = logicalrep_read_insert(s, &newtup);
+
+	/* Fast path: same relation as the pending buffer - accumulate */
+	if (pending_multi_insert && pending_multi_insert->relid == relid)
+	{
+		buffer = pending_multi_insert;
+		rel = buffer->rel;
+		edata = buffer->edata;
+		estate = edata->estate;
+
+		apply_error_callback_arg.rel = rel;
+
+		/* Lazily create slot on first use, reused across flushes */
+		if (buffer->slots[buffer->nused] == NULL)
+		{
+			oldctx = MemoryContextSwitchTo(ApplyContext);
+			buffer->slots[buffer->nused] = MakeSingleTupleTableSlot(
+				RelationGetDescr(rel->localrel), &TTSOpsVirtual);
+			buffer->slots[buffer->nused]->tts_mcxt = buffer->batch_context;
+			MemoryContextSwitchTo(oldctx);
+		}
+		remoteslot = buffer->slots[buffer->nused];
+
+		/* Allocate datum values in batch context - reset on flush */
+		oldctx = MemoryContextSwitchTo(buffer->batch_context);
+		slot_store_data(remoteslot, rel, &newtup);
+		slot_fill_defaults(rel, estate, remoteslot);
+		MemoryContextSwitchTo(oldctx);
+
+		buffer->nused++;
+		buffer->bytes += newtup.datasize;
+
+		if (buffer->nused >= MAX_BUFFERED_TUPLES ||
+			buffer->bytes >= MAX_BUFFERED_BYTES)
+			apply_multi_insert_flush();
+
+		return;
+	}
+
+	/* Slow path: first INSERT, or a different relation */
+	begin_replication_step();
+
+	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+	if (!should_apply_changes_for_rel(rel))
+	{
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return;
+	}
+
+	/* Tear down existing buffer if switching relations */
+	if (pending_multi_insert)
+		apply_multi_insert_teardown();
+
+	if (!rel_can_multi_insert(rel))
+	{
+		/* Fall back to single-row path */
+		logicalrep_rel_close(rel, NoLock);
+		end_replication_step();
+		apply_handle_insert(s);
+		return;
+	}
+
+	/* Set up new multi-insert buffer */
+	run_as_owner = MySubscription->runasowner;
+	if (!run_as_owner)
+		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+	apply_error_callback_arg.rel = rel;
+
+	oldctx = MemoryContextSwitchTo(ApplyContext);
+
+	edata = create_edata_for_relation(rel);
+
+	buffer = palloc0(sizeof(ApplyMultiInsertBuffer));
+	buffer->relid = relid;
+	buffer->rel = rel;
+	buffer->edata = edata;
+	buffer->run_as_owner = run_as_owner;
+	if (!run_as_owner)
+		buffer->ucxt = ucxt;
+
+	/* Slot array allocated as NULLs, slots created lazily */
+	buffer->nslots_allocated = MAX_BUFFERED_TUPLES;
+	buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	buffer->bistate = GetBulkInsertState();
+	buffer->batch_context = AllocSetContextCreate(ApplyContext,
+													"multi-insert batch",
+													ALLOCSET_DEFAULT_SIZES);
+
+	MemoryContextSwitchTo(oldctx);
+
+	pending_multi_insert = buffer;
+
+	estate = edata->estate;
+
+	/* Lazily create first slot */
+	if (buffer->slots[0] == NULL)
+	{
+		oldctx = MemoryContextSwitchTo(ApplyContext);
+		buffer->slots[0] = MakeSingleTupleTableSlot(
+			RelationGetDescr(rel->localrel), &TTSOpsVirtual);
+		buffer->slots[0]->tts_mcxt = buffer->batch_context;
+		MemoryContextSwitchTo(oldctx);
+	}
+	remoteslot = buffer->slots[0];
+
+	/* Allocate datum values in batch context - reset on flush */
+	oldctx = MemoryContextSwitchTo(buffer->batch_context);
+	slot_store_data(remoteslot, rel, &newtup);
+	slot_fill_defaults(rel, estate, remoteslot);
+	MemoryContextSwitchTo(oldctx);
+
+	buffer->nused = 1;
+	buffer->bytes = newtup.datasize;
+
+	/*
+	 * Flush if this single tuple already fills the buffer. end_replication_step()
+	 * must run regardless to release our snapshot; the flush manages its own.
+	 */
+	if (buffer->nused >= MAX_BUFFERED_TUPLES ||
+		buffer->bytes >= MAX_BUFFERED_BYTES)
+		apply_multi_insert_flush();
+
+	end_replication_step();
+}
+
 /*
  * Handle INSERT message.
  */
@@ -3807,6 +4165,13 @@ apply_dispatch(StringInfo s)
 	saved_command = apply_error_callback_arg.command;
 	apply_error_callback_arg.command = action;
 
+	/*
+	 * Tear down pending multi-insert buffer before any non-INSERT message
+	 * to maintain correct ordering and visibility.
+	 */
+	if (action != LOGICAL_REP_MSG_INSERT && pending_multi_insert)
+		apply_multi_insert_teardown();
+
 	switch (action)
 	{
 		case LOGICAL_REP_MSG_BEGIN:
@@ -3818,7 +4183,7 @@ apply_dispatch(StringInfo s)
 			break;
 
 		case LOGICAL_REP_MSG_INSERT:
-			apply_handle_insert(s);
+			apply_handle_multi_insert(s);
 			break;
 
 		case LOGICAL_REP_MSG_UPDATE:
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 2d9dbcf4d0d..06a0674b55f 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -88,5 +88,10 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								TupleTableSlot *searchslot,
 								TupleTableSlot *remoteslot,
 								List *conflicttuples);
+extern void CheckAndReportConflict(ResultRelInfo *resultRelInfo,
+								   EState *estate, ConflictType type,
+								   List *recheckIndexes,
+								   TupleTableSlot *searchslot,
+								   TupleTableSlot *remoteslot);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
 #endif
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 058a955e20c..7f46ca5b5df 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -89,6 +89,8 @@ typedef struct LogicalRepTupleData
 	char	   *colstatus;
 	/* Length of above arrays */
 	int			ncols;
+	/* Sum of received value lengths; an estimate of in-memory tuple size */
+	Size		datasize;
 } LogicalRepTupleData;
 
 /* Possible values for LogicalRepTupleData.colstatus[colnum] */
-- 
2.47.3
