From 14e2fcb457ef3e9a3ad004425ec7e334a3619fd7 Mon Sep 17 00:00:00 2001
From: Rafia Sabih <rafia.sabih@cybertec.at>
Date: Mon, 6 Jan 2025 09:21:18 +0100
Subject: [PATCH] Add a fetch mechanism without cursors

This adds a GUC to enable/ disable cursor mode in postgres_fdw.
The GUC is called postgres_fdw.use_cursor. When it is set, everything
works as it is now. However, there is a limitation to the current
mechanism, it is unable to use parallel plans at local side because
of the use of cursors. Now, if a user wants to overcome this, then
one can unset the abovementioned GUC. In non-cursor mode cursors are
not used and hence the parallel plans can be used at the local side.
In non-cursor mode fetch_size is used to as is.

A caveat with the non-cursor mode is that when simultaneous queries are
fired at the local side, i.e. more than one cursor is opened at a time,
then we use Tuplestore, so there might be some memory related performance
degradation only in those cases.
---
 contrib/postgres_fdw/connection.c   |   7 +
 contrib/postgres_fdw/option.c       |  17 +++
 contrib/postgres_fdw/postgres_fdw.c | 221 +++++++++++++++++++++-------
 contrib/postgres_fdw/postgres_fdw.h |   2 +
 4 files changed, 193 insertions(+), 54 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 2326f391d34..95e30773a19 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -868,6 +868,13 @@ pgfdw_get_result(PGconn *conn)
 	return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
 }
 
+PGresult *
+pgfdw_get_next_result(PGconn *conn)
+{
+	return libpqsrv_get_result(conn, pgfdw_we_get_result);
+}
+
+
 /*
  * Report an error we got from the remote server.
  *
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 232d85354b2..a5d7b747536 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -49,6 +49,7 @@ static PQconninfoOption *libpq_options;
  * GUC parameters
  */
 char	   *pgfdw_application_name = NULL;
+bool pgfdw_use_cursor = true;
 
 /*
  * Helper functions
@@ -585,5 +586,21 @@ _PG_init(void)
 							   NULL,
 							   NULL);
 
+	/*
+	 * If use_cursor is set to false, then the new way of fetching is used, which allows for the
+	 * use of parallel plans at the local side. In the cursor mode, parallel plans could not be
+	 * used.
+	 */
+	DefineCustomBoolVariable("postgres_fdw.use_cursor",
+							"If set uses the cursor, otherwise fetches without cursor",
+							NULL,
+							&pgfdw_use_cursor,
+							true,
+							PGC_USERSET,
+							0,
+							NULL,
+							NULL,
+							NULL);
+
 	MarkGUCPrefixReserved("postgres_fdw");
 }
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index cf564341184..06407da60ef 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,7 @@
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "executor/execAsync.h"
+#include "executor/executor.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -542,6 +543,7 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int	get_batch_size_option(Relation rel);
 
+static bool  only_query = true;      /* Only to be used in the non cursor mode*/
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -1544,6 +1546,11 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	/* Get private info created by planner functions. */
 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
 									 FdwScanPrivateSelectSql));
+
+	/* We need to remember that there is already a query running. */
+	if (fsstate->cursor_number >= 2)
+		only_query = false;
+
 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
 												 FdwScanPrivateRetrievedAttrs);
 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
@@ -3733,7 +3740,7 @@ create_cursor(ForeignScanState *node)
 	const char **values = fsstate->param_values;
 	PGconn	   *conn = fsstate->conn;
 	StringInfoData buf;
-	PGresult   *res;
+	PGresult   *res = NULL;
 
 	/* First, process a pending asynchronous request, if any. */
 	if (fsstate->conn_state->pendingAreq)
@@ -3758,36 +3765,53 @@ create_cursor(ForeignScanState *node)
 		MemoryContextSwitchTo(oldcontext);
 	}
 
-	/* Construct the DECLARE CURSOR command */
 	initStringInfo(&buf);
-	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
-					 fsstate->cursor_number, fsstate->query);
 
