From 5630836aefb87948bb745d7faad01e9e3534a64c Mon Sep 17 00:00:00 2001
From: Amit Langote <amitlan@postgresql.org>
Date: Sat, 20 Dec 2025 17:23:12 +0900
Subject: [PATCH v4 2/3] SeqScan: add batch-driven variants returning slots

Teach SeqScan to drive the table AM via the new batch API added in
the previous commit, while still returning one TupleTableSlot at a
time to callers. This reduces per-tuple AM crossings without
changing the node interface seen by parents.

Add TupleBatch and supporting code in execBatch.c/h to hold executor
side batching state. PlanState gains ps_Batch to carry the active
TupleBatch when a node supports batching.

Wire up runtime selection in ExecInitSeqScan using
ScanCanUseBatching(). When executor_batch_rows is greater than
zero, EPQ is inactive, the scan is not backward, and the relation
supports batching, ps.ExecProcNode is set to a batch-driven
variant. Otherwise the non-batch path is used.

Plan shape and EXPLAIN output remain unchanged; only the internal
tuple flow differs when batching is enabled and allowed.

Add executor_batch_rows GUC to specify the maximum number of rows
that can be added into a batch.

Notes / current limits:

- With the current heapam, batches are composed from a single page, so
  the batch may not always be full. Future work may let SeqScan and/or
  AMs top up batches across pages when safe to do so.

Reviewed-by: Daniil Davydov <3danissimo@gmail.com>
Discussion: https://postgr.es/m/CA+HiwqFfAY_ZFqN8wcAEMw71T9hM_kA8UtyHaZZEZtuT3UyogA@mail.gmail.com
---
 src/backend/access/heap/heapam.c          |  29 ++++
 src/backend/access/heap/heapam_handler.c  |  16 ++
 src/backend/access/table/tableam.c        |  11 ++
 src/backend/executor/Makefile             |   1 +
 src/backend/executor/execBatch.c          | 117 ++++++++++++++
 src/backend/executor/execScan.c           |  31 ++++
 src/backend/executor/meson.build          |   1 +
 src/backend/executor/nodeSeqscan.c        | 176 +++++++++++++++++++++-
 src/backend/utils/init/globals.c          |   3 +
 src/backend/utils/misc/guc_parameters.dat |   9 ++
 src/include/access/heapam.h               |   1 +
 src/include/access/tableam.h              |  27 ++++
 src/include/executor/execBatch.h          |  99 ++++++++++++
 src/include/executor/execScan.h           |  69 +++++++++
 src/include/executor/executor.h           |   4 +
 src/include/miscadmin.h                   |   1 +
 src/include/nodes/execnodes.h             |   4 +
 17 files changed, 598 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/executor/execBatch.c
 create mode 100644 src/include/executor/execBatch.h

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index fcc0813f139..0c0b2384f0e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1592,6 +1592,35 @@ heap_begin_batch(TableScanDesc sscan, int maxitems)
 	return hb;
 }
 
