On 2016/04/19 12:45, Etsuro Fujita wrote:
On 2016/04/19 12:26, Michael Paquier wrote:
Note for Robert: pgfdw_get_result copycats PQexec by discarding all
PQresult received except the last one. I think that's fine for the
purposes of postgres_fdw, but perhaps you have a different opinion on
the matter.

That seemed reasonable to me, but sorry, on second thought, I'm not sure
that's still a good idea.  One reason is that I think it's better for the
functions in postgres_fdw.c that use pgfdw_get_result to verify for
themselves that there are no more results; I think that would improve
the robustness of those functions.  Another reason is that I don't think
pgfdw_report_error, which is used in pgfdw_get_result, works well for
cases where the query contains multiple SQL commands.  So, +1 for the
idea of simply encapsulating the while (PQisBusy(...)) loop into a new
function pgfdw_get_result().

Here is a proposed patch for that.

Other changes:

         * We don't use a PG_TRY block here, so be careful not to throw error
         * without releasing the PGresult.
         */
-       res = PQexecPrepared(fmstate->conn,
-                                                fmstate->p_name,
-                                                fmstate->p_nums,
-                                                p_values,
-                                                NULL,
-                                                NULL,
-                                                0);
+       if (!PQsendQueryPrepared(fmstate->conn,
+                                                        fmstate->p_name,
+                                                        fmstate->p_nums,
+                                                        p_values,
+                                                        NULL,
+                                                        NULL,
+                                                        0))
+               pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query);

The comment "We don't use a PG_TRY block here ..." seems to be wrongly placed, so I moved that comment. Also, I think it'd be better to call pgfdw_report_error() with the clear argument set to false, not true, since there is no PGresult to clear at that point. Same for postgresExecForeignUpdate, postgresExecForeignDelete, and prepare_foreign_modify.

What do you think about that?

Best regards,
Etsuro Fujita
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 17,22 ****
--- 17,23 ----
  #include "access/xact.h"
  #include "mb/pg_wchar.h"
  #include "miscadmin.h"
+ #include "storage/latch.h"
  #include "utils/hsearch.h"
  #include "utils/memutils.h"
  
***************
*** 448,453 **** GetPrepStmtNumber(PGconn *conn)
--- 449,537 ----
  }
  
  /*
+  * Send a query using PQsendQuery, and wait for the results.
+  *
+  * This function is interruptible by signals.
+  *
+  * Note: we assume that the query string contains a single SQL command.
+  */
+ PGresult *
+ pgfdw_exec_query(PGconn *conn, const char *query)
+ {
+ 	PGresult   *res;
+ 	PGresult   *last_res = NULL;
+ 
+ 	/*
+ 	 * Submit a query.  Since we don't use non-blocking mode, this also can
+ 	 * block.  But its risk is relatively small, so we ignore that for now.
+ 	 */
+ 	if (!PQsendQuery(conn, query))
+ 		pgfdw_report_error(ERROR, NULL, conn, false, query);
+ 
+ 	/* Wait for the result */
+ 	res = pgfdw_get_result(conn, query);
+ 	if (res == NULL)
+ 		pgfdw_report_error(ERROR, NULL, conn, false, query);
+ 	last_res = res;
+ 
+ 	/*
+ 	 * Verify that there are no more results
+ 	 *
+ 	 * We don't use a PG_TRY block here, so be careful not to throw error
+ 	 * without releasing the PGresult.
+ 	 */
+ 	res = pgfdw_get_result(conn, query);
+ 	if (res != NULL)
+ 	{
+ 		PQclear(last_res);
+ 		pgfdw_report_error(ERROR, res, conn, true, query);
+ 	}
+ 
+ 	return last_res;
+ }
+ 
+ /*
+  * Wait for the next result from a prior asynchronous execution function call,
+  * and return it.
+  *
+  * This function offers quick responsiveness by checking for any interruptions.
+  *
+  * Caller is responsible for the error handling on the fetched result.
+  *
+  * Note: we assume that the query string contains a single SQL command.
+  */
+ PGresult *
+ pgfdw_get_result(PGconn *conn, const char *query)
+ {
+ 	/*
+ 	 * Receive data until PQgetResult is ready to get the result without
+ 	 * blocking.
+ 	 */
+ 	while (PQisBusy(conn))
+ 	{
+ 		int		wc;
+ 
+ 		/* Sleep until there's something to do */
+ 		wc = WaitLatchOrSocket(MyLatch,
+ 							   WL_LATCH_SET | WL_SOCKET_READABLE,
+ 							   PQsocket(conn),
+ 							   -1L);
+ 		ResetLatch(MyLatch);
+ 
+ 		CHECK_FOR_INTERRUPTS();
+ 
+ 		/* Data available in socket */
+ 		if (wc & WL_SOCKET_READABLE)
+ 		{
+ 			if (!PQconsumeInput(conn))
+ 				pgfdw_report_error(ERROR, NULL, conn, false, query);
+ 		}
+ 	}
+ 
+ 	return PQgetResult(conn);
+ }
+ 
+ /*
   * Report an error we got from the remote server.
   *
   * elevel: error level to use (typically ERROR, but might be less)
***************
*** 598,603 **** pgfdw_xact_callback(XactEvent event, void *arg)
--- 682,713 ----
  				case XACT_EVENT_ABORT:
  					/* Assume we might have lost track of prepared statements */
  					entry->have_error = true;
