Tomas Vondra wrote on 2025-12-27 16:18:
> Hi Alexander,
>
> On 12/26/25 11:54, Alexander Pyhalov wrote:
>> Hi.
>>
>> We had some real cases where a client set a rather big batch_size at
>> the server level, but for some foreign tables containing large
>> documents it was inadequate and led to OOM killer intervention. You
>> can argue that batch_size can be set at the foreign table level, but
>> even that may not be flexible enough when tuple sizes vary.

> I agree this can be an issue with large tuples. It'd be good to consider
> the size, not just the number of tuples.
>
> I think you modified the right places, but there are two or three issues
> we should improve:
>
> 1) It calls estimate_batch_length() for every ExecInsert() call, i.e.
> for every tuple. And it walks all tuples up to that point, which makes
> it O(N^2). I haven't measured how significant that is, but AFAICS we
> could track the current size of the batch fairly easily, and use that.
>
> 2) work_mem is in kilobytes, while batch_len is in bytes, so the
> comparison (batch_len > work_mem) is not quite right. It'll probably
> fire every time, preventing any batching.
>
> 3) Shouldn't this consider the size of the new tuple in batch_len?
> Imagine the tuples are 99% of the work_mem limit. We add the first one.
> When adding the next one we check that the current batch is below
> work_mem, and so we proceed to add the second tuple. Now the batch is
> 198% of the limit.
>
> I think it should work like this:
>
> 1) batch_len + tup_len < work_mem => add tuple to batch
> 2) tup_len < work_mem => flush batch, add tuple to batch
> 3) tup_len >= work_mem => flush batch, insert tuple directly
>
> What bothers me a little bit is that this is per relation. AFAICS when
> inserting into a partitioned table with multiple foreign partitions,
> each partition will have a separate limit. I wonder if we could do
> better and have some sort of "global" limit for the whole insert.
>
> But that's not the fault of this patch, of course.

Hi. Thank you for the feedback.

I've fixed the first two issues in the first patch.

I've tried to implement switching from batch to a usual foreign insert
(it's in the second patch). However, I don't like it. It changes the
resultRelInfo->ri_NumSlots interface: now it can be 0 when a BatchInsert
was followed by a plain foreign insert, and ExecPendingInserts() has to
handle this. Also, we can no longer keep the fact that BatchInsert() was
performed private to ExecInsert(), as we have to determine whether the
estate->es_insert_pending_result_relations and
estate->es_insert_pending_modifytables lists were modified between
ExecInsert() calls. In theory we could look at
list_member_ptr(estate->es_insert_pending_result_relations, resultRelInfo),
but I don't know if iterating over this list for each tuple is a good
idea.
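For reference, the flush logic you proposed boils down to something like
this (just a sketch with illustrative names; BatchState, flush_batch()
and insert_directly() are placeholders, not the real executor API):

#include <stddef.h>

typedef struct BatchState { size_t batch_len; int nslots; } BatchState;

extern int work_mem;                        /* GUC, in kilobytes */
extern void flush_batch(BatchState *batch); /* sends batch, resets it */
extern void insert_directly(void);          /* plain non-batched insert */

static void
handle_tuple(BatchState *batch, size_t tup_len)
{
    size_t  limit = (size_t) work_mem * 1024;   /* kB -> bytes */

    if (batch->batch_len + tup_len < limit)
    {
        /* 1) tuple fits: add it to the current batch */
        batch->batch_len += tup_len;
        batch->nslots++;
    }
    else if (tup_len < limit)
    {
        /* 2) batch would overflow: flush it, start a new one */
        flush_batch(batch);
        batch->batch_len = tup_len;
        batch->nslots = 1;
    }
    else
    {
        /* 3) tuple alone exceeds the limit: flush, insert directly */
        flush_batch(batch);
        insert_directly();
    }
}

The cast matters: work_mem is declared in kilobytes, so comparing byte
counts against it needs the conversion (your point 2 above).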


>> I suppose the same also takes place for fetch_size. The issue here is
>> that we can't limit the size of the data (as opposed to the number of
>> rows) while fetching from a cursor. But we can use a tuplestore to keep
>> the fetched results, so that they spill out to disk.


> Perhaps. Seems like a separate issue. I haven't looked very closely, but
> do we want to use the tuplestore always, or just when the tuples get too
> large? It might even be decided batch by batch, I guess. With
> fetch_size=1 it's hardly useful, right? Is the tuplestore management
> overhead measurable?

I haven't noticed a significant impact on simple pgbench tests. I will
check different cases with fetch_size = 1. For now I don't know whether
we could trigger OOM with, for example, an async foreign scan of a
partitioned table with many foreign partitions: currently we have to keep
each tuple we get in memory, so saving it to the tuplestore until it's
needed by Append is perhaps not such a bad idea.
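For reference, the tuplestore round trip the third patch relies on looks
roughly like this (a sketch using the PostgreSQL internal API as the
patch does; memory context handling and error checks omitted; tupdesc
and fetched_tuple are stand-ins for the scan state's values):

#include "postgres.h"
#include "executor/tuptable.h"
#include "miscadmin.h"
#include "utils/tuplestore.h"

static void
tuplestore_roundtrip(TupleDesc tupdesc, HeapTuple fetched_tuple)
{
    /* spills to a temp file once work_mem kilobytes are exceeded */
    Tuplestorestate *store = tuplestore_begin_heap(true,   /* randomAccess */
                                                   false,  /* interXact */
                                                   work_mem);
    TupleTableSlot *tslot = MakeSingleTupleTableSlot(tupdesc,
                                                     &TTSOpsMinimalTuple);

    tuplestore_puttuple(store, fetched_tuple);

    /* read it back; note the tuplestore does not preserve ctids */
    if (tuplestore_gettupleslot(store, true, true, tslot))
    {
        HeapTuple   tup = ExecFetchSlotHeapTuple(tslot, true, NULL);

        (void) tup;             /* restore t_self here, pass the tuple on */
        ExecClearTuple(tslot);
    }

    tuplestore_rescan(store);   /* rewind, e.g. for a ReScan */
    tuplestore_end(store);      /* frees memory and any temp file */
    ExecDropSingleTupleTableSlot(tslot);
}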


>> I'm attaching two patches which try to fix issues with possibly huge
>> memory usage by postgres_fdw batches.
>> With fetched tuples we still can't use only a tuplestore, as ctids are
>> not preserved, so we have to store them separately.
>>
>> The reproducer for insert is simple.
>>
>> create extension postgres_fdw;
>> create server loopback foreign data wrapper postgres_fdw
>>   options (dbname 'postgres', port '5432', batch_size '100', fetch_size '100');
>> create table base_table(i int, s bytea);
>> create foreign table foreign_table (i int, s bytea) server loopback
>>   options (table_name 'base_table');
>> create user mapping for public server loopback;
>>
>> insert into foreign_table
>>   select i, pg_read_binary_file('/some/big/file')
>>   from generate_series(1,1000) i;
>>
>> will easily grow backend RSS to several gigabytes.
>> The first patch fixes this problem.
>>
>> The second patch alleviates the second issue: SELECT * queries can also
>> grow backend memory to several GBs. Memory usage can still peak (on my
>> toy examples) at 3-4 GB, but at least that seems 1-2 GB less than the
>> unpatched version.


> How large are the tuples? How much higher was this RSS than the
> theoretical minimum?


Sorry, I've lost the exact reproducer. Memory consumption of course
depends on tuple size and fetch size.

I've redone some simple tests:


select * from pg_foreign_server ;
  oid  | srvname  | srvowner | srvfdw | srvtype | srvversion | srvacl |                         srvoptions
-------+----------+----------+--------+---------+------------+--------+------------------------------------------------------------
 16403 | loopback |       10 |  16392 |         |            |        | {dbname=postgres,port=5432,batch_size=100,fetch_size=100}
(1 row)

select count(*) from foreign_table ;
 count
-------
  1100
(1 row)

 \d+ base_table
                                        Table "public.base_table"
 Column |  Type   | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
--------+---------+-----------+----------+---------+----------+-------------+--------------+-------------
 i      | integer |           |          |         | plain    |             |              |
 s      | bytea   |           |          |         | extended |             |              |
Access method: heap

select pg_size_pretty(pg_total_relation_size('base_table')) ;
 pg_size_pretty
----------------
 5495 MB
(1 row)

select pg_size_pretty(pg_total_relation_size('base_table')/1100) ;
 pg_size_pretty
----------------
 5116 kB
(1 row)

The backend executing the query

select i, length(s) from foreign_table;

takes up to 1.4 GB RSS on master and up to 1 GB on the patched version.

P.S. I'll be on vacation for the first half of January.
--
Best regards,
Alexander Pyhalov,
Postgres Professional
From 7efc88ffb4d90e935ca1e17ac3d4c753a2de4ef0 Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <[email protected]>
Date: Tue, 16 Dec 2025 18:46:27 +0300
Subject: [PATCH 1/3] Limit batch_size for foreign insert with work_mem

The batch_size option can be set at the foreign server level, where it can
be too optimistic even for a single table with varying tuple lengths. To
prevent excessive memory usage, limit the effective batch size with
work_mem.
---
 src/backend/executor/nodeModifyTable.c | 15 ++++++++++++---
 src/include/nodes/execnodes.h          |  1 +
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 874b71e6608..a28beca63ff 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -943,12 +943,18 @@ ExecInsert(ModifyTableContext *context,
 		if (resultRelInfo->ri_BatchSize > 1)
 		{
 			bool		flushed = false;
+			Size		tuple_len;
+
+			/* Compute length of the current tuple */
+			slot_getallattrs(slot);
+			tuple_len = heap_compute_data_size(slot->tts_tupleDescriptor, slot->tts_values, slot->tts_isnull);
 
 			/*
-			 * When we've reached the desired batch size, perform the
-			 * insertion.
+			 * When we've reached the desired batch size or exceeded work_mem,
+			 * perform the insertion.
 			 */
-			if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize)
+			if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize ||
+				((tuple_len + resultRelInfo->ri_BatchMemoryUsed > (Size) work_mem * 1024) && (resultRelInfo->ri_NumSlots > 0)))
 			{
 				ExecBatchInsert(mtstate, resultRelInfo,
 								resultRelInfo->ri_Slots,
@@ -1019,6 +1025,8 @@ ExecInsert(ModifyTableContext *context,
 								   resultRelInfo));
 
 			resultRelInfo->ri_NumSlots++;
+			/* The batch always holds at least one tuple, even if its length exceeds work_mem. */
+			resultRelInfo->ri_BatchMemoryUsed += tuple_len;
 
 			MemoryContextSwitchTo(oldContext);
 
@@ -1418,6 +1426,7 @@ ExecBatchInsert(ModifyTableState *mtstate,
 		ExecClearTuple(planSlots[i]);
 	}
 	resultRelInfo->ri_NumSlots = 0;
+	resultRelInfo->ri_BatchMemoryUsed = 0;
 }
 
 /*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3968429f991..b82ec938228 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -542,6 +542,7 @@ typedef struct ResultRelInfo
 	int			ri_NumSlots;	/* number of slots in the array */
 	int			ri_NumSlotsInitialized; /* number of initialized slots */
 	int			ri_BatchSize;	/* max slots inserted in a single batch */
+	int			ri_BatchMemoryUsed;	/* bytes of memory used by the current batch */
 	TupleTableSlot **ri_Slots;	/* input tuples for batch insert */
 	TupleTableSlot **ri_PlanSlots;
 
-- 
2.43.0

From f9a2ec48d907c37259fc6658a6a0ea5925143a42 Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <[email protected]>
Date: Tue, 30 Dec 2025 10:35:22 +0300
Subject: [PATCH 2/3] Fall back to foreign insert when tuple is too big

When a tuple is larger than work_mem, we don't add it to a batch,
but fall through to a usual foreign insert. This means that the
executor can form a batch, insert it, and then finish with a usual
insert, so there may be no pending batch inserts left to process.
To avoid manipulating possibly long lists, we don't clear the
es_insert_pending_result_relations and es_insert_pending_modifytables
lists, but instead check in ExecPendingInserts() whether there are
any pending tuples.
---
 .../postgres_fdw/expected/postgres_fdw.out    |  11 ++
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   9 ++
 src/backend/executor/nodeModifyTable.c        | 138 ++++++++++--------
 src/include/nodes/execnodes.h                 |   1 +
 4 files changed, 96 insertions(+), 63 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 48e3185b227..a8bfec9413c 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -7640,6 +7640,17 @@ select count(*) from tab_batch_sharded;
     45
 (1 row)
 
+delete from tab_batch_sharded;
+-- test batch insert with large tuples and switching from batch to usual insert
+set work_mem to 64;
+insert into tab_batch_sharded select 3, case when i%4 = 0 then lpad('a',65*1024,'a') else 'test'|| i end from generate_series(1, 100) i;
+select count(*) from tab_batch_sharded;
+ count 
+-------
+   100
+(1 row)
+
+reset work_mem;
 drop table tab_batch_local;
 drop table tab_batch_sharded;
 drop table tab_batch_sharded_p1_remote;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 9a8f9e28135..467943ca32e 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1936,10 +1936,19 @@ create foreign table tab_batch_sharded_p1 partition of tab_batch_sharded
   server loopback options (table_name 'tab_batch_sharded_p1_remote');
 insert into tab_batch_sharded select * from tab_batch_local;
 select count(*) from tab_batch_sharded;
+delete from tab_batch_sharded;
+-- test batch insert with large tuples and switching from batch to usual insert
+set work_mem to 64;
+insert into tab_batch_sharded select 3, case when i%4 = 0 then lpad('a',65*1024,'a') else 'test'|| i end from generate_series(1, 100) i;
+select count(*) from tab_batch_sharded;
+reset work_mem;
+
 drop table tab_batch_local;
 drop table tab_batch_sharded;
 drop table tab_batch_sharded_p1_remote;
 
+
+
 alter server loopback options (drop batch_size);
 
 -- ===================================================================
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index a28beca63ff..2e6dcdff7bf 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -942,7 +942,6 @@ ExecInsert(ModifyTableContext *context,
 		 */
 		if (resultRelInfo->ri_BatchSize > 1)
 		{
-			bool		flushed = false;
 			Size		tuple_len;
 
 			/* Compute length of the current tuple */
@@ -961,76 +960,82 @@ ExecInsert(ModifyTableContext *context,
 								resultRelInfo->ri_PlanSlots,
 								resultRelInfo->ri_NumSlots,
 								estate, canSetTag);
-				flushed = true;
 			}
 
-			oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
-
-			if (resultRelInfo->ri_Slots == NULL)
+			if (tuple_len < (Size) work_mem * 1024)
 			{
-				resultRelInfo->ri_Slots = palloc_array(TupleTableSlot *, resultRelInfo->ri_BatchSize);
-				resultRelInfo->ri_PlanSlots = palloc_array(TupleTableSlot *, resultRelInfo->ri_BatchSize);
-			}
+				oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
 
-			/*
-			 * Initialize the batch slots. We don't know how many slots will
-			 * be needed, so we initialize them as the batch grows, and we
-			 * keep them across batches. To mitigate an inefficiency in how
-			 * resource owner handles objects with many references (as with
-			 * many slots all referencing the same tuple descriptor) we copy
-			 * the appropriate tuple descriptor for each slot.
-			 */
-			if (resultRelInfo->ri_NumSlots >= resultRelInfo->ri_NumSlotsInitialized)
-			{
-				TupleDesc	tdesc = CreateTupleDescCopy(slot->tts_tupleDescriptor);
-				TupleDesc	plan_tdesc =
-					CreateTupleDescCopy(planSlot->tts_tupleDescriptor);
+				if (resultRelInfo->ri_Slots == NULL)
+				{
+					resultRelInfo->ri_Slots = palloc_array(TupleTableSlot *, resultRelInfo->ri_BatchSize);
+					resultRelInfo->ri_PlanSlots = palloc_array(TupleTableSlot *, resultRelInfo->ri_BatchSize);
+				}
 
-				resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
-					MakeSingleTupleTableSlot(tdesc, slot->tts_ops);
+				/*
+				 * Initialize the batch slots. We don't know how many slots
+				 * will be needed, so we initialize them as the batch grows,
+				 * and we keep them across batches. To mitigate an
+				 * inefficiency in how resource owner handles objects with
+				 * many references (as with many slots all referencing the
+				 * same tuple descriptor) we copy the appropriate tuple
+				 * descriptor for each slot.
+				 */
+				if (resultRelInfo->ri_NumSlots >= resultRelInfo->ri_NumSlotsInitialized)
+				{
+					TupleDesc	tdesc = CreateTupleDescCopy(slot->tts_tupleDescriptor);
+					TupleDesc	plan_tdesc =
+						CreateTupleDescCopy(planSlot->tts_tupleDescriptor);
 
-				resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
-					MakeSingleTupleTableSlot(plan_tdesc, planSlot->tts_ops);
+					resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
+						MakeSingleTupleTableSlot(tdesc, slot->tts_ops);
 
-				/* remember how many batch slots we initialized */
-				resultRelInfo->ri_NumSlotsInitialized++;
-			}
+					resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
+						MakeSingleTupleTableSlot(plan_tdesc, planSlot->tts_ops);
 
-			ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
-						 slot);
+					/* remember how many batch slots we initialized */
+					resultRelInfo->ri_NumSlotsInitialized++;
+				}
 
-			ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
-						 planSlot);
+				ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
+							 slot);
 
-			/*
-			 * If these are the first tuples stored in the buffers, add the
-			 * target rel and the mtstate to the
-			 * es_insert_pending_result_relations and
-			 * es_insert_pending_modifytables lists respectively, except in
-			 * the case where flushing was done above, in which case they
-			 * would already have been added to the lists, so no need to do
-			 * this.
-			 */
-			if (resultRelInfo->ri_NumSlots == 0 && !flushed)
-			{
-				Assert(!list_member_ptr(estate->es_insert_pending_result_relations,
-										resultRelInfo));
-				estate->es_insert_pending_result_relations =
-					lappend(estate->es_insert_pending_result_relations,
-							resultRelInfo);
-				estate->es_insert_pending_modifytables =
-					lappend(estate->es_insert_pending_modifytables, mtstate);
-			}
-			Assert(list_member_ptr(estate->es_insert_pending_result_relations,
-								   resultRelInfo));
+				ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
+							 planSlot);
 
-			resultRelInfo->ri_NumSlots++;
-			/* The batch always holds at least one tuple, even if its length exceeds work_mem. */
-			resultRelInfo->ri_BatchMemoryUsed += tuple_len;
+				/*
+				 * If these are the first tuples stored in the buffers, add
+				 * the target rel and the mtstate to the
+				 * es_insert_pending_result_relations and
+				 * es_insert_pending_modifytables lists respectively, except
+				 * in the case where flushing was done above, in which case
+				 * they would already have been added to the lists, so no need
+				 * to do this.
+				 */
+				if (resultRelInfo->ri_NumSlots == 0 && !resultRelInfo->ri_ExecutorPendingStateModified)
+				{
+					Assert(!list_member_ptr(estate->es_insert_pending_result_relations,
+											resultRelInfo));
+					estate->es_insert_pending_result_relations =
+						lappend(estate->es_insert_pending_result_relations,
+								resultRelInfo);
+					estate->es_insert_pending_modifytables =
+						lappend(estate->es_insert_pending_modifytables, mtstate);
+					resultRelInfo->ri_ExecutorPendingStateModified = true;
+				}
+				Assert(list_member_ptr(estate->es_insert_pending_result_relations,
+									   resultRelInfo));
 
-			MemoryContextSwitchTo(oldContext);
+				resultRelInfo->ri_NumSlots++;
+				resultRelInfo->ri_BatchMemoryUsed += tuple_len;
+				/* We've flushed the batch above if it was too big. */
+				Assert(resultRelInfo->ri_BatchMemoryUsed < (Size) work_mem * 1024);
 
-			return NULL;
+				MemoryContextSwitchTo(oldContext);
+
+				return NULL;
+			}
+			/* else fall through to a usual foreign insert */
 		}
 
 		/*
@@ -1445,11 +1450,18 @@ ExecPendingInserts(EState *estate)
 		ModifyTableState *mtstate = (ModifyTableState *) lfirst(l2);
 
 		Assert(mtstate);
-		ExecBatchInsert(mtstate, resultRelInfo,
-						resultRelInfo->ri_Slots,
-						resultRelInfo->ri_PlanSlots,
-						resultRelInfo->ri_NumSlots,
-						estate, mtstate->canSetTag);
+
+		/*
+		 * Batch insert could have switched to a non-batched insert; in
+		 * that case there may be no filled slots.
+		 */
+		if (resultRelInfo->ri_NumSlots > 0)
+			ExecBatchInsert(mtstate, resultRelInfo,
+							resultRelInfo->ri_Slots,
+							resultRelInfo->ri_PlanSlots,
+							resultRelInfo->ri_NumSlots,
+							estate, mtstate->canSetTag);
+		resultRelInfo->ri_ExecutorPendingStateModified = false;
 	}
 
 	list_free(estate->es_insert_pending_result_relations);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index b82ec938228..dcc23774a4c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -543,6 +543,7 @@ typedef struct ResultRelInfo
 	int			ri_NumSlotsInitialized; /* number of initialized slots */
 	int			ri_BatchSize;	/* max slots inserted in a single batch */
 	int			ri_BatchMemoryUsed;	/* bytes of memory used by the current batch */
+	bool			ri_ExecutorPendingStateModified; /* true if member of estate->es_insert_pending_result_relations */
 	TupleTableSlot **ri_Slots;	/* input tuples for batch insert */
 	TupleTableSlot **ri_PlanSlots;
 
-- 
2.43.0

From b26a601207efdf575e08239bffaf94e8916df0e4 Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <[email protected]>
Date: Thu, 18 Dec 2025 09:17:42 +0300
Subject: [PATCH 3/3] Use a tuplestore in PgFdwScanState to limit memory
 usage

A tuplestore doesn't preserve ctids, so we need to store them
separately.
---
 contrib/postgres_fdw/postgres_fdw.c | 74 +++++++++++++++++++++++++----
 1 file changed, 66 insertions(+), 8 deletions(-)

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 5e178c21b39..389e906a5d5 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -158,8 +158,12 @@ typedef struct PgFdwScanState
 	const char **param_values;	/* textual values of query parameters */
 
 	/* for storing result tuples */
-	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
-	int			num_tuples;		/* # of tuples in array */
+	Tuplestorestate *result_store;	/* currently-retrieved tuples */
+	ItemPointerData *ctids;		/* separate store for ctids */
+
+	TupleTableSlot *tupstore_slot;	/* slot needed for retrieving tuples from
+									 * tuplestore */
+	int			num_tuples;		/* # of tuples in tuple store */
 	int			next_tuple;		/* index of next one to return */
 
 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
@@ -546,6 +550,8 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int	get_batch_size_option(Relation rel);
 
+static void clean_scan_state_result_store(PgFdwScanState *fsstate);
+
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -1605,6 +1611,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 {
 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+	HeapTuple	newtup;
 
 	/*
 	 * In sync mode, if this is the first call after Begin or ReScan, we need
@@ -1631,12 +1638,31 @@ postgresIterateForeignScan(ForeignScanState *node)
 			return ExecClearTuple(slot);
 	}
 
+	if (!tuplestore_gettupleslot(fsstate->result_store, true, true,
+								 fsstate->tupstore_slot))
+	{
+		/*
+		 * We've checked that next_tuple is less than fsstate->num_tuples, so
+		 * there should be some result
+		 */
+		ereport(ERROR,
+				(errcode(ERRCODE_INTERNAL_ERROR),
+				 errmsg("unexpected end of tuplestore")));
+
+	}
+
+	newtup = ExecFetchSlotHeapTuple(fsstate->tupstore_slot, true, NULL);
+	/* Tuple store doesn't preserve ctid, so restore it separately */
+	newtup->t_self = newtup->t_data->t_ctid = fsstate->ctids[fsstate->next_tuple];
+	ExecClearTuple(fsstate->tupstore_slot);
+
 	/*
 	 * Return the next tuple.
 	 */
-	ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
+	ExecStoreHeapTuple(newtup,
 					   slot,
 					   false);
+	fsstate->next_tuple++;
 
 	return slot;
 }
@@ -1699,6 +1725,8 @@ postgresReScanForeignScan(ForeignScanState *node)
 	{
 		/* Easy: just rescan what we already have in memory, if anything */
 		fsstate->next_tuple = 0;
+		if (fsstate->result_store)
+			tuplestore_rescan(fsstate->result_store);
 		return;
 	}
 
@@ -1708,7 +1736,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	PQclear(res);
 
 	/* Now force a fresh FETCH. */
-	fsstate->tuples = NULL;
+	clean_scan_state_result_store(fsstate);
 	fsstate->num_tuples = 0;
 	fsstate->next_tuple = 0;
 	fsstate->fetch_ct_2 = 0;
@@ -1737,6 +1765,8 @@ postgresEndForeignScan(ForeignScanState *node)
 	ReleaseConnection(fsstate->conn);
 	fsstate->conn = NULL;
 
+	clean_scan_state_result_store(fsstate);
+
 	/* MemoryContexts will be deleted automatically. */
 }
 
@@ -3781,7 +3811,8 @@ create_cursor(ForeignScanState *node)
 
 	/* Mark the cursor as created, and show no tuples have been retrieved */
 	fsstate->cursor_exists = true;
-	fsstate->tuples = NULL;
+	clean_scan_state_result_store(fsstate);
+
 	fsstate->num_tuples = 0;
 	fsstate->next_tuple = 0;
 	fsstate->fetch_ct_2 = 0;
@@ -3808,7 +3839,8 @@ fetch_more_data(ForeignScanState *node)
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
 	 * batch.
 	 */
-	fsstate->tuples = NULL;
+	clean_scan_state_result_store(fsstate);
+
 	MemoryContextReset(fsstate->batch_cxt);
 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
@@ -3844,21 +3876,28 @@ fetch_more_data(ForeignScanState *node)
 
 	/* Convert the data into HeapTuples */
 	numrows = PQntuples(res);
-	fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+	fsstate->result_store = tuplestore_begin_heap(true, false, work_mem);
+	fsstate->tupstore_slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple);
 	fsstate->num_tuples = numrows;
 	fsstate->next_tuple = 0;
+	fsstate->ctids = palloc0(numrows * sizeof(ItemPointerData));
 
 	for (i = 0; i < numrows; i++)
 	{
+		HeapTuple	newtup;
+
 		Assert(IsA(node->ss.ps.plan, ForeignScan));
 
-		fsstate->tuples[i] =
+		newtup =
 			make_tuple_from_result_row(res, i,
 									   fsstate->rel,
 									   fsstate->attinmeta,
 									   fsstate->retrieved_attrs,
 									   node,
 									   fsstate->temp_cxt);
+		tuplestore_puttuple(fsstate->result_store, newtup);
+		ItemPointerCopy(&newtup->t_self, &fsstate->ctids[i]);
+		heap_freetuple(newtup);
 	}
 
 	/* Update fetch_ct_2 */
@@ -7636,6 +7675,25 @@ make_tuple_from_result_row(PGresult *res,
 	return tuple;
 }
 
+/*
+ * Clean PgFdwScanState result store
+ */
+static void
+clean_scan_state_result_store(PgFdwScanState *fsstate)
+{
+	if (fsstate->result_store)
+	{
+		/*
+		 * We partially rely on the fact that batch_cxt is also reset
+		 */
+		tuplestore_end(fsstate->result_store);
+		ExecDropSingleTupleTableSlot(fsstate->tupstore_slot);
+		fsstate->result_store = NULL;
+		fsstate->tupstore_slot = NULL;
+		fsstate->ctids = NULL;
+	}
+}
+
 /*
  * Callback function which is called when error occurs during column value
  * conversion.  Print names of column and relation.
-- 
2.43.0