+/*
+ * heap_materialize_batch_all
+ *
+ * Bind all tuples of the current batch into 'slots'. We bind the
+ * HeapTupleData header that points into the pinned page. No per-row copy.
+ */
+void
+heap_materialize_batch_all(void *am_batch, TupleTableSlot **slots, int n)
+{
+	HeapBatch *hb = (HeapBatch *) am_batch;
+
+	Assert(n <= hb->nitems);
+
+	for (int i = 0; i < n; i++)
+	{
+		HeapTupleData *tuple = &hb->tupdata[i];
+		HeapTupleTableSlot *slot = (HeapTupleTableSlot *) slots[i];
+
+		/* Inline of ExecStoreHeapTuple(tuple, slot, false) */
+		slot->tuple = tuple;
+		slot->off = 0;
+		slot->base.tts_nvalid = 0;
+		slot->base.tts_flags &= ~(TTS_FLAG_EMPTY | TTS_FLAG_SHOULDFREE);
+		slot->base.tts_tid = tuple->t_self;
+		slot->base.tts_tableOid = tuple->t_tableOid;
+		/* tuple storage is owned by the AM batch; the slot must not free it */
+	}
+}
+
 /*
  * heap_scan_end_batch
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 550b788553c..a4de7e5b4f5 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -72,6 +72,21 @@ heapam_slot_callbacks(Relation relation)
 	return &TTSOpsBufferHeapTuple;
 }
 
+/* ------------------------------------------------------------------------
+ * TupleBatch related callbacks for heap AM
+ * ------------------------------------------------------------------------
+ */
+
+static const TupleBatchOps TupleBatchHeapOps =
+{
+	.materialize_all = heap_materialize_batch_all
+};
+
+static const TupleBatchOps *
+heapam_batch_callbacks(Relation relation)
+{
+	return &TupleBatchHeapOps;
+}
 
 /* ------------------------------------------------------------------------
  * Index Scan Callbacks for heap AM
@@ -2617,6 +2632,7 @@ static const TableAmRoutine heapam_methods = {
 	.type = T_TableAmRoutine,
 
 	.slot_callbacks = heapam_slot_callbacks,
+	.batch_callbacks = heapam_batch_callbacks,
 
 	.scan_begin = heap_beginscan,
 	.scan_end = heap_endscan,
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 73ebc01a08f..d281aacaf94 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -103,6 +103,17 @@ table_slot_create(Relation relation, List **reglist)
 	return slot;
 }
 
+/* ----------------------------------------------------------------------------
+ * TupleBatch support routines
+ * ----------------------------------------------------------------------------
+ */
+const TupleBatchOps *
+table_batch_callbacks(Relation relation)
+{
+	if (!relation->rd_tableam)
+		elog(ERROR, "relation does not support TupleBatch operations");
+	return relation->rd_tableam->batch_callbacks(relation);
+}
 
 /* ----------------------------------------------------------------------------
  * Table scan functions.
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 11118d0ce02..3e72f3fe03c 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
 OBJS = \
 	execAmi.o \
 	execAsync.o \
+	execBatch.o \
 	execCurrent.o \
 	execExpr.o \
 	execExprInterp.o \
diff --git a/src/backend/executor/execBatch.c b/src/backend/executor/execBatch.c
new file mode 100644
index 00000000000..007ae535687
--- /dev/null
+++ b/src/backend/executor/execBatch.c
@@ -0,0 +1,117 @@
+/*-------------------------------------------------------------------------
+ *
+ * execBatch.c
+ *		Helpers for TupleBatch
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execBatch.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "executor/execBatch.h"
+
+/*
+ * TupleBatchCreate
+ *		Allocate and initialize a new TupleBatch envelope.
+ */
+TupleBatch *
+TupleBatchCreate(TupleDesc scandesc, int capacity)
+{
+	TupleBatch  *b;
+	TupleTableSlot **inslots,
+				   **outslots;
+
+	inslots = palloc(sizeof(TupleTableSlot *) * capacity);
+	outslots = palloc(sizeof(TupleTableSlot *) * capacity);
+	for (int i = 0; i < capacity; i++)
+		inslots[i] = MakeSingleTupleTableSlot(scandesc, &TTSOpsHeapTuple);
+
+	b = (TupleBatch *) palloc(sizeof(TupleBatch));
+
+	/* Initial state: empty envelope; ops is set lazily at first fetch */
+	b->am_payload = NULL;
+	b->ops = NULL;
+	b->ntuples = 0;
+	b->materialized = false;
+	b->inslots = inslots;
+	b->outslots = outslots;
+	b->activeslots = NULL;
+	b->maxslots = capacity;
+	b->nvalid = 0;
+	b->next = 0;
+
+	return b;
+}
+
+/*
+ * TupleBatchReset
+ *		Reset an existing TupleBatch envelope to empty.
+ */
+void
+TupleBatchReset(TupleBatch *b, bool drop_slots)
+{
+	if (b == NULL)
+		return;
+
+	for (int i = 0; i < b->maxslots; i++)
+	{
+		ExecClearTuple(b->inslots[i]);
+		if (drop_slots)
+			ExecDropSingleTupleTableSlot(b->inslots[i]);
+	}
+
+	if (drop_slots)
+	{
+		pfree(b->inslots);
+		pfree(b->outslots);
+		b->inslots = b->outslots = NULL;
+	}
+	/* forget any previously materialized contents */
+	b->ntuples = 0;
+	b->materialized = false;
+	b->nvalid = 0;
+	b->next = 0;
+	b->activeslots = NULL;
+}
+
+void
+TupleBatchUseInput(TupleBatch *b, int nvalid)
+{
+	b->materialized = true;
+	b->activeslots = b->inslots;
+	b->nvalid = nvalid;
+	b->next = 0;
+}
+
+void
+TupleBatchUseOutput(TupleBatch *b, int nvalid)
+{
+	b->materialized = true;
+	b->activeslots = b->outslots;
+	b->nvalid = nvalid;
+	b->next = 0;
+}
+
+bool
+TupleBatchIsValid(TupleBatch *b)
+{
+	return	b != NULL &&
+			b->maxslots > 0 &&
+			b->inslots != NULL &&
+			b->outslots != NULL;
+}
+
+void
+TupleBatchRewind(TupleBatch *b)
+{
+	b->next = 0;
+}
+
+int
+TupleBatchGetNumValid(TupleBatch *b)
+{
+	return b->nvalid;
+}
diff --git a/src/backend/executor/execScan.c b/src/backend/executor/execScan.c
index 31ed4783c1d..ba25daa5e46 100644
--- a/src/backend/executor/execScan.c
+++ b/src/backend/executor/execScan.c
@@ -18,6 +18,7 @@
  */
 #include "postgres.h"
 
