On 2016/04/19 14:25, Etsuro Fujita wrote:
On 2016/04/19 13:59, Michael Paquier wrote:
But I strongly object to that, because it makes the current logic that
postgres_fdw is based on more fragile: PQexec returns the last result to
the caller, and I'd rather not break that logic for the sake of 9.6 stability.

IIUC, I think each query submitted by PQexec in postgres_fdw.c contains
just a single command.  Maybe I'm missing something, though.

An even better proof of that is the following, which just emulates what
your version of pgfdw_get_result is doing when consuming the results.
+   /* Verify that there are no more results */
+   res = pgfdw_get_result(fmstate->conn, fmstate->query);
+   if (res != NULL)
+       pgfdw_report_error(ERROR, res, fmstate->conn, true,
fmstate->query);
This could even lead to incorrect errors in the future if multiple
queries are combined with those DMLs for one reason or another.

I'd like to leave such enhancements for future work...

On reflection, I agree with Michael on that point. As he mentioned, we should not lose future extensibility. And I don't think his version of pgfdw_get_result would damage the robustness of the functions in postgres_fdw.c that use it, which was my concern. Sorry for the noise.

Attached is an updated version of the patch, which modifies Michael's version of the patch as I proposed in [1] (see "Other changes:"). I also modified the comments for pgfdw_get_result/pgfdw_exec_query, mainly because wording like "non-blocking mode" there seems confusing (note that we have PQsetnonblocking).

Best regards,
Etsuro Fujita

[1] http://www.postgresql.org/message-id/5715b08a.2030...@lab.ntt.co.jp
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 189f290..16ef38f 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -17,6 +17,7 @@
 #include "access/xact.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "storage/latch.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