+ 
+ 					/*
+ 					 * If a command has been submitted to the remote server
+ 					 * using an asynchronous execution function, the command
+ 					 * might not have yet completed.  Check to see if a command
+ 					 * is still being processed by the remote server, and if so,
+ 					 * request cancellation of the command; if not, abort
+ 					 * gracefully.
+ 					 */
+ 					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ 					{
+ 						PGcancel   *cancel;
+ 						char		errbuf[256];
+ 
+ 						if ((cancel = PQgetCancel(entry->conn)))
+ 						{
+ 							if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+ 								ereport(WARNING,
+ 										(errcode(ERRCODE_CONNECTION_FAILURE),
+ 										 errmsg("could not send cancel request: %s",
+ 												errbuf)));
+ 							PQfreeCancel(cancel);
+ 						}
+ 						break;
+ 					}
+ 
  					/* If we're aborting, abort all remote transactions too */
  					res = PQexec(entry->conn, "ABORT TRANSACTION");
  					/* Note: can't throw ERROR, it would be infinite loop */
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 1421,1427 **** postgresReScanForeignScan(ForeignScanState *node)
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQexec(fsstate->conn, sql);
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
  		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
  	PQclear(res);
--- 1421,1427 ----
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = pgfdw_exec_query(fsstate->conn, sql);
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
  		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
  	PQclear(res);