+#include "access/tableam.h"
 #include "executor/executor.h"
 #include "executor/execScan.h"
 #include "miscadmin.h"
@@ -154,3 +155,33 @@ ExecScanReScan(ScanState *node)
 		}
 	}
 }
+
+bool
+ScanCanUseBatching(ScanState *scanstate, int eflags)
+{
+	Relation	relation = scanstate->ss_currentRelation;
+
+	return	executor_batch_rows > 0 &&
+			(scanstate->ps.state->es_epq_active == NULL) &&
+			!(eflags & EXEC_FLAG_BACKWARD) &&
+			relation && table_supports_batching(relation);
+}
+
+void
+ScanResetBatching(ScanState *scanstate, bool drop)
+{
+	TupleBatch *b = scanstate->ps.ps_Batch;
+
+	if (b)
+	{
+		TupleBatchReset(b, drop);
+		if (b->am_payload)
+		{
+			table_scan_end_batch(scanstate->ss_currentScanDesc,
+								 b->am_payload);
+			b->am_payload = NULL;
+		}
+		if (drop)
+			pfree(b);
+	}
+}
diff --git a/src/backend/executor/meson.build b/src/backend/executor/meson.build
index 2cea41f8771..40ffc28f3cb 100644
--- a/src/backend/executor/meson.build
+++ b/src/backend/executor/meson.build
@@ -3,6 +3,7 @@
 backend_sources += files(
   'execAmi.c',
   'execAsync.c',
+  'execBatch.c',
   'execCurrent.c',
   'execExpr.c',
   'execExprInterp.c',
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 94047d29430..a9071e32560 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -203,6 +203,171 @@ ExecSeqScanEPQ(PlanState *pstate)
 					(ExecScanRecheckMtd) SeqRecheck);
 }
 
