On Thu, Apr 14, 2016 at 10:44 AM, Etsuro Fujita
<fujita.ets...@lab.ntt.co.jp> wrote:
> On 2016/04/13 21:50, Michael Paquier wrote:
>> On Wed, Apr 13, 2016 at 9:46 PM, Robert Haas <robertmh...@gmail.com>
>> wrote:
>>> On Tue, Apr 12, 2016 at 10:24 PM, Etsuro Fujita
>>> <fujita.ets...@lab.ntt.co.jp> wrote:
>>>>>
>>>>> How about we encapsulate the while (PQisBusy(...)) loop into a new
>>>>> function pgfdw_get_result(), which can be called after first calling
>>>>> PQsendQueryParams()?  So then this code will say dmstate->result =
>>>>> pgfdw_get_result(dmstate->conn).  And we can do something similar for
>>>>> the other call to PQexecParams() in create_cursor().  Then let's also
>>>>> add something like pgfdw_exec_query() which calls PQsendQuery() and
>>>>> then pgfdw_get_result, and use that to replace all of the existing
>>>>> calls to PQexec().
>>>>>
>>>>> Then all the SQL postgres_fdw executes would be interruptible, not
>>>>> just the new stuff.
>
> I would be happy if you work on that.

OK, so I have finished with the attached. One thing that I noticed in
the previous patch version is that it completely ignored cases where
multiple PGresults could be returned by the server, say when multiple
queries are sent in the same string: PQexec always returns the last
one, so I think that we had better do the same here. I have switched
all the PQexec calls to a custom routine that combines
PQsendQuery/PQgetResult, and PQexecParams is switched to
PQsendQueryParams/PQgetResult. This structure allows all queries run
through postgres_fdw.c to be interruptible.
-- 
Michael
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 189f290..64a00b4 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -17,6 +17,7 @@
 #include "access/xact.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "storage/latch.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
@@ -448,6 +449,67 @@ GetPrepStmtNumber(PGconn *conn)
 }
 
 /*
+ * Execute query in non-blocking mode and fetch back result
+ */
+PGresult *
+pgfdw_exec_query(PGconn *conn, const char *query)
+{
+	/* Hand the query to libpq; a zero return means it could not be sent. */
+	if (PQsendQuery(conn, query) == 0)
+		pgfdw_report_error(ERROR, NULL, conn, false, query);
+	return pgfdw_get_result(conn, query);
+}
+
+/*
+ * Wait for and return the final result of a query already sent on conn.
+ *
+ * Sleeps on MyLatch and the connection socket while libpq is busy, and
+ * checks for interrupts after every wakeup.  Only the last PGresult is
+ * returned; earlier ones are cleared here.  Returns NULL if no result was
+ * received.  Caller must check the result's status and PQclear it.
+ */
+PGresult *
+pgfdw_get_result(PGconn *conn, const char *query)
+{
+	PGresult   *last_res = NULL;
+
+	for (;;)
+	{
+		PGresult   *res;
+
+		while (PQisBusy(conn))
+		{
+			int			wc;
+
+			/* Sleep until there's something to do */
+			wc = WaitLatchOrSocket(MyLatch,
+								   WL_LATCH_SET | WL_SOCKET_READABLE,
+								   PQsocket(conn),
+								   -1L);
+			ResetLatch(MyLatch);
+
+			CHECK_FOR_INTERRUPTS();
+
+			/* Data available in socket */
+			if (wc & WL_SOCKET_READABLE)
+			{
+				if (!PQconsumeInput(conn))
+					pgfdw_report_error(ERROR, NULL, conn, false, query);
+			}
+		}
+
+		res = PQgetResult(conn);
+		if (res == NULL)
+			break;				/* query is complete */
+
+		PQclear(last_res);		/* superseded; PQclear(NULL) is a no-op */
+		last_res = res;
+	}
+
+	return last_res;
+}
+
+/*
  * Report an error we got from the remote server.
  *
  * elevel: error level to use (typically ERROR, but might be less)
@@ -598,6 +660,32 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 				case XACT_EVENT_ABORT:
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
+
+					/*
+					 * If a command has been submitted to the remote server
+					 * using an asynchronous execution function, the command
+					 * might not have yet completed.  Check to see if a command
+					 * is still being processed by the remote server, and if so,
+					 * request cancellation of the command; if not, abort
+					 * gracefully.
+					 */
+					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+					{
+						PGcancel   *cancel;
+						char		errbuf[256];
+
+						if ((cancel = PQgetCancel(entry->conn)))
+						{
+							if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+								ereport(WARNING,
+										(errcode(ERRCODE_CONNECTION_FAILURE),
+										 errmsg("could not send cancel request: %s",
+												errbuf)));
+							PQfreeCancel(cancel);
+						}
+						break;
+					}
+
 					/* If we're aborting, abort all remote transactions too */
 					res = PQexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index ee0220a..31f5bb2 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = pgfdw_exec_query(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1950,7 +1950,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = pgfdw_exec_query(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -2712,7 +2712,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = pgfdw_exec_query(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -2821,8 +2821,11 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	if (!PQsendQueryParams(conn, buf.data, numParams,
+						   NULL, values, NULL, NULL, 0))
+		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+	res = pgfdw_get_result(conn, buf.data);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2868,7 +2871,7 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fsstate->fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		res = pgfdw_exec_query(conn, sql);
 		/* On error, report the original query, not the FETCH. */
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -2978,7 +2981,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = pgfdw_exec_query(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -3151,8 +3154,12 @@ execute_dml_stmt(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
-								   numParams, NULL, values, NULL, NULL, 0);
+	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+						   NULL, values, NULL, NULL, 0))
+		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+
+	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
@@ -3355,7 +3362,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = pgfdw_exec_query(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -3449,7 +3456,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = pgfdw_exec_query(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -3500,7 +3507,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = pgfdw_exec_query(conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -3675,7 +3682,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = pgfdw_exec_query(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -3774,7 +3781,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = pgfdw_exec_query(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 3a11d99..574b07d 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -103,6 +103,8 @@ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
+extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 				   bool clear, const char *sql);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to