***************
*** 1749,1766 **** postgresExecForeignInsert(EState *estate,
  	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
  
  	/*
! 	 * Execute the prepared statement, and check for success.
  	 *
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQexecPrepared(fmstate->conn,
! 						 fmstate->p_name,
! 						 fmstate->p_nums,
! 						 p_values,
! 						 NULL,
! 						 NULL,
! 						 0);
  	if (PQresultStatus(res) !=
  		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
--- 1749,1772 ----
  	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
  
  	/*
! 	 * Execute the prepared statement
! 	 */
! 	if (!PQsendQueryPrepared(fmstate->conn,
! 							 fmstate->p_name,
! 							 fmstate->p_nums,
! 							 p_values,
! 							 NULL,
! 							 NULL,
! 							 0))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
! 
! 	/*
! 	 * Get the result, and check for success
  	 *
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
  	if (PQresultStatus(res) !=
  		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
***************
*** 1778,1783 **** postgresExecForeignInsert(EState *estate,
--- 1784,1794 ----
  	/* And clean up */
  	PQclear(res);
  
+ 	/* Verify that there are no more results */
+ 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ 	if (res != NULL)
+ 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ 
  	MemoryContextReset(fmstate->temp_cxt);
  
  	/* Return NULL if nothing was inserted on the remote end */
***************
*** 1819,1836 **** postgresExecForeignUpdate(EState *estate,
  										slot);
  
  	/*
! 	 * Execute the prepared statement, and check for success.
  	 *
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQexecPrepared(fmstate->conn,
! 						 fmstate->p_name,
! 						 fmstate->p_nums,
! 						 p_values,
! 						 NULL,
! 						 NULL,
! 						 0);
  	if (PQresultStatus(res) !=
  		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
--- 1830,1853 ----
  										slot);
  
  	/*
! 	 * Execute the prepared statement
! 	 */
! 	if (!PQsendQueryPrepared(fmstate->conn,
! 							 fmstate->p_name,
! 							 fmstate->p_nums,
! 							 p_values,
! 							 NULL,
! 							 NULL,
! 							 0))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
! 
! 	/*
! 	 * Get the result, and check for success
  	 *
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
  	if (PQresultStatus(res) !=
  		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
***************
*** 1848,1853 **** postgresExecForeignUpdate(EState *estate,
--- 1865,1875 ----
  	/* And clean up */
  	PQclear(res);
  
+ 	/* Verify that there are no more results */
+ 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ 	if (res != NULL)
+ 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ 
  	MemoryContextReset(fmstate->temp_cxt);
  
  	/* Return NULL if nothing was updated on the remote end */
***************
*** 1889,1906 **** postgresExecForeignDelete(EState *estate,
  										NULL);
  
  	/*
! 	 * Execute the prepared statement, and check for success.
  	 *
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQexecPrepared(fmstate->conn,
! 						 fmstate->p_name,
! 						 fmstate->p_nums,
! 						 p_values,
! 						 NULL,
! 						 NULL,
! 						 0);
  	if (PQresultStatus(res) !=
  		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
--- 1911,1934 ----
  										NULL);
  
  	/*
! 	 * Execute the prepared statement
! 	 */
! 	if (!PQsendQueryPrepared(fmstate->conn,
! 							 fmstate->p_name,
! 							 fmstate->p_nums,
! 							 p_values,
! 							 NULL,
! 							 NULL,
! 							 0))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
! 
! 	/*
! 	 * Get the result, and check for success
  	 *
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
  	if (PQresultStatus(res) !=
  		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
***************
*** 1918,1923 **** postgresExecForeignDelete(EState *estate,
--- 1946,1956 ----
  	/* And clean up */
  	PQclear(res);
  
+ 	/* Verify that there are no more results */
+ 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ 	if (res != NULL)
+ 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ 
  	MemoryContextReset(fmstate->temp_cxt);
  
  	/* Return NULL if nothing was deleted on the remote end */
***************
*** 1950,1956 **** postgresEndForeignModify(EState *estate,
  		 * We don't use a PG_TRY block here, so be careful not to throw error
  		 * without releasing the PGresult.
  		 */
! 		res = PQexec(fmstate->conn, sql);
  		if (PQresultStatus(res) != PGRES_COMMAND_OK)
  			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
  		PQclear(res);
--- 1983,1989 ----
  		 * We don't use a PG_TRY block here, so be careful not to throw error
  		 * without releasing the PGresult.
  		 */
! 		res = pgfdw_exec_query(fmstate->conn, sql);
  		if (PQresultStatus(res) != PGRES_COMMAND_OK)
  			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
  		PQclear(res);
***************
*** 2336,2344 **** postgresEndDirectModify(ForeignScanState *node)
  	if (dmstate == NULL)
  		return;
  
! 	/* Release PGresult */
  	if (dmstate->result)
  		PQclear(dmstate->result);
  
  	/* Release remote connection */
  	ReleaseConnection(dmstate->conn);
--- 2369,2388 ----
  	if (dmstate == NULL)
  		return;
  
! 	/*
! 	 * Verify that there are no more results if necessary
! 	 *
! 	 * We don't use a PG_TRY block here, so be careful not to throw error
! 	 * without releasing the PGresult.
! 	 */
  	if (dmstate->result)
+ 	{
  		PQclear(dmstate->result);
+ 		dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+ 		if (dmstate->result != NULL)
+ 			pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+ 							   dmstate->query);
+ 	}
  
  	/* Release remote connection */
  	ReleaseConnection(dmstate->conn);
***************
*** 2712,2718 **** get_remote_estimate(const char *sql, PGconn *conn,
  		/*
  		 * Execute EXPLAIN remotely.
  		 */
! 		res = PQexec(conn, sql);
  		if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			pgfdw_report_error(ERROR, res, conn, false, sql);
  
--- 2756,2762 ----
  		/*
  		 * Execute EXPLAIN remotely.
  		 */
! 		res = pgfdw_exec_query(conn, sql);
  		if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			pgfdw_report_error(ERROR, res, conn, false, sql);
  
***************
*** 2817,2832 **** create_cursor(ForeignScanState *node)
  	 * parameter (see deparse.c), the "inference" is trivial and will produce
  	 * the desired result.  This allows us to avoid assuming that the remote
  	 * server has the same OIDs we do for the parameters' types.
  	 *
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQexecParams(conn, buf.data, numParams, NULL, values,
! 					   NULL, NULL, 0);
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
  		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
  	PQclear(res);
  
  	/* Mark the cursor as created, and show no tuples have been retrieved */
  	fsstate->cursor_exists = true;
  	fsstate->tuples = NULL;
--- 2861,2887 ----
  	 * parameter (see deparse.c), the "inference" is trivial and will produce
  	 * the desired result.  This allows us to avoid assuming that the remote
  	 * server has the same OIDs we do for the parameters' types.
+ 	 */
+ 	if (!PQsendQueryParams(conn, buf.data, numParams,
+ 						   NULL, values, NULL, NULL, 0))
+ 		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+ 
+ 	/*
+ 	 * Get the result, and check for success
  	 *
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = pgfdw_get_result(conn, buf.data);
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
  		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
  	PQclear(res);
  
+ 	/* Verify that there are no more results */
+ 	res = pgfdw_get_result(conn, buf.data);
+ 	if (res != NULL)
+ 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
+ 
  	/* Mark the cursor as created, and show no tuples have been retrieved */
  	fsstate->cursor_exists = true;
  	fsstate->tuples = NULL;
***************
*** 2868,2874 **** fetch_more_data(ForeignScanState *node)
  		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
  				 fsstate->fetch_size, fsstate->cursor_number);
  
! 		res = PQexec(conn, sql);
  		/* On error, report the original query, not the FETCH. */
  		if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
--- 2923,2929 ----
  		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
  				 fsstate->fetch_size, fsstate->cursor_number);
  