-	/*
-	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
-	 * to infer types for all parameters.  Since we explicitly cast every
-	 * parameter (see deparse.c), the "inference" is trivial and will produce
-	 * the desired result.  This allows us to avoid assuming that the remote
-	 * server has the same OIDs we do for the parameters' types.
-	 */
-	if (!PQsendQueryParams(conn, buf.data, numParams,
-						   NULL, values, NULL, NULL, 0))
-		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+	if (pgfdw_use_cursor)
+	{
+		/* Construct the DECLARE CURSOR command */
+		appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
+						fsstate->cursor_number, fsstate->query);
+
+		/*
+		* Notice that we pass NULL for paramTypes, thus forcing the remote server
+		* to infer types for all parameters.  Since we explicitly cast every
+		* parameter (see deparse.c), the "inference" is trivial and will produce
+		* the desired result.  This allows us to avoid assuming that the remote
+		* server has the same OIDs we do for the parameters' types.
+		*/
+		if (!PQsendQueryParams(conn, buf.data, numParams,
+							NULL, values, NULL, NULL, 0))
+			pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+		/*
+		* Get the result, and check for success.
+		*
+		* We don't use a PG_TRY block here, so be careful not to throw error
+		* without releasing the PGresult.
+		*/
+		res = pgfdw_get_result(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
+	}
+	else
+	{
+		/* Fetch without cursors */
+			appendStringInfo(&buf, "%s", fsstate->query);
+
+		if (!PQsendQueryParams(conn, buf.data, numParams,
+							   	NULL, values, NULL, NULL, 0))
+			pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+		/* Call for Chunked rows mode with same size of chunk as the fetch size */
+		if (!PQsetChunkedRowsMode(conn, fsstate->fetch_size))
+			pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+	}
 
-	/*
-	 * Get the result, and check for success.
-	 *
-	 * We don't use a PG_TRY block here, so be careful not to throw error
-	 * without releasing the PGresult.
-	 */
-	res = pgfdw_get_result(conn);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
 
 	/* Mark the cursor as created, and show no tuples have been retrieved */
-	fsstate->cursor_exists = true;
-	fsstate->tuples = NULL;
+	fsstate->cursor_exists = true; // We need this even for non-cursor mode.
 	fsstate->num_tuples = 0;
 	fsstate->next_tuple = 0;
 	fsstate->fetch_ct_2 = 0;
@@ -3806,6 +3830,7 @@ fetch_more_data(ForeignScanState *node)
 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
+	bool already_done = false;
 
 	/*
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
@@ -3820,7 +3845,7 @@ fetch_more_data(ForeignScanState *node)
 	{
 		PGconn	   *conn = fsstate->conn;
 		int			numrows;
-		int			i;
+		int			i = 0;
 
 		if (fsstate->async_capable)
 		{
@@ -3838,7 +3863,7 @@ fetch_more_data(ForeignScanState *node)
 			/* Reset per-connection state */
 			fsstate->conn_state->pendingAreq = NULL;
 		}
