On 2016/04/08 13:42, Noah Misch wrote:
On Tue, Apr 05, 2016 at 03:22:03PM +0900, Etsuro Fujita wrote:
On 2016/04/04 20:35, Michael Paquier wrote:
On Mon, Apr 4, 2016 at 7:49 PM, Etsuro Fujita
<fujita.ets...@lab.ntt.co.jp> wrote:
Here is a patch to fix this issue.  As proposed by Michael, I modified
execute_dml_stmt so that it uses PQsendQueryParams, not PQexecParams. Any
comments are welcome.

+  * This is based on pqSocketCheck.
+  */
+ bool
+ CheckSocket(PGconn *conn)
+ {
+     int            ret;
+
+     Assert(conn != NULL);
Instead of copying pqSocketCheck yet again (it is already copied in
libpqwalreceiver.c), wouldn't it be better to use WaitLatchOrSocket
with the socket returned by PQsocket?

Will check.  Thanks for the comment!

What do you think?  This open item's seven-day deadline has passed.  It would
help keep things moving to know whether you consider your latest patch optimal
or whether you wish to change it the way Michael described.

I wish to change it that way.  It not only avoids the duplication but also
fixes a bug in the previous patch that I had overlooked: there is a race
condition if a signal arrives just before entering CheckSocket.
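
To illustrate the race outside of PostgreSQL, here is a minimal sketch of the
general self-pipe technique, assuming a POSIX system.  It is not the latch
code and not part of the patch; wake_init, wait_signal_or_socket and the two
result flags are hypothetical names made up for this example.  The point is
that the signal handler leaves a byte in a pipe, so a signal delivered just
before the wait starts still makes the wait return at once, which a bare
wait on the socket alone cannot guarantee.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical result flags for this sketch (not PostgreSQL's WL_* flags) */
#define WOKEN_BY_SIGNAL 1
#define SOCKET_READABLE 2

static int	wake_pipe[2];		/* [0] = read end, [1] = write end */

/* Signal handler: record the wakeup as a byte in the pipe. */
static void
on_signal(int signo)
{
	int			saved_errno = errno;
	char		byte = 0;

	(void) signo;
	if (write(wake_pipe[1], &byte, 1) < 0)
	{
		/* Pipe full: a wakeup is already pending, nothing more to do. */
	}
	errno = saved_errno;
}

/* Create the non-blocking pipe and install the handler; 0 on success. */
static int
wake_init(int signo)
{
	struct sigaction sa;
	int			i;

	if (pipe(wake_pipe) < 0)
		return -1;
	for (i = 0; i < 2; i++)
		if (fcntl(wake_pipe[i], F_SETFL, O_NONBLOCK) < 0)
			return -1;
	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = on_signal;
	sigemptyset(&sa.sa_mask);
	return sigaction(signo, &sa, NULL);
}

/*
 * Wait until sock is readable or a signal has been recorded since the last
 * call.  Returns a bitmask of the flags above, 0 on timeout, -1 on error.
 * The timeout restarts if poll() is interrupted; fine for a sketch.
 */
static int
wait_signal_or_socket(int sock, int timeout_ms)
{
	struct pollfd fds[2];
	char		buf[16];
	int			result = 0;
	int			rc;

	assert(sock >= 0);
	fds[0].fd = wake_pipe[0];
	fds[0].events = POLLIN;
	fds[1].fd = sock;
	fds[1].events = POLLIN;
	do
	{
		fds[0].revents = fds[1].revents = 0;
		rc = poll(fds, 2, timeout_ms);
	} while (rc < 0 && errno == EINTR);
	if (rc < 0)
		return -1;
	if (fds[0].revents & POLLIN)
	{
		/* Drain the pipe so the next wait blocks again. */
		while (read(wake_pipe[0], buf, sizeof(buf)) > 0)
			;
		result |= WOKEN_BY_SIGNAL;
	}
	if (fds[1].revents & (POLLIN | POLLHUP | POLLERR))
		result |= SOCKET_READABLE;
	return result;
}
```

A caller would loop on wait_signal_or_socket, handle any pending interrupt
when WOKEN_BY_SIGNAL is set, and read from the socket when SOCKET_READABLE is
set, which is the same shape as the WaitLatchOrSocket loop in the patch.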

Attached is an updated version of the patch.

Sorry for the delay.

Best regards,
Etsuro Fujita
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 598,603 **** pgfdw_xact_callback(XactEvent event, void *arg)
--- 598,623 ----
  				case XACT_EVENT_ABORT:
  					/* Assume we might have lost track of prepared statements */
  					entry->have_error = true;
+ 					/*
+ 					 * If we had submitted a command to the remote server using
+ 					 * an asynchronous execution function, the command might
+ 					 * have not yet completed.  Check to see if a command is
+ 					 * still being processed by the remote server, and if so,
+ 					 * request cancellation of the command; if not, abort
+ 					 * gracefully.
+ 					 */
+ 					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ 					{
+ 						PGcancel   *cancel;
+ 						char		errbuf[256];
+ 
+ 						if ((cancel = PQgetCancel(entry->conn)))
+ 						{
+ 							PQcancel(cancel, errbuf, sizeof(errbuf));
+ 							PQfreeCancel(cancel);
+ 						}
+ 						break;
+ 					}
  					/* If we're aborting, abort all remote transactions too */
  					res = PQexec(entry->conn, "ABORT TRANSACTION");
  					/* Note: can't throw ERROR, it would be infinite loop */
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 32,37 ****
--- 32,38 ----
  #include "optimizer/var.h"
  #include "optimizer/tlist.h"
  #include "parser/parsetree.h"
+ #include "storage/latch.h"
  #include "utils/builtins.h"
  #include "utils/guc.h"
  #include "utils/lsyscache.h"
***************
*** 3131,3136 **** execute_dml_stmt(ForeignScanState *node)
--- 3132,3138 ----
  	ExprContext *econtext = node->ss.ps.ps_ExprContext;
  	int			numParams = dmstate->numParams;
  	const char **values = dmstate->param_values;
+ 	int			wc;
  
  	/*
  	 * Construct array of query parameter values in text format.
***************
*** 3151,3158 **** execute_dml_stmt(ForeignScanState *node)
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
! 								   numParams, NULL, values, NULL, NULL, 0);
  	if (PQresultStatus(dmstate->result) !=
  		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
--- 3153,3188 ----
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
! 						   NULL, values, NULL, NULL, 0))
! 		pgfdw_report_error(ERROR, NULL, dmstate->conn, true, dmstate->query);
! 
! 	/*
! 	 * Receive data until PQgetResult is ready to get the result without
! 	 * blocking.
! 	 */
! 	while (PQisBusy(dmstate->conn))
! 	{
! 		/* Sleep until there's something to do */
! 		wc = WaitLatchOrSocket(MyLatch,
! 							   WL_LATCH_SET | WL_SOCKET_READABLE,
! 							   PQsocket(dmstate->conn),
! 							   -1L);
! 		ResetLatch(MyLatch);
! 
! 		CHECK_FOR_INTERRUPTS();
! 
! 		/* Data available in socket */
! 		if (wc & WL_SOCKET_READABLE)
! 		{
! 			if (!PQconsumeInput(dmstate->conn))
! 				pgfdw_report_error(ERROR, NULL, dmstate->conn, true,
! 								   dmstate->query);
! 		}
! 	}
! 
! 	/* Get the result */
! 	dmstate->result = PQgetResult(dmstate->conn);
  	if (PQresultStatus(dmstate->result) !=
  		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
