From 096a5e03f4f8ebbe5e10ef9373f83dcd5edc1b78 Mon Sep 17 00:00:00 2001
From: Rafia Sabih <rafia.sabih@cybertec.at>
Date: Fri, 17 Jan 2025 12:50:19 +0100
Subject: [PATCH] Add a fetch mechanism without cursors

This adds a GUC to enable/ disable cursor mode in postgres_fdw.
The GUC is called postgres_fdw.use_cursor. When it is set, everything
works as it is now. However, there is a limitation to the current
mechanism, it is unable to use parallel plans at local side because
of the use of cursors. Now, if a user wants to overcome this, then
one can unset the abovementioned GUC. In non-cursor mode cursors are
not used and hence the parallel plans can be used at the local side.
In non-cursor mode fetch_size is used to as is.

A caveat with the non-cursor mode is that when simultaneous queries are
fired at the local side, i.e. more than one cursor is opened at a time,
then we use Tuplestore, so there might be some memory related performance
degradation only in those cases.

Original idea: Bernd Helmle
Key suggestions: Robert Haas
---
 contrib/postgres_fdw/connection.c   |   7 +
 contrib/postgres_fdw/option.c       |  17 +++
 contrib/postgres_fdw/postgres_fdw.c | 227 +++++++++++++++++++++-------
 contrib/postgres_fdw/postgres_fdw.h |   2 +
 4 files changed, 197 insertions(+), 56 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 202e7e583b3..acd6bfe9b4b 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -877,6 +877,13 @@ pgfdw_get_result(PGconn *conn)
 	return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
 }
 
+PGresult *
+pgfdw_get_next_result(PGconn *conn)
+{
+	return libpqsrv_get_result(conn, pgfdw_we_get_result);
+}
+
+
 /*
  * Report an error we got from the remote server.
  *
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 12aed4054fa..bc3d5c46286 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -49,6 +49,7 @@ static PQconninfoOption *libpq_options;
  * GUC parameters
  */
 char	   *pgfdw_application_name = NULL;
+bool pgfdw_use_cursor = true;
 
 /*
  * Helper functions
@@ -585,5 +586,21 @@ _PG_init(void)
 							   NULL,
 							   NULL);
 
+	/*
+	 * If use_cursor is set to false, then the new way of fetching is used, which allows for the
+	 * use of parallel plans at the local side. In the cursor mode, parallel plans could not be
+	 * used.
+	 */
+	DefineCustomBoolVariable("postgres_fdw.use_cursor",
+							"If set uses the cursor, otherwise fetches without cursor",
+							NULL,
+							&pgfdw_use_cursor,
+							true,
+							PGC_USERSET,
+							0,
+							NULL,
+							NULL,
+							NULL);
+
 	MarkGUCPrefixReserved("postgres_fdw");
 }
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index b92e2a0fc9f..11097958625 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,7 @@
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "executor/execAsync.h"
+#include "executor/executor.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -542,6 +543,7 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int	get_batch_size_option(Relation rel);
 
+static int  num_queries = 0;      /* Only to be used in the non cursor mode */
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -1544,6 +1546,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	/* Get private info created by planner functions. */
 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
 									 FdwScanPrivateSelectSql));
+
+	/* We need to know if there are simultaneous queries running. */
+	num_queries++;
+
 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
 												 FdwScanPrivateRetrievedAttrs);
 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
@@ -1673,7 +1679,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * case.  If we've only fetched zero or one batch, we needn't even rewind
 	 * the cursor, just rescan what we have.
 	 */
-	if (node->ss.ps.chgParam != NULL)
+	if (node->ss.ps.chgParam != NULL && pgfdw_use_cursor)
 	{
 		fsstate->cursor_exists = false;
 		snprintf(sql, sizeof(sql), "CLOSE c%u",
@@ -1684,7 +1690,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 		if (PQserverVersion(fsstate->conn) < 150000)
 			snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
 					 fsstate->cursor_number);
-		else
+		else if (pgfdw_use_cursor)
 		{
 			fsstate->cursor_exists = false;
 			snprintf(sql, sizeof(sql), "CLOSE c%u",
@@ -1737,6 +1743,9 @@ postgresEndForeignScan(ForeignScanState *node)
 	ReleaseConnection(fsstate->conn);
 	fsstate->conn = NULL;
 
+	/* To know if there are simulataneous queries running. */
+	num_queries--;
+
 	/* MemoryContexts will be deleted automatically. */
 }
 
@@ -3733,7 +3742,7 @@ create_cursor(ForeignScanState *node)
 	const char **values = fsstate->param_values;
 	PGconn	   *conn = fsstate->conn;
 	StringInfoData buf;
-	PGresult   *res;
+	PGresult   *res = NULL;
 
 	/* First, process a pending asynchronous request, if any. */
 	if (fsstate->conn_state->pendingAreq)
@@ -3758,36 +3767,53 @@ create_cursor(ForeignScanState *node)
 		MemoryContextSwitchTo(oldcontext);
 	}
 
-	/* Construct the DECLARE CURSOR command */
 	initStringInfo(&buf);
-	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
-					 fsstate->cursor_number, fsstate->query);
 