-		else
+		if (pgfdw_use_cursor)
 		{
 			char		sql[64];
 
@@ -3851,32 +3876,113 @@ fetch_more_data(ForeignScanState *node)
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 		}
+		else
+		{
+			/* Non-cursor mode uses PQSetChunkedRowsMode during create_cursor, so just get the result here. */
+			res = pgfdw_get_next_result(conn);
 
-		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-		fsstate->num_tuples = numrows;
-		fsstate->next_tuple = 0;
+			if (res == NULL)
+				break;
 
-		for (i = 0; i < numrows; i++)
+			else if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+				pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query);
+			else if (PQresultStatus(res) == PGRES_TUPLES_CHUNK)
+			{
+				int total = 0;
+				if (!only_query)
+				{
+					/*
+					 * When this is not the only query running, we extract all the tuples
+					 * in one go and store them in tuplestore.
+					 * Since it is using PQSetChunkedRowsMode, we get only the fsstate->fetch_size
+					 * tuples in one run, so keep on executing till we get NULL in PGresult.
+					 */
+					Tuplestorestate *tuplestore = tuplestore_begin_heap(false, true, work_mem);
+					TupleTableSlot *slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple);
+					HeapTuple temp_tuple =  (HeapTuple) palloc0(sizeof(HeapTuple));
+
+					i = 0;
+					for (;;)
+					{
+						CHECK_FOR_INTERRUPTS();
+						numrows = PQntuples(res);
+
+						/* Convert the data into HeapTuples */
+						Assert(IsA(node->ss.ps.plan, ForeignScan));
+						for (i = 0; i < numrows; i++)
+						{
+							temp_tuple =  make_tuple_from_result_row(res, i,
+														fsstate->rel,
+														fsstate->attinmeta,
+														fsstate->retrieved_attrs,
+														node,
+														fsstate->temp_cxt);
+							tuplestore_puttuple(tuplestore, temp_tuple);
+							total++;
+						}
+						res = pgfdw_get_next_result(conn);
+						
+						if (res == NULL)
+							break;
+		
+						else if (PQresultStatus(res) == PGRES_TUPLES_OK)
+						{
+							while (res!= NULL) 
+								res = pgfdw_get_next_result(conn);
+							break;
+						}
+						else if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+							pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query);
+					}
+					if (total > 0)
+					{
+						already_done = true;
+						numrows = total;
+						fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+						fsstate->num_tuples = numrows;
+						fsstate->next_tuple = 0;
+						for (i = 0; i < numrows; i++)
+						{
+							while (tuplestore_gettupleslot(tuplestore, true, true, slot))
+								fsstate->tuples[i++] = ExecFetchSlotHeapTuple(slot, true, NULL);
+						}
+					}
+					/* EOF is reached because when we are storing all tuples to the tuplestore. */
+					fsstate->eof_reached = true;
+					pfree(temp_tuple);
+					ExecDropSingleTupleTableSlot(slot);
+					tuplestore_end(tuplestore);
+				}
+			}
+		}
+		if (!already_done)
 		{
-			Assert(IsA(node->ss.ps.plan, ForeignScan));
-
-			fsstate->tuples[i] =
-				make_tuple_from_result_row(res, i,
-										   fsstate->rel,
-										   fsstate->attinmeta,
-										   fsstate->retrieved_attrs,
-										   node,
-										   fsstate->temp_cxt);
+			/* Convert the data into HeapTuples */
+			numrows = PQntuples(res);
+			fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+			fsstate->num_tuples = numrows;
+			fsstate->next_tuple = 0;
+
+			for (i = 0; i < numrows; i++)
+			{
+				Assert(IsA(node->ss.ps.plan, ForeignScan));
+
+				fsstate->tuples[i] =
+					make_tuple_from_result_row(res, i,
+											fsstate->rel,
+											fsstate->attinmeta,
+											fsstate->retrieved_attrs,
+											node,
+											fsstate->temp_cxt);
+			}
+
+			/* Must be EOF if we didn't get as many tuples as we asked for. */
+			fsstate->eof_reached = (numrows < fsstate->fetch_size);
 		}
 
 		/* Update fetch_ct_2 */
 		if (fsstate->fetch_ct_2 < 2)
 			fsstate->fetch_ct_2++;
-
-		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fsstate->fetch_size);
 	}
 	PG_FINALLY();
 	{
@@ -3955,16 +4061,23 @@ close_cursor(PGconn *conn, unsigned int cursor_number,
 	char		sql[64];
 	PGresult   *res;
 
-	snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
+	if (pgfdw_use_cursor)
+	{
+		snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
 
-	/*
-	 * We don't use a PG_TRY block here, so be careful not to throw error
-	 * without releasing the PGresult.
-	 */
-	res = pgfdw_exec_query(conn, sql, conn_state);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, conn, true, sql);
-	PQclear(res);
+		/*
+		* We don't use a PG_TRY block here, so be careful not to throw error
+		* without releasing the PGresult.
+		*/
+		res = pgfdw_exec_query(conn, sql, conn_state);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(ERROR, res, conn, true, sql);
+		PQclear(res);
+	}
+	else
+	{
+		while (pgfdw_get_result(conn) != NULL) {}
+	}
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 9e501660d18..8c177ec9946 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -164,6 +164,7 @@ extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void do_sql_command(PGconn *conn, const char *sql);
 extern PGresult *pgfdw_get_result(PGconn *conn);
+extern PGresult *pgfdw_get_next_result(PGconn *conn);
 extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
 								  PgFdwConnState *state);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
@@ -177,6 +178,7 @@ extern List *ExtractExtensionList(const char *extensionsString,
 								  bool warnOnMissing);
 extern char *process_pgfdw_appname(const char *appname);
 extern char *pgfdw_application_name;
+extern bool pgfdw_use_cursor;
 
 /* in deparse.c */
 extern void classifyConditions(PlannerInfo *root,
-- 
2.39.5 (Apple Git-154)