+/* ----------------------------------------------------------------
+ *						Batch Support
+ * ----------------------------------------------------------------
+ */
+static inline bool
+SeqNextBatch(SeqScanState *node)
+{
+	TableScanDesc scandesc;
+	EState	   *estate;
+	ScanDirection direction;
+
+	Assert(node->ss.ps.ps_Batch != NULL);
+
+	/*
+	 * get information from the estate and scan state
+	 */
+	scandesc = node->ss.ss_currentScanDesc;
+	estate = node->ss.ps.state;
+	direction = estate->es_direction;
+	Assert(direction == ForwardScanDirection);
+
+	if (scandesc == NULL)
+	{
+		/*
+		 * We reach here if the scan is not parallel, or if we're serially
+		 * executing a scan that was planned to be parallel.
+		 */
+		scandesc = table_beginscan(node->ss.ss_currentRelation,
+								   estate->es_snapshot,
+								   0, NULL);
+		node->ss.ss_currentScanDesc = scandesc;
+	}
+
+	/* Lazily create the AM batch payload. */
+	if (node->ss.ps.ps_Batch->am_payload == NULL)
+	{
+		const TableAmRoutine *tam PG_USED_FOR_ASSERTS_ONLY = scandesc->rs_rd->rd_tableam;
+
+		Assert(tam && tam->scan_begin_batch);
+		node->ss.ps.ps_Batch->am_payload =
+			table_scan_begin_batch(scandesc, node->ss.ps.ps_Batch->maxslots);
+		node->ss.ps.ps_Batch->ops = table_batch_callbacks(node->ss.ss_currentRelation);
+	}
+
+	node->ss.ps.ps_Batch->ntuples =
+		table_scan_getnextbatch(scandesc, node->ss.ps.ps_Batch->am_payload, direction);
+	node->ss.ps.ps_Batch->nvalid = node->ss.ps.ps_Batch->ntuples;
+	node->ss.ps.ps_Batch->materialized = false;
+
+	return node->ss.ps.ps_Batch->ntuples > 0;
+}
+
+static inline bool
+SeqNextBatchMaterialize(SeqScanState *node)
+{
+	if (SeqNextBatch(node))
+	{
+		TupleBatchMaterializeAll(node->ss.ps.ps_Batch);
+		return true;
+	}
+
+	return false;
+}
+
+static TupleTableSlot *
+ExecSeqScanBatchSlot(PlanState *pstate)
+{
+	SeqScanState *node = castNode(SeqScanState, pstate);
+
+	Assert(pstate->state->es_epq_active == NULL);
+	Assert(pstate->qual == NULL);
+	Assert(pstate->ps_ProjInfo == NULL);
+
+	return ExecScanExtendedBatchSlot(&node->ss,
+									 (ExecScanAccessBatchMtd) SeqNextBatchMaterialize,
+									 NULL, NULL);
+}
+
+static TupleTableSlot *
+ExecSeqScanBatchSlotWithQual(PlanState *pstate)
+{
+	SeqScanState *node = castNode(SeqScanState, pstate);
+
+	/*
+	 * Use pg_assume() for != NULL tests to make the compiler realize no
+	 * runtime check for the field is needed in ExecScanExtendedBatchSlot().
+	 */
+	Assert(pstate->state->es_epq_active == NULL);
+	pg_assume(pstate->qual != NULL);
+	Assert(pstate->ps_ProjInfo == NULL);
+
+	return ExecScanExtendedBatchSlot(&node->ss,
+									 (ExecScanAccessBatchMtd) SeqNextBatchMaterialize,
+									 pstate->qual, NULL);
+}
+
+/*
+ * Variant of ExecSeqScanBatchSlot() but when projection is required.
+ */
+static TupleTableSlot *
+ExecSeqScanBatchSlotWithProject(PlanState *pstate)
+{
+	SeqScanState *node = castNode(SeqScanState, pstate);
+
+	Assert(pstate->state->es_epq_active == NULL);
+	Assert(pstate->qual == NULL);
+	pg_assume(pstate->ps_ProjInfo != NULL);
+
+	return ExecScanExtendedBatchSlot(&node->ss,
+									 (ExecScanAccessBatchMtd) SeqNextBatchMaterialize,
+									 NULL, pstate->ps_ProjInfo);
+}
+
+/*
+ * Variant of ExecSeqScanBatchSlot() but when qual evaluation and projection
+ * are required.
+ */
+static TupleTableSlot *
+ExecSeqScanBatchSlotWithQualProject(PlanState *pstate)
+{
+	SeqScanState *node = castNode(SeqScanState, pstate);
+
+	Assert(pstate->state->es_epq_active == NULL);
+	pg_assume(pstate->qual != NULL);
+	pg_assume(pstate->ps_ProjInfo != NULL);
+
+	return ExecScanExtendedBatchSlot(&node->ss,
+									 (ExecScanAccessBatchMtd) SeqNextBatchMaterialize,
+									 pstate->qual, pstate->ps_ProjInfo);
+}
+
+/* Batch SeqScan enablement and dispatch */
+static void
+SeqScanInitBatching(SeqScanState *scanstate, int eflags)
+{
+	const int cap = executor_batch_rows;
+	TupleDesc	scandesc = RelationGetDescr(scanstate->ss.ss_currentRelation);
+
+	scanstate->ss.ps.ps_Batch = TupleBatchCreate(scandesc, cap);
+
+	/* Choose the batch variant matching the non-batch specialization matrix */
+	if (scanstate->ss.ps.qual == NULL)
+	{
+		if (scanstate->ss.ps.ps_ProjInfo == NULL)
+		{
+			scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlot;
+		}
+		else
+		{
+			scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithProject;
+		}
+	}
+	else
+	{
+		if (scanstate->ss.ps.ps_ProjInfo == NULL)
+		{
+			scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQual;
+		}
+		else
+		{
+			scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQualProject;
+		}
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitSeqScan
  * ----------------------------------------------------------------
@@ -211,6 +376,7 @@ SeqScanState *
 ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 {
 	SeqScanState *scanstate;
+	bool	use_batching;
 
 	/*
 	 * Once upon a time it was possible to have an outerPlan of a SeqScan, but
@@ -241,9 +407,12 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 							 node->scan.scanrelid,
 							 eflags);
 
+	use_batching = ScanCanUseBatching(&scanstate->ss, eflags);
+
 	/* and create slot with the appropriate rowtype */
 	ExecInitScanTupleSlot(estate, &scanstate->ss,
 						  RelationGetDescr(scanstate->ss.ss_currentRelation),
+						  use_batching ? &TTSOpsHeapTuple :
 						  table_slot_callbacks(scanstate->ss.ss_currentRelation));
 
 	/*
@@ -280,6 +449,9 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 			scanstate->ss.ps.ExecProcNode = ExecSeqScanWithQualProject;
 	}
 
+	if (use_batching)
+		SeqScanInitBatching(scanstate, eflags);
+
 	return scanstate;
 }
 
@@ -299,6 +471,8 @@ ExecEndSeqScan(SeqScanState *node)
 	 */
 	scanDesc = node->ss.ss_currentScanDesc;
 
+	ScanResetBatching(&node->ss, true);
+
 	/*
 	 * close heap scan
 	 */
@@ -327,7 +501,7 @@ ExecReScanSeqScan(SeqScanState *node)
 	if (scan != NULL)
 		table_rescan(scan,		/* scan desc */
 					 NULL);		/* new scan keys */
-
+	ScanResetBatching(&node->ss, false);
 	ExecScanReScan((ScanState *) node);
 }
 
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index d31cb45a058..266502e9778 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -165,3 +165,6 @@ int			notify_buffers = 16;
 int			serializable_buffers = 32;
 int			subtransaction_buffers = 0;
 int			transaction_buffers = 0;
+
+/* executor batching */
+int			executor_batch_rows = 64;
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 3b9d8349078..fd97d26c073 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -1001,6 +1001,15 @@
   boot_val => 'true',
 },
 
+{ name => 'executor_batch_rows', type => 'int', context => 'PGC_USERSET', group => 'DEVELOPER_OPTIONS',
+  short_desc => 'Number of rows to include in batches during execution.',
+  flags => 'GUC_NOT_IN_SAMPLE',
+  variable => 'executor_batch_rows',
+  boot_val => '64',
+  min => '0',
+  max => '1024',
+},
+
 { name => 'exit_on_error', type => 'bool', context => 'PGC_USERSET', group => 'ERROR_HANDLING_OPTIONS',
   short_desc => 'Terminate session on any error.',
   variable => 'ExitOnAnyError',
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index f6675043fb3..fe07b21eaa2 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -354,6 +354,7 @@ extern bool heap_getnextslot(TableScanDesc sscan,
 extern void *heap_begin_batch(TableScanDesc sscan, int maxitems);
 extern void heap_end_batch(TableScanDesc sscan, void *am_batch);
 extern int heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir);
+extern void heap_materialize_batch_all(void *am_batch, TupleTableSlot **slots, int n);
 
 extern void heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
 							  ItemPointer maxtid);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 3ec3c3dd008..13a95f7a589 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -21,6 +21,7 @@
 #include "access/sdir.h"
 #include "access/xact.h"
 #include "commands/vacuum.h"
+#include "executor/execBatch.h"
 #include "executor/tuptable.h"
 #include "storage/read_stream.h"
 #include "utils/rel.h"
@@ -39,6 +40,7 @@ typedef struct BulkInsertStateData BulkInsertStateData;
 typedef struct IndexInfo IndexInfo;
 typedef struct SampleScanState SampleScanState;
 typedef struct ValidateIndexState ValidateIndexState;
+typedef struct TupleBatchOps TupleBatchOps;
 
 /*
  * Bitmask values for the flags argument to the scan_begin callback.
@@ -301,6 +303,7 @@ typedef struct TableAmRoutine
 	 * Return slot implementation suitable for storing a tuple of this AM.
 	 */
 	const TupleTableSlotOps *(*slot_callbacks) (Relation rel);
+	const TupleBatchOps *(*batch_callbacks)(Relation rel);
 
 
 	/* ------------------------------------------------------------------------
@@ -361,6 +364,7 @@ typedef struct TableAmRoutine
 									 ScanDirection dir);
 	void		(*scan_end_batch)(TableScanDesc sscan, void *am_batch);
 
+
 	/*-----------
 	 * Optional functions to provide scanning for ranges of ItemPointers.
 	 * Implementations must either provide both of these functions, or neither
@@ -872,6 +876,16 @@ extern const TupleTableSlotOps *table_slot_callbacks(Relation relation);
  */
 extern TupleTableSlot *table_slot_create(Relation relation, List **reglist);
 
+/* ----------------------------------------------------------------------------
+ * TupleBatch functions.
+ * ----------------------------------------------------------------------------
+ */
+
+/*
+ * Returns callbacks for manipulating TupleBatch for tuples of the given
+ * relation.
+ */
+extern const TupleBatchOps *table_batch_callbacks(Relation relation);
 
 /* ----------------------------------------------------------------------------
  * Table scan functions.
@@ -1046,6 +1060,18 @@ table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableS
 	return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
 }
 
+/*
+ * table_supports_batching
+ *		Does the relation's AM support batching?
+ */
+static inline bool
+table_supports_batching(Relation relation)
+{
+	const TableAmRoutine *tam = relation->rd_tableam;
+
+	return tam->scan_getnextbatch != NULL;
+}
+
 /*
  * table_scan_begin_batch
  *		Allocate AM-owned batch payload with capacity 'maxitems'.
@@ -2128,5 +2154,6 @@ extern const TableAmRoutine *GetTableAmRoutine(Oid amhandler);
  */
 
 extern const TableAmRoutine *GetHeapamTableAmRoutine(void);
+extern struct TupleBatchOps *GetHeapamTupleBatchOps(void);
 
 #endif							/* TABLEAM_H */
diff --git a/src/include/executor/execBatch.h b/src/include/executor/execBatch.h
new file mode 100644
index 00000000000..2d0066103ce
--- /dev/null
+++ b/src/include/executor/execBatch.h
@@ -0,0 +1,99 @@
+/*-------------------------------------------------------------------------
+ *
+ * execBatch.h
+ *		Executor batch envelope for passing tuple batch state upward
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/include/executor/execBatch.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef EXECBATCH_H
+#define EXECBATCH_H
+
+#include "executor/tuptable.h"
+
+/*
+ * TupleBatchOps -- AM-specific helpers for lazy materialization.
+ */
+typedef struct TupleBatchOps
+{
+	void (*materialize_all)(void *am_payload,
+							TupleTableSlot **dst,
+							int maxslots);
+} TupleBatchOps;
+
+/*
+ * TupleBatch
+ *
+ * Envelope for a batch of tuples produced by a plan node (e.g., SeqScan) per
+ * call to a batch variant of ExecSeqScan().
+ */
+typedef struct TupleBatch
+{
+	void	   *am_payload;
+	const TupleBatchOps *ops;
+	int			ntuples;				/* number of tuples in am_payload */
+	bool		materialized;		 /* tuples in slots valid? */
+	struct TupleTableSlot **inslots; /* slots for tuples read "into" batch */
+	struct TupleTableSlot **outslots; /* slots for tuples going "out of"
+									   * batch */
+	struct TupleTableSlot **activeslots;
+	int			maxslots;
+
+	int		nvalid;		/* number of returnable tuples in outslots */
+	int		next;		/* 0-based index of next tuple to be returned */
+} TupleBatch;
+
+
+/* Helpers */
+extern TupleBatch *TupleBatchCreate(TupleDesc scandesc, int capacity);
+extern void TupleBatchReset(TupleBatch *b, bool drop_slots);
+extern void TupleBatchUseInput(TupleBatch *b, int nvalid);
+extern void TupleBatchUseOutput(TupleBatch *b, int nvalid);
+extern bool TupleBatchIsValid(TupleBatch *b);
+extern void TupleBatchRewind(TupleBatch *b);
+extern int TupleBatchGetNumValid(TupleBatch *b);
+
+static inline TupleTableSlot *
+TupleBatchGetNextSlot(TupleBatch *b)
+{
+	return b->next < b->nvalid ? b->activeslots[b->next++] : NULL;
+}
+
+static inline TupleTableSlot *
+TupleBatchGetSlot(TupleBatch *b, int index)
+{
+	Assert(index < b->nvalid);
+	return b->activeslots[index];
+}
+
+static inline void
+TupleBatchStoreInOut(TupleBatch *b, int index, TupleTableSlot *out)
+{
+	Assert(TupleBatchIsValid(b));
+	b->outslots[index] = out;
+}
+
+static inline bool
+TupleBatchHasMore(TupleBatch *b)
+{
+	return b->activeslots && b->next < b->nvalid;
+}
+
+static inline void
+TupleBatchMaterializeAll(TupleBatch *b)
+{
+	if (b->materialized)
+		return;
+
+	if (b->ops == NULL || b->ops->materialize_all == NULL)
+		elog(ERROR, "TupleBatch has no materialize_all callback");
+
+	b->ops->materialize_all(b->am_payload, b->inslots, b->ntuples);
+	TupleBatchUseInput(b, b->ntuples);
+}
+
+#endif	/* EXECBATCH_H */
diff --git a/src/include/executor/execScan.h b/src/include/executor/execScan.h
index 2003cbc7ed5..c1add8ca331 100644
--- a/src/include/executor/execScan.h
+++ b/src/include/executor/execScan.h
@@ -251,4 +251,73 @@ ExecScanExtended(ScanState *node,
 	}
 }
 
