Heikki Linnakangas wrote:
> Magnus Hagander wrote:
>> 2010/1/17 Heikki Linnakangas <heikki.linnakan...@enterprisedb.com>:
>>> We could replace the blocking PQexec() calls with PQsendQuery(), and use
>>>  the emulated version of select() to wait.
>> Hmm. That would at least theoretically work, but aren't there still
>> places we may end up blocking further down? Or are those ok?
> 
> There's also PQconnect that needs similar treatment (using
> PQconnectStart/Poll()), but that's it.

So here's a patch implementing that for contrib/dblink. Walreceiver
needs the same treatment.

The implementation should be shared between the two, but I'm not sure
how. We can't just put the wrapper functions to a module in
src/backend/port/, because the wrapper functions depend on libpq. Maybe
put them in a new header file as static functions, and include that in
contrib/dblink/dblink.c and src/backend/replication/libpqwalreceiver.c.

-- 
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 2c1d7a2..fa11709 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -34,6 +34,14 @@
 
 #include <limits.h>
 
+#ifdef WIN23
+/* These are needed by the interruptible libpq function replacements */
+#include <time.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#endif
+
 #include "libpq-fe.h"
 #include "fmgr.h"
 #include "funcapi.h"
@@ -101,6 +109,193 @@ static void dblink_res_error(const char *conname, PGresult *res, const char *dbl
 static char *get_connect_string(const char *servername);
 static char *escape_param_str(const char *from);
 
+#ifdef WIN23
+/*
+ * Replacement functions for blocking libpq functions, for Windows.
+ *
+ * On Windows, the vanilla select() function doesn't react to our emulated
+ * signals. PQexec() and PQconnectdb() use select(), so they're
+ * uninterruptible. These replacement functions use the corresponding
+ * asynchronous libpq functions and backend version of select() to implement
+ * the same functionality, but in a way that's interrupted by signals.
+ *
+ * These work on other platforms as well, but presumably it's more efficient
+ * to let libpq block.
+ */
+
+static PGresult *
+dblink_PQexec(PGconn *conn, const char *command)
+{
+	int			sock;
+	PGresult   *result,
+			   *lastResult;
+
+	/* Send query. This can block too, but we ignore that for now. */
+	if (PQsendQuery(conn, command) == 0)
+		return NULL;
+
+	/* Wait for response */
+	sock = PQsocket(conn);
+
+	while(PQisBusy(conn))
+	{
+		fd_set		input_mask;
+
+		FD_ZERO(&input_mask);
+
+		FD_SET		(sock, &input_mask);
+
+		/*
+		 * Note that we don't check the return code. We assume that
+		 * PQconsumeInput() will get the same error, and set the result
+		 * as failed.
+		 */
+		select(sock + 1, &input_mask, NULL, NULL, NULL);
+		PQconsumeInput(conn);
+	}
+
+	/*
+	 * Emulate PQexec()'s behavior of returning the *last* result, if
+	 * there's many. dblink doesn't normally issue statements that return
+	 * multiple results, but the user-supplied SQL statement passed to
+	 * dblink() might. You'll only get the last result back, so it's not a
+	 * very sensible thing to do, but we must still handle it gracefully.
+	 *
+	 * We don't try to concatenate error messages like PQexec() does.
+	 * Doesn't seem worth the effort.
+	 */
+	lastResult = NULL;
+	while((result = PQgetResult(conn)) != NULL)
+	{
+		if (lastResult != NULL)
+		{
+			if (PQresultStatus(lastResult) != PGRES_COMMAND_OK &&
+				PQresultStatus(lastResult) != PGRES_TUPLES_OK)
+			{
+				PQclear(result);
+				result = lastResult;
+			}
+			else
+				PQclear(lastResult);
+		}
+		lastResult = result;
+	}
+
+	return lastResult;
+}
+
+static PGconn *
+dblink_PQconnectdb(const char *conninfo)
+{
+	PGconn *conn;
+	PostgresPollingStatusType status;
+	PQconninfoOption *options;
+	int timeout_secs = 0;
+	time_t end_time;
+	int sock;
+
+	conn = PQconnectStart(conninfo);
+	if (conn == NULL)
+		return NULL;
+
+	if (PQstatus(conn) == CONNECTION_BAD)
+		return conn;
+
+	/* Extract timeout from the connection string */
+	options = PQconninfoParse(conninfo, NULL);
+	if (options)
+	{
+		PQconninfoOption *option;
+		for (option = options; option->keyword != NULL; option++)
+		{
+			if (strcmp(option->keyword, "connect_timeout") == 0)
+			{
+				if (option->val != NULL && option->val[0] != '\0')
+				{
+					timeout_secs = atoi(option->val);
+					break;
+				}
+			}
+		}
+		PQconninfoFree(options);
+	}
+	if (timeout_secs > 0)
+		end_time = time(NULL) + timeout_secs;
+
+	sock = PQsocket(conn);
+
+	/* Wait for connection to be established */
+	for (;;)
+	{
+		fd_set	input_mask;
+		fd_set	output_mask;
+		time_t	now;
+		struct timeval timeout;
+		struct timeval *timeout_ptr;
+
+		FD_ZERO(&input_mask);
+		FD_ZERO(&output_mask);
+
+		status = PQconnectPoll(conn);
+		switch(status)
+		{
+			case PGRES_POLLING_OK:
+			case PGRES_POLLING_FAILED:
+				return conn;
+
+			case PGRES_POLLING_READING:
+				FD_SET(sock, &input_mask);
+				break;
+
+			case PGRES_POLLING_WRITING:
+				FD_SET(sock, &output_mask);
+				break;
+
+			default:
+				elog(ERROR, "unknown PQconnectPoll() return value: %d", status);
+		}
+
+		if (timeout_secs > 0)
+		{
+			now = time(NULL);
+			timeout.tv_sec = (now > end_time) ? 0 : (end_time - now);
+			timeout.tv_usec = 0;
+			timeout_ptr = &timeout;
+		}
+		else
+			timeout_ptr = NULL;
+
+		/*
+		 * Note that we don't check an error code. We assume that
+		 * PQconnectPoll() will get the same error, and return failure.
+		 */
+		if (select(sock + 1, &input_mask, &output_mask, NULL, timeout_ptr) == 0)
+		{
+			/* Timeout */
+			PQfinish(conn);
+
+			/*
+			 * This message is subtly different from the one from the message
+			 * you get on other platforms, where PQconnectdb() handles the
+			 * timeout. The "timeout expired" message here gets translated
+			 * using the backend .po file, while the message emitted by
+			 * PQconnectdb() is translated using libpq .po file. I hope that
+			 * makes no difference in practice.
+			 */
+			ereport(ERROR,
+					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+					 errmsg("could not establish connection"),
+					 errdetail("timeout expired")));
+		}
+	}
+	return NULL; /* not reached, keep compiler quiet */
+}
+
+#define PQexec(conn, command) dblink_PQexec(conn, command)
+#define PQconnectdb(conninfo) dblink_PQconnectdb(conninfo)
+
+#endif
+
 /* Global */
 static remoteConn *pconn = NULL;
 static HTAB *remoteConnHash = NULL;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to