-	/*
-	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
-	 * to infer types for all parameters.  Since we explicitly cast every
-	 * parameter (see deparse.c), the "inference" is trivial and will produce
-	 * the desired result.  This allows us to avoid assuming that the remote
-	 * server has the same OIDs we do for the parameters' types.
-	 */
-	if (!PQsendQueryParams(conn, buf.data, numParams,
-						   NULL, values, NULL, NULL, 0))
-		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+	if (pgfdw_use_cursor)
+	{
+		/* Construct the DECLARE CURSOR command */
+		appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
+						fsstate->cursor_number, fsstate->query);
+
+		/*
+		* Notice that we pass NULL for paramTypes, thus forcing the remote server
+		* to infer types for all parameters.  Since we explicitly cast every
+		* parameter (see deparse.c), the "inference" is trivial and will produce
+		* the desired result.  This allows us to avoid assuming that the remote
+		* server has the same OIDs we do for the parameters' types.
+		*/
+		if (!PQsendQueryParams(conn, buf.data, numParams,
+							NULL, values, NULL, NULL, 0))
+			pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+		/*
+		* Get the result, and check for success.
+		*
+		* We don't use a PG_TRY block here, so be careful not to throw error
+		* without releasing the PGresult.
+		*/
+		res = pgfdw_get_result(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
+	}
+	else
+	{
+		/* Fetch without cursors */
+			appendStringInfo(&buf, "%s", fsstate->query);
+
+		if (!PQsendQueryParams(conn, buf.data, numParams,
+							NULL, values, NULL, NULL, 0))
+			pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+		/* Call for Chunked rows mode with same size of chunk as the fetch size */
+		if (!PQsetChunkedRowsMode(conn, fsstate->fetch_size))
+			pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+	}
 
-	/*
-	 * Get the result, and check for success.
-	 *
-	 * We don't use a PG_TRY block here, so be careful not to throw error
-	 * without releasing the PGresult.
-	 */
-	res = pgfdw_get_result(conn);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
 
 	/* Mark the cursor as created, and show no tuples have been retrieved */
-	fsstate->cursor_exists = true;
-	fsstate->tuples = NULL;
+	fsstate->cursor_exists = true; // We need this even for non-cursor mode.
 	fsstate->num_tuples = 0;
 	fsstate->next_tuple = 0;
 	fsstate->fetch_ct_2 = 0;
@@ -3806,6 +3832,7 @@ fetch_more_data(ForeignScanState *node)
 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
+	bool already_done = false;
 
 	/*
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
@@ -3820,7 +3847,7 @@ fetch_more_data(ForeignScanState *node)
 	{
 		PGconn	   *conn = fsstate->conn;
 		int			numrows;
-		int			i;
+		int			i = 0;
 
 		if (fsstate->async_capable)
 		{
@@ -3838,7 +3865,7 @@ fetch_more_data(ForeignScanState *node)
 			/* Reset per-connection state */
 			fsstate->conn_state->pendingAreq = NULL;
 		}
