On 2016/03/31 16:38, Etsuro Fujita wrote:
On 2016/03/31 14:07, Noah Misch wrote:
On Thu, Mar 24, 2016 at 01:02:57PM +0900, Etsuro Fujita wrote:
On 2016/03/24 11:14, Michael Paquier wrote:
On Wed, Mar 23, 2016 at 10:05 PM, Thom Brown <t...@linux.com> wrote:
I've noticed that you now can't cancel a query if there's DML pushdown
to a foreign server.  This previously worked while it was sending
individual statements as it interrupted and rolled it back.

Here's what the local server sees when trying to cancel:

# DELETE FROM remote.contacts;
^CCancel request sent
DELETE 5000000

This should probably be fixed.

Looking at what has been committed, execute_dml_stmt is using
PQexecParams, so we'd want to use an asynchronous call and loop on
PQgetResult with CHECK_FOR_INTERRUPTS() in it.

Will fix.

[This is a generic notification.]

Sorry for not having taken any action.  I've been busy with another task
lately, but I started working on this.  I plan to post a patch early
next week.

Here is a patch to fix this issue. As proposed by Michael, I modified execute_dml_stmt so that it uses PQsendQueryParams, not PQexecParams. Any comments are welcome.

Best regards,
Etsuro Fujita
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 12,17 ****
--- 12,20 ----
   */
  #include "postgres.h"
  
+ #include <unistd.h>
+ #include <sys/time.h>
+ 
  #include "postgres_fdw.h"
  
  #include "access/xact.h"
***************
*** 20,25 ****
--- 23,38 ----
  #include "utils/hsearch.h"
  #include "utils/memutils.h"
  
+ #ifdef HAVE_POLL_H
+ #include <poll.h>
+ #endif
+ #ifdef HAVE_SYS_POLL_H
+ #include <sys/poll.h>
+ #endif
+ #ifdef HAVE_SYS_SELECT_H
+ #include <sys/select.h>
+ #endif
+ 
  
  /*
   * Connection cache hash table entry
***************
*** 417,422 **** ReleaseConnection(PGconn *conn)
--- 430,484 ----
  }
  
  /*
+  * Wait until we can read data.
+  *
+  * Returns true if data has become available for reading, false if interrupted
+  * by signal.
+  *
+  * This is based on pqSocketCheck.
+  */
+ bool
+ CheckSocket(PGconn *conn)
+ {
+ 	int			ret;
+ 
+ 	Assert(conn != NULL);
+ 	if (PQsocket(conn) < 0)
+ 		ereport(ERROR,
+ 				(errcode_for_socket_access(),
+ 				 errmsg("invalid socket: %s", PQerrorMessage(conn))));
+ 
+ 	/* We use poll(2) if available, otherwise select(2) */
+ 	{
+ #ifdef HAVE_POLL
+ 		struct pollfd input_fd;
+ 
+ 		input_fd.fd = PQsocket(conn);
+ 		input_fd.events = POLLIN | POLLERR;
+ 		input_fd.revents = 0;
+ 
+ 		ret = poll(&input_fd, 1, -1);
+ #else							/* !HAVE_POLL */
+ 		fd_set		input_mask;
+ 
+ 		FD_ZERO(&input_mask);
+ 		FD_SET(PQsocket(conn), &input_mask);
+ 
+ 		ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, NULL);
+ #endif   /* HAVE_POLL */
+ 	}
+ 
+ 	Assert(ret != 0);
+ 	if (ret < 0 && errno == EINTR)
+ 		return false;
+ 	if (ret < 0)
+ 		ereport(ERROR,
+ 				(errcode_for_socket_access(),
+ 				 errmsg("select() failed: %s", strerror(errno))));
+ 	return true;
+ }
+ 
+ /*
   * Assign a "unique" number for a cursor.
   *
   * These really only need to be unique per connection within a transaction.
***************
*** 598,603 **** pgfdw_xact_callback(XactEvent event, void *arg)
--- 660,684 ----
  				case XACT_EVENT_ABORT:
  					/* Assume we might have lost track of prepared statements */
  					entry->have_error = true;
+ 					/*
+ 					 * If we executed a query asynchronously, the query might
+ 					 * have not yet completed.  Check to see if a command is
+ 					 * still being processed by the remote server, and if so,
+ 					 * request cancellation of the command; if not, abort
+ 					 * gracefully.
+ 					 */
+ 					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ 					{
+ 						PGcancel   *cancel;
+ 						char		errbuf[256];
+ 
+ 						if ((cancel = PQgetCancel(entry->conn)))
+ 						{
+ 							PQcancel(cancel, errbuf, sizeof(errbuf));
+ 							PQfreeCancel(cancel);
+ 						}
+ 						break;
+ 					}
  					/* If we're aborting, abort all remote transactions too */
  					res = PQexec(entry->conn, "ABORT TRANSACTION");
  					/* Note: can't throw ERROR, it would be infinite loop */
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 3151,3158 **** execute_dml_stmt(ForeignScanState *node)
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
! 								   numParams, NULL, values, NULL, NULL, 0);
  	if (PQresultStatus(dmstate->result) !=
  		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
--- 3151,3181 ----
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
! 						   NULL, values, NULL, NULL, 0))
! 		pgfdw_report_error(ERROR, NULL, dmstate->conn, true, dmstate->query);
! 
! 	/*
! 	 * Receive data until PQgetResult is ready to get the result without
! 	 * blocking.
! 	 */
! 	while (PQisBusy(dmstate->conn))
! 	{
! 		/* Wait until data is available. */
! 		if (!CheckSocket(dmstate->conn))
! 		{
! 			/* We are interrupted by signal. */
! 			CHECK_FOR_INTERRUPTS();
! 			continue;		
! 		}
! 		/* Data is available. */
! 		if (!PQconsumeInput(dmstate->conn))
! 			pgfdw_report_error(ERROR, NULL, dmstate->conn, true,
! 							   dmstate->query);
! 	}
! 
! 	/* Get the result */
! 	dmstate->result = PQgetResult(dmstate->conn);
  	if (PQresultStatus(dmstate->result) !=
  		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
  		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
*** a/contrib/postgres_fdw/postgres_fdw.h
--- b/contrib/postgres_fdw/postgres_fdw.h
***************
*** 101,106 **** extern void reset_transmission_modes(int nestlevel);
--- 101,107 ----
  /* in connection.c */
  extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
  extern void ReleaseConnection(PGconn *conn);
+ extern bool CheckSocket(PGconn *conn);
  extern unsigned int GetCursorNumber(PGconn *conn);
  extern unsigned int GetPrepStmtNumber(PGconn *conn);
  extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to