+/*
+ * ExecScanExtendedBatchSlot
+ *		Batch-driven variant of ExecScanExtended.
+ *
+ * Returns one tuple at a time to callers, but internally fetches tuples
+ * in batches from the AM via accessBatchMtd. This reduces per-tuple AM
+ * call overhead while preserving the single-slot interface expected by
+ * parent nodes.
+ *
+ * The batch is refilled when exhausted by calling accessBatchMtd, which
+ * returns false at end-of-scan.
+ *
+ * Note: EPQ is not supported in the batch path; callers must ensure
+ * es_epq_active is NULL before using this function.
+ */
+static inline TupleTableSlot *
+ExecScanExtendedBatchSlot(ScanState *node,
+						  ExecScanAccessBatchMtd accessBatchMtd,
+						  ExprState *qual, ProjectionInfo *projInfo)
+{
+	ExprContext *econtext = node->ps.ps_ExprContext;
+	TupleBatch *b = node->ps.ps_Batch;
+
+	/* Batch path does not support EPQ */
+	Assert(node->ps.state->es_epq_active == NULL);
+	Assert(TupleBatchIsValid(b));
+
+	for (;;)
+	{
+		TupleTableSlot *in;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Get next input slot from current batch, or refill */
+		if (!TupleBatchHasMore(b))
+		{
+			if (!accessBatchMtd(node))
+				return NULL;
+		}
+
+		in = TupleBatchGetNextSlot(b);
+		Assert(in);
+
+		/* No qual, no projection: direct return */
+		if (qual == NULL && projInfo == NULL)
+			return in;
+
+		ResetExprContext(econtext);
+		econtext->ecxt_scantuple = in;
+
+		/* Qual only */
+		if (projInfo == NULL)
+		{
+			if (qual == NULL || ExecQual(qual, econtext))
+				return in;
+			else
+				InstrCountFiltered1(node, 1);
+			continue;
+		}
+
+		/* Projection (with or without qual) */
+		if (qual == NULL || ExecQual(qual, econtext))
+			return ExecProject(projInfo);
+		else
+			InstrCountFiltered1(node, 1);
+		/* else try next tuple */
+	}
+}
+
 #endif							/* EXECSCAN_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 7cd6a49309f..c1f05ce6273 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -578,12 +578,16 @@ extern Datum ExecMakeFunctionResultSet(SetExprState *fcache,
  */
 typedef TupleTableSlot *(*ExecScanAccessMtd) (ScanState *node);
 typedef bool (*ExecScanRecheckMtd) (ScanState *node, TupleTableSlot *slot);