@@ -448,6 +449,78 @@ GetPrepStmtNumber(PGconn *conn)
 }
 
 /*
+ * Submit a query and wait for the result.
+ *
+ * This function is interruptible by signals.
+ *
+ * Caller is responsible for the error handling on the result.
+ */
+PGresult *
+pgfdw_exec_query(PGconn *conn, const char *query)
+{
+	/*
+	 * Submit a query.  Since we don't use non-blocking mode, this also can
+	 * block.  But its risk is relatively small, so we ignore that for now.
+	 */
+	if (!PQsendQuery(conn, query))
+		pgfdw_report_error(ERROR, NULL, conn, false, query);
+
+	/* Wait for the result. */
+	return pgfdw_get_result(conn, query);
+}
+
+/*
+ * Wait for the result from a prior asynchronous execution function call.
+ *
+ * This function offers quick responsiveness by checking for any interruptions.
+ *
+ * This function emulates the PQexec()'s behavior of returning the last result
+ * when there are many.
+ *
+ * Caller is responsible for the error handling on the result.
+ */
+PGresult *
+pgfdw_get_result(PGconn *conn, const char *query)
+{
+	PGresult   *last_res = NULL;
+
+	for (;;)
+	{
+		PGresult *res;
+
+		while (PQisBusy(conn))
+		{
+			int		wc;
+
+			/* Sleep until there's something to do */
+			wc = WaitLatchOrSocket(MyLatch,
+								   WL_LATCH_SET | WL_SOCKET_READABLE,
+								   PQsocket(conn),
+								   -1L);
+			ResetLatch(MyLatch);
+
+			CHECK_FOR_INTERRUPTS();
+
+			/* Data available in socket */
+			if (wc & WL_SOCKET_READABLE)
+			{
+				if (!PQconsumeInput(conn))
+					pgfdw_report_error(ERROR, NULL, conn, false, query);
+			}
+		}
+
+		res = PQgetResult(conn);
+		if (res == NULL)
+			break;				/* query is complete */
+
+		PQclear(last_res);
+		last_res = res;
+	}
+
+	return last_res;
+}
+
+/*
  * Report an error we got from the remote server.
  *
  * elevel: error level to use (typically ERROR, but might be less)
@@ -598,6 +671,32 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 				case XACT_EVENT_ABORT:
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
+
+					/*
+					 * If a command has been submitted to the remote server by
+					 * using an asynchronous execution function, the command
+					 * might not have yet completed.  Check to see if a command
+					 * is still being processed by the remote server, and if so,
+					 * request cancellation of the command; if not, abort
+					 * gracefully.
+					 */
+					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+					{
+						PGcancel   *cancel;
+						char		errbuf[256];
+
+						if ((cancel = PQgetCancel(entry->conn)))
+						{
+							if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+								ereport(WARNING,
+										(errcode(ERRCODE_CONNECTION_FAILURE),
+										 errmsg("could not send cancel request: %s",
+												errbuf)));
+							PQfreeCancel(cancel);
+						}
+						break;
+					}
+
 					/* If we're aborting, abort all remote transactions too */
 					res = PQexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 28093e5..2f49268 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = pgfdw_exec_query(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1749,18 +1749,24 @@ postgresExecForeignInsert(EState *estate,
 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
 
 	/*
-	 * Execute the prepared statement, and check for success.
+	 * Execute the prepared statement.
+	 */
+	if (!PQsendQueryPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+	/*
+	 * Get the result, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1819,18 +1825,24 @@ postgresExecForeignUpdate(EState *estate,
 										slot);
 
 	/*
-	 * Execute the prepared statement, and check for success.
+	 * Execute the prepared statement.
+	 */
+	if (!PQsendQueryPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+	/*
+	 * Get the result, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1889,18 +1901,24 @@ postgresExecForeignDelete(EState *estate,
 										NULL);
 
 	/*
-	 * Execute the prepared statement, and check for success.
+	 * Execute the prepared statement.
+	 */
+	if (!PQsendQueryPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+	/*
+	 * Get the result, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1950,7 +1968,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = pgfdw_exec_query(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -2712,7 +2730,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = pgfdw_exec_query(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -2817,12 +2835,18 @@ create_cursor(ForeignScanState *node)
 	 * parameter (see deparse.c), the "inference" is trivial and will produce
 	 * the desired result.  This allows us to avoid assuming that the remote
 	 * server has the same OIDs we do for the parameters' types.
+	 */
+	if (!PQsendQueryParams(conn, buf.data, numParams,
+						   NULL, values, NULL, NULL, 0))
+		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+	/*
+	 * Get the result, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	res = pgfdw_get_result(conn, buf.data);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2868,7 +2892,7 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fsstate->fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		res = pgfdw_exec_query(conn, sql);
 		/* On error, report the original query, not the FETCH. */
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -2978,7 +3002,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = pgfdw_exec_query(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -3006,16 +3030,21 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * with the remote server using different type OIDs than we do.  All of
 	 * the prepared statements we use in this module are simple enough that
 	 * the remote server will make the right choices.
+	 */
+	if (!PQsendPrepare(fmstate->conn,
+					   p_name,
+					   fmstate->query,
+					   0,
+					   NULL))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+	/*
+	 * Get the result, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQprepare(fmstate->conn,
-					p_name,
-					fmstate->query,
-					0,
-					NULL);
-
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 	PQclear(res);
@@ -3147,12 +3176,18 @@ execute_dml_stmt(ForeignScanState *node)
 	 * parameter (see deparse.c), the "inference" is trivial and will produce
 	 * the desired result.  This allows us to avoid assuming that the remote
 	 * server has the same OIDs we do for the parameters' types.
+	 */
+	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+						   NULL, values, NULL, NULL, 0))
+		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+
+	/*
+	 * Get the result, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
-								   numParams, NULL, values, NULL, NULL, 0);
+	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
@@ -3355,7 +3390,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = pgfdw_exec_query(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -3449,7 +3484,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = pgfdw_exec_query(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -3500,7 +3535,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = pgfdw_exec_query(conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -3675,7 +3710,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = pgfdw_exec_query(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -3774,7 +3809,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = pgfdw_exec_query(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 3a11d99..574b07d 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -103,6 +103,8 @@ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
+extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 				   bool clear, const char *sql);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to