! 		res = pgfdw_exec_query(conn, sql);
  		/* On error, report the original query, not the FETCH. */
  		if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
***************
*** 2978,2984 **** close_cursor(PGconn *conn, unsigned int cursor_number)
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQexec(conn, sql);
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
  		pgfdw_report_error(ERROR, res, conn, true, sql);
  	PQclear(res);
--- 3033,3039 ----
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = pgfdw_exec_query(conn, sql);
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
  		pgfdw_report_error(ERROR, res, conn, true, sql);
  	PQclear(res);
***************
*** 3010,3025 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQprepare(fmstate->conn,
! 					p_name,
! 					fmstate->query,
! 					0,
! 					NULL);
  
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
  		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
  	fmstate->p_name = p_name;
  }
--- 3065,3093 ----
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	if (!PQsendPrepare(fmstate->conn,
! 					   p_name,
! 					   fmstate->query,
! 					   0,
! 					   NULL))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
  
+ 	/*
+ 	 * Get the result, and check for success
+ 	 *
+ 	 * We don't use a PG_TRY block here, so be careful not to throw error
+ 	 * without releasing the PGresult.
+ 	 */
+ 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
  		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
  	PQclear(res);
  
+ 	/* Verify that there are no more results */
+ 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ 	if (res != NULL)
+ 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ 
  	/* This action shows that the prepare has been done. */
  	fmstate->p_name = p_name;
  }
