On 2016/03/31 16:38, Etsuro Fujita wrote:
On 2016/03/31 14:07, Noah Misch wrote:
On Thu, Mar 24, 2016 at 01:02:57PM +0900, Etsuro Fujita wrote:
On 2016/03/24 11:14, Michael Paquier wrote:
On Wed, Mar 23, 2016 at 10:05 PM, Thom Brown <t...@linux.com> wrote:
I've noticed that you can no longer cancel a query when DML is pushed down
to a foreign server. Cancellation previously worked when individual
statements were being sent, because the command was interrupted and rolled back.
Here's what the local server sees when trying to cancel:
# DELETE FROM remote.contacts;
^CCancel request sent
DELETE 5000000
This should probably be fixed.
Looking at what has been committed, execute_dml_stmt is using
PQexecParams, so we'd want to use an asynchronous call and loop on
PQgetResult with CHECK_FOR_INTERRUPTS() in it.
Will fix.
[This is a generic notification.]
Sorry for not having taken any action. I've been busy with another task
lately, but I have now started working on this. I plan to post a patch early
next week.
Here is a patch to fix this issue. As proposed by Michael, I modified
execute_dml_stmt so that it uses PQsendQueryParams, not PQexecParams.
Any comments are welcome.
Best regards,
Etsuro Fujita
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 12,17 ****
--- 12,20 ----
*/
#include "postgres.h"
+ #include <unistd.h>
+ #include <sys/time.h>
+
#include "postgres_fdw.h"
#include "access/xact.h"
***************
*** 20,25 ****
--- 23,38 ----
#include "utils/hsearch.h"
#include "utils/memutils.h"
+ #ifdef HAVE_POLL_H
+ #include <poll.h>
+ #endif
+ #ifdef HAVE_SYS_POLL_H
+ #include <sys/poll.h>
+ #endif
+ #ifdef HAVE_SYS_SELECT_H
+ #include <sys/select.h>
+ #endif
+
/*
* Connection cache hash table entry
***************
*** 417,422 **** ReleaseConnection(PGconn *conn)
--- 430,484 ----
}
/*
+ * Wait until we can read data.
+ *
+ * Returns true if data has become available for reading, false if interrupted
+ * by a signal.
+ *
+ * This is based on pqSocketCheck.
+ */
+ bool
+ CheckSocket(PGconn *conn)
+ {
+ int ret;
+
+ Assert(conn != NULL);
+ if (PQsocket(conn) < 0)
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("invalid socket: %s", PQerrorMessage(conn))));
+
+ /* We use poll(2) if available, otherwise select(2) */
+ {
+ #ifdef HAVE_POLL
+ struct pollfd input_fd;
+
+ input_fd.fd = PQsocket(conn);
+ input_fd.events = POLLIN | POLLERR;
+ input_fd.revents = 0;
+
+ ret = poll(&input_fd, 1, -1);
+ #else /* !HAVE_POLL */
+ fd_set input_mask;
+
+ FD_ZERO(&input_mask);
+ FD_SET(PQsocket(conn), &input_mask);
+
+ ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, NULL);
+ #endif /* HAVE_POLL */
+ }
+
+ Assert(ret != 0);
+ if (ret < 0 && errno == EINTR)
+ return false;
+ if (ret < 0)
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("select() failed: %s", strerror(errno))));
+ return true;
+ }
+
+ /*
* Assign a "unique" number for a cursor.
*
* These really only need to be unique per connection within a transaction.
***************
*** 598,603 **** pgfdw_xact_callback(XactEvent event, void *arg)
--- 660,684 ----
case XACT_EVENT_ABORT:
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
+ /*
+ * If we executed a query asynchronously, the query might
+ * not have completed yet. Check to see if a command is
+ * still being processed by the remote server, and if so,
+ * request cancellation of the command; if not, abort
+ * gracefully.
+ */
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ {
+ PGcancel *cancel;
+ char errbuf[256];
+
+ if ((cancel = PQgetCancel(entry->conn)))
+ {
+ PQcancel(cancel, errbuf, sizeof(errbuf));
+ PQfreeCancel(cancel);
+ }
+ break;
+ }
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 3151,3158 **** execute_dml_stmt(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
! dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
! numParams, NULL, values, NULL, NULL, 0);
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
--- 3151,3181 ----
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
! if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
! NULL, values, NULL, NULL, 0))
! pgfdw_report_error(ERROR, NULL, dmstate->conn, true, dmstate->query);
!
! /*
! * Receive data until PQgetResult is ready to get the result without
! * blocking.
! */
! while (PQisBusy(dmstate->conn))
! {
! /* Wait until data is available. */
! if (!CheckSocket(dmstate->conn))
! {
! /* We were interrupted by a signal. */
! CHECK_FOR_INTERRUPTS();
! continue;
! }
! /* Data is available. */
! if (!PQconsumeInput(dmstate->conn))
! pgfdw_report_error(ERROR, NULL, dmstate->conn, true,
! dmstate->query);
! }
!
! /* Get the result */
! dmstate->result = PQgetResult(dmstate->conn);
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
*** a/contrib/postgres_fdw/postgres_fdw.h
--- b/contrib/postgres_fdw/postgres_fdw.h
***************
*** 101,106 **** extern void reset_transmission_modes(int nestlevel);
--- 101,107 ----
/* in connection.c */
extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
extern void ReleaseConnection(PGconn *conn);
+ extern bool CheckSocket(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers