On Fri, Apr 8, 2016 at 6:28 PM, Robert Haas <robertmh...@gmail.com> wrote:

> On Fri, Apr 8, 2016 at 3:05 AM, Etsuro Fujita
> <fujita.ets...@lab.ntt.co.jp> wrote:
> >> What do you think?  This open item's seven-day deadline has passed.  It
> >> would
> >> help keep things moving to know whether you consider your latest patch
> >> optimal
> >> or whether you wish to change it the way Michael described.
> >
> > I wish to change it that way because it not only avoids the duplicate but
> > fixes a bug in the previous patch that I overlooked that there is a race
> > condition if a signal arrives just before entering the CheckSocket.
> >
> > Attached is an updated version of the patch.
>
> The comment just before the second hunk in the patch says:
>
>        * We don't use a PG_TRY block here, so be careful not to throw error
>        * without releasing the PGresult.
>
> But the patch adds a whole bunch of new things there that seem like
> they can error out, like CHECK_FOR_INTERRUPTS(), for example.  Isn't
> that a problem?
>

Basically, we fetch the PGresult only after the newly added hunk, so an
error thrown inside that hunk has no PGresult to leak. There should not be
any problem.

But yes, the comment is definitely in the wrong place.

Please find attached the patch with the correction.



>
>
> --
> Robert Haas
> EnterpriseDB: http://www.enterprisedb.com
> The Enterprise PostgreSQL Company
>



-- 
Rushabh Lathia
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 189f290..8820597 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -598,6 +598,26 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 				case XACT_EVENT_ABORT:
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
+					/*
+					 * If we had submitted a command to the remote server using
+					 * an asynchronous execution function, the command might
+					 * have not yet completed.  Check to see if a command is
+					 * still being processed by the remote server, and if so,
+					 * request cancellation of the command; if not, abort
+					 * gracefully.
+					 */
+					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+					{
+						PGcancel   *cancel;
+						char		errbuf[256];
+
+						if ((cancel = PQgetCancel(entry->conn)))
+						{
+							PQcancel(cancel, errbuf, sizeof(errbuf));
+							PQfreeCancel(cancel);
+						}
+						break;
+					}
 					/* If we're aborting, abort all remote transactions too */
 					res = PQexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index ee0220a..2b6a61b 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -32,6 +32,7 @@
 #include "optimizer/var.h"
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
+#include "storage/latch.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
@@ -3131,6 +3132,7 @@ execute_dml_stmt(ForeignScanState *node)
 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
 	int			numParams = dmstate->numParams;
 	const char **values = dmstate->param_values;
+	int			wc;
 
 	/*
 	 * Construct array of query parameter values in text format.
@@ -3147,12 +3149,42 @@ execute_dml_stmt(ForeignScanState *node)
 	 * parameter (see deparse.c), the "inference" is trivial and will produce
 	 * the desired result.  This allows us to avoid assuming that the remote
 	 * server has the same OIDs we do for the parameters' types.
+	 */
+	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+						   NULL, values, NULL, NULL, 0))
+		pgfdw_report_error(ERROR, NULL, dmstate->conn, true, dmstate->query);
+
+	/*
+	 * Receive data until PQgetResult is ready to get the result without
+	 * blocking.
+	 */
+	while (PQisBusy(dmstate->conn))
+	{
+		/* Sleep until there's something to do */
+		wc = WaitLatchOrSocket(MyLatch,
+							   WL_LATCH_SET | WL_SOCKET_READABLE,
+							   PQsocket(dmstate->conn),
+							   -1L);
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Data available in socket */
+		if (wc & WL_SOCKET_READABLE)
+		{
+			if (!PQconsumeInput(dmstate->conn))
+				pgfdw_report_error(ERROR, NULL, dmstate->conn, true,
+								   dmstate->query);
+		}
+	}
+
+	/*
+	 * Get the result
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
-								   numParams, NULL, values, NULL, NULL, 0);
+	dmstate->result = PQgetResult(dmstate->conn);
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