+typedef bool (*ExecScanAccessBatchMtd)(ScanState *node);
 
 extern TupleTableSlot *ExecScan(ScanState *node, ExecScanAccessMtd accessMtd,
 								ExecScanRecheckMtd recheckMtd);
+
 extern void ExecAssignScanProjectionInfo(ScanState *node);
 extern void ExecAssignScanProjectionInfoWithVarno(ScanState *node, int varno);
 extern void ExecScanReScan(ScanState *node);
+extern bool ScanCanUseBatching(ScanState *scanstate, int eflags);
+extern void ScanResetBatching(ScanState *scanstate, bool drop);
 
 /*
  * prototypes from functions in execTuples.c
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 9a7d733ddef..13285210998 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -288,6 +288,7 @@ extern PGDLLIMPORT double VacuumCostDelay;
 extern PGDLLIMPORT int VacuumCostBalance;
 extern PGDLLIMPORT bool VacuumCostActive;
 
+extern PGDLLIMPORT int executor_batch_rows;
 
 /* in utils/misc/stack_depth.c */
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3968429f991..219a722c49a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -30,6 +30,7 @@
 #define EXECNODES_H
 
 #include "access/tupconvert.h"
+#include "executor/execBatch.h"
 #include "executor/instrument.h"
 #include "fmgr.h"
 #include "lib/ilist.h"
@@ -1204,6 +1205,9 @@ typedef struct PlanState
 	ExprContext *ps_ExprContext;	/* node's expression-evaluation context */
 	ProjectionInfo *ps_ProjInfo;	/* info for doing tuple projection */
 
+	/* Batching state if node supports it. */
+	TupleBatch *ps_Batch;
+
 	bool		async_capable;	/* true if node is async-capable */
 
 	/*
-- 
2.47.3