***************
*** 3147,3158 **** execute_dml_stmt(ForeignScanState *node)
  	 * parameter (see deparse.c), the "inference" is trivial and will produce
  	 * the desired result.  This allows us to avoid assuming that the remote
  	 * server has the same OIDs we do for the parameters' types.
  	 *
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
! 								   numParams, NULL, values, NULL, NULL, 0);
  	if (PQresultStatus(dmstate->result) !=
  		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
--- 3215,3232 ----
  	 * parameter (see deparse.c), the "inference" is trivial and will produce
  	 * the desired result.  This allows us to avoid assuming that the remote
  	 * server has the same OIDs we do for the parameters' types.
+ 	 */
+ 	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+ 						   NULL, values, NULL, NULL, 0))
+ 		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+ 
+ 	/*
+ 	 * Get the result, and check for success.
  	 *
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
  	if (PQresultStatus(dmstate->result) !=
  		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
***************
*** 3355,3361 **** postgresAnalyzeForeignTable(Relation relation,
  	/* In what follows, do not risk leaking any PGresults. */
  	PG_TRY();
  	{
! 		res = PQexec(conn, sql.data);
  		if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			pgfdw_report_error(ERROR, res, conn, false, sql.data);
  
--- 3429,3435 ----
  	/* In what follows, do not risk leaking any PGresults. */
  	PG_TRY();
  	{
! 		res = pgfdw_exec_query(conn, sql.data);
  		if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			pgfdw_report_error(ERROR, res, conn, false, sql.data);
  
***************
*** 3449,3455 **** postgresAcquireSampleRowsFunc(Relation relation, int elevel,
  	/* In what follows, do not risk leaking any PGresults. */
  	PG_TRY();
  	{
! 		res = PQexec(conn, sql.data);
  		if (PQresultStatus(res) != PGRES_COMMAND_OK)
  			pgfdw_report_error(ERROR, res, conn, false, sql.data);
  		PQclear(res);
--- 3523,3529 ----
  	/* In what follows, do not risk leaking any PGresults. */
  	PG_TRY();
  	{
! 		res = pgfdw_exec_query(conn, sql.data);
  		if (PQresultStatus(res) != PGRES_COMMAND_OK)
  			pgfdw_report_error(ERROR, res, conn, false, sql.data);
  		PQclear(res);
***************
*** 3500,3506 **** postgresAcquireSampleRowsFunc(Relation relation, int elevel,
  			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
  					 fetch_size, cursor_number);
  
! 			res = PQexec(conn, fetch_sql);
  			/* On error, report the original query, not the FETCH. */
  			if (PQresultStatus(res) != PGRES_TUPLES_OK)
  				pgfdw_report_error(ERROR, res, conn, false, sql.data);
--- 3574,3580 ----
  			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
  					 fetch_size, cursor_number);
  
! 			res = pgfdw_exec_query(conn, fetch_sql);
  			/* On error, report the original query, not the FETCH. */
  			if (PQresultStatus(res) != PGRES_TUPLES_OK)
  				pgfdw_report_error(ERROR, res, conn, false, sql.data);
***************
*** 3675,3681 **** postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
  		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
  		deparseStringLiteral(&buf, stmt->remote_schema);
  
! 		res = PQexec(conn, buf.data);
  		if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			pgfdw_report_error(ERROR, res, conn, false, buf.data);
  
--- 3749,3755 ----
  		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
  		deparseStringLiteral(&buf, stmt->remote_schema);
  
! 		res = pgfdw_exec_query(conn, buf.data);
  		if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			pgfdw_report_error(ERROR, res, conn, false, buf.data);
  
***************
*** 3774,3780 **** postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
  		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
  
  		/* Fetch the data */
! 		res = PQexec(conn, buf.data);
  		if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			pgfdw_report_error(ERROR, res, conn, false, buf.data);
  
--- 3848,3854 ----
  		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
  
  		/* Fetch the data */
! 		res = pgfdw_exec_query(conn, buf.data);
  		if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			pgfdw_report_error(ERROR, res, conn, false, buf.data);
  
*** a/contrib/postgres_fdw/postgres_fdw.h
--- b/contrib/postgres_fdw/postgres_fdw.h
***************
*** 103,108 **** extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
--- 103,110 ----
  extern void ReleaseConnection(PGconn *conn);
  extern unsigned int GetCursorNumber(PGconn *conn);
  extern unsigned int GetPrepStmtNumber(PGconn *conn);
+ extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+ extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
  extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
  				   bool clear, const char *sql);
  
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to