-		else
+		else if (pgfdw_use_cursor)
 		{
 			char		sql[64];
 
@@ -3851,32 +3878,113 @@ fetch_more_data(ForeignScanState *node)
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 		}
+		else
+		{
+			/* Non-cursor mode uses PQSetChunkedRowsMode during create_cursor, so just get the result here. */
+			res = pgfdw_get_next_result(conn);
 
-		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-		fsstate->num_tuples = numrows;
-		fsstate->next_tuple = 0;
+			if (res == NULL)
+				break;
 
-		for (i = 0; i < numrows; i++)
+			else if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+			else if (PQresultStatus(res) == PGRES_TUPLES_CHUNK)
+			{
+				int total = 0;
+				if (num_queries > 1)
+				{
+					/*
+					 * When this is not the only query running, we extract all the tuples
+					 * in one go and store them in tuplestore.
+					 * Since it is using PQSetChunkedRowsMode, we get only the fsstate->fetch_size
+					 * tuples in one run, so keep on executing till we get NULL in PGresult.
+					 */
+					Tuplestorestate *tuplestore = tuplestore_begin_heap(false, true, work_mem);
+					TupleTableSlot *slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple);
+					HeapTuple temp_tuple =  (HeapTuple) palloc0(sizeof(HeapTuple));
+
+					i = 0;
+					for (;;)
+					{
+						CHECK_FOR_INTERRUPTS();
+						numrows = PQntuples(res);
+
+						/* Convert the data into HeapTuples */
+						Assert(IsA(node->ss.ps.plan, ForeignScan));
+						for (i = 0; i < numrows; i++)
+						{
+							temp_tuple =  make_tuple_from_result_row(res, i,
+														fsstate->rel,
+														fsstate->attinmeta,
+														fsstate->retrieved_attrs,
+														node,
+														fsstate->temp_cxt);
+							tuplestore_puttuple(tuplestore, temp_tuple);
+							total++;
+						}
+						res = pgfdw_get_next_result(conn);
+
+						if (res == NULL)
+							break;
+
+						else if (PQresultStatus(res) == PGRES_TUPLES_OK)
+						{
+							while (res!= NULL)
+								res = pgfdw_get_next_result(conn);
+							break;
+						}
+						else if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+							pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+					}
+					if (total > 0)
+					{
+						already_done = true;
+						numrows = total;
+						fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+						fsstate->num_tuples = numrows;
+						fsstate->next_tuple = 0;
+						for (i = 0; i < numrows; i++)
+						{
+							while (tuplestore_gettupleslot(tuplestore, true, true, slot))
+								fsstate->tuples[i++] = ExecFetchSlotHeapTuple(slot, true, NULL);
+						}
+					}
+					/* EOF is reached because when we are storing all tuples to the tuplestore. */
+					fsstate->eof_reached = true;
+					pfree(temp_tuple);
+					ExecDropSingleTupleTableSlot(slot);
+					tuplestore_end(tuplestore);
+				}
+			}
+		}
+		if (!already_done)
 		{
-			Assert(IsA(node->ss.ps.plan, ForeignScan));
-
-			fsstate->tuples[i] =
-				make_tuple_from_result_row(res, i,
-										   fsstate->rel,
-										   fsstate->attinmeta,
-										   fsstate->retrieved_attrs,
-										   node,
-										   fsstate->temp_cxt);
+			/* Convert the data into HeapTuples */
+			numrows = PQntuples(res);
+			fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+			fsstate->num_tuples = numrows;
+			fsstate->next_tuple = 0;
+
+			for (i = 0; i < numrows; i++)
+			{
+				Assert(IsA(node->ss.ps.plan, ForeignScan));
+
+				fsstate->tuples[i] =
+					make_tuple_from_result_row(res, i,
+											fsstate->rel,
+											fsstate->attinmeta,
+											fsstate->retrieved_attrs,
+											node,
+											fsstate->temp_cxt);
+			}
+
+			/* Must be EOF if we didn't get as many tuples as we asked for. */
+			fsstate->eof_reached = (numrows < fsstate->fetch_size);
 		}
 
 		/* Update fetch_ct_2 */
 		if (fsstate->fetch_ct_2 < 2)
 			fsstate->fetch_ct_2++;
-
-		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fsstate->fetch_size);
 	}
 	PG_FINALLY();
 	{
@@ -3955,16 +4063,23 @@ close_cursor(PGconn *conn, unsigned int cursor_number,
 	char		sql[64];
 	PGresult   *res;
 
-	snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
+	if (pgfdw_use_cursor)
+	{
+		snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
 
-	/*
-	 * We don't use a PG_TRY block here, so be careful not to throw error
-	 * without releasing the PGresult.
-	 */
-	res = pgfdw_exec_query(conn, sql, conn_state);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, conn, true, sql);
-	PQclear(res);
+		/*
+		* We don't use a PG_TRY block here, so be careful not to throw error
+		* without releasing the PGresult.
+		*/
+		res = pgfdw_exec_query(conn, sql, conn_state);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(ERROR, res, conn, true, sql);
+		PQclear(res);
+	}
+	else
+	{
+		while (pgfdw_get_result(conn) != NULL) {}
+	}
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 81358f3bde7..f52900b30cd 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -164,6 +164,7 @@ extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void do_sql_command(PGconn *conn, const char *sql);
 extern PGresult *pgfdw_get_result(PGconn *conn);
+extern PGresult *pgfdw_get_next_result(PGconn *conn);
 extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
 								  PgFdwConnState *state);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
@@ -177,6 +178,7 @@ extern List *ExtractExtensionList(const char *extensionsString,
 								  bool warnOnMissing);
 extern char *process_pgfdw_appname(const char *appname);
 extern char *pgfdw_application_name;
+extern bool pgfdw_use_cursor;
 
 /* in deparse.c */
 extern void classifyConditions(PlannerInfo *root,
-- 
2.39.5 (Apple Git-154)

