Hi,

http://archives.postgresql.org/pgsql-hackers/2010-01/msg01672.php
On win32, blocking libpq functions such as PQconnectdb() and
PQexec() are uninterruptible, because they use the vanilla select()
instead of our select() that is compatible with the signal
emulation layer. Nevertheless, walreceiver currently uses them to
establish a connection, send a handshake message, and wait for the
reply, so walreceiver is also uninterruptible during that time.
This is a must-fix problem for 9.0.

To make walreceiver interruptible, I replaced the blocking libpq
functions it currently uses with their asynchronous counterparts,
and it now waits using the emulated version of select(). Here is
the patch.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 18,23 ****
--- 18,24 ----
  
  #include <unistd.h>
  #include <sys/time.h>
+ #include <time.h>
  
  #include "libpq-fe.h"
  #include "access/xlog.h"
***************
*** 53,59 **** static bool libpqrcv_receive(int timeout, unsigned char *type,
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
! static bool libpq_select(int timeout_ms);
  
  /*
   * Module load callback
--- 54,61 ----
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
! static int libpq_select(bool forRead, bool forWrite, int timeout_ms);
! static PGresult *libpqrcv_PQexec(const char *query);
  
  /*
   * Module load callback
***************
*** 83,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	TimeLineID	standby_tli;
  	PGresult   *res;
  	char		cmd[64];
  
  	/* Connect */
  	snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
  
! 	streamConn = PQconnectdb(conninfo_repl);
! 	if (PQstatus(streamConn) != CONNECTION_OK)
  		ereport(ERROR,
  				(errmsg("could not connect to the primary server : %s",
  						PQerrorMessage(streamConn))));
  
  	/*
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = PQexec(streamConn, "IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
--- 85,198 ----
  	TimeLineID	standby_tli;
  	PGresult   *res;
  	char		cmd[64];
+ 	PQconninfoOption   *options;
+ 	time_t		finish_time = ((time_t) -1);
+ 
+ 	/*
+ 	 * Extract timeout from the connection string
+ 	 */
+ 	options = PQconninfoParse(conninfo, NULL);
+ 	if (options)
+ 	{
+ 		PQconninfoOption *option;
+ 		for (option = options; option->keyword != NULL; option++)
+ 		{
+ 			if (strcmp(option->keyword, "connect_timeout") == 0)
+ 			{
+ 				if (option->val != NULL && option->val[0] != '\0')
+ 				{
+ 					int		timeout = atoi(option->val);
+ 
+ 					if (timeout > 0)
+ 					{
+ 						/*
+ 						 * Rounding could cause connection to fail;
+ 						 * need at least 2 secs
+ 						 */
+ 						if (timeout < 2)
+ 							timeout = 2;
+ 						/* calculate the finish time based on start + timeout */
+ 						finish_time = time(NULL) + timeout;
+ 					}
+ 				}
+ 			}
+ 		}
+ 		PQconninfoFree(options);
+ 	}
  
  	/* Connect */
  	snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
  
! 	streamConn = PQconnectStart(conninfo_repl);
! 	if (PQstatus(streamConn) == CONNECTION_BAD)
  		ereport(ERROR,
  				(errmsg("could not connect to the primary server : %s",
  						PQerrorMessage(streamConn))));
  
  	/*
+ 	 * Wait for connection to be established
+ 	 */
+ 	for (;;)
+ 	{
+ 		PostgresPollingStatusType	status;
+ 		bool		established = false;
+ 		bool		forRead = false;
+ 		bool		forWrite = false;
+ 		int		timeout_ms;
+ 		int		ret;
+ 
+ 		status = PQconnectPoll(streamConn);
+ 		switch (status)
+ 		{
+ 			case PGRES_POLLING_READING:
+ 				forRead = true;
+ 				break;
+ 			case PGRES_POLLING_WRITING:
+ 				forWrite = true;
+ 				break;
+ 			case PGRES_POLLING_OK:
+ 				established = true;
+ 				break;
+ 			case PGRES_POLLING_FAILED:
+ 			default:
+ 				ereport(ERROR,
+ 						(errmsg("could not connect to the primary server : %s",
+ 								PQerrorMessage(streamConn))));
+ 		}
+ 
+ 		if (established)
+ 			break;
+ 
+ 	retry:
+ 		/* Compute appropriate timeout interval */
+ 		if (finish_time == ((time_t) -1))
+ 			timeout_ms = -1;
+ 		else
+ 		{
+ 			time_t		now = time(NULL);
+ 
+ 			if (finish_time > now)
+ 				timeout_ms = (finish_time - now) * 1000;
+ 			else
+ 				timeout_ms = 0;
+ 		}
+ 
+ 		/*
+ 		 * Wait until we can read or write the connection socket
+ 		 */
+ 		ret = libpq_select(forRead, forWrite, timeout_ms);
+ 		if (ret == 0)	/* timeout */
+ 			ereport(ERROR,
+ 					(errmsg("could not connect to the primary server : timeout expired")));
+ 		if (ret < 0) /* interrupted */
+ 			goto retry;
+ 	}
+ 
+ 	/*
  	 * Get the system identifier and timeline ID as a DataRow message from the
  	 * primary server.
  	 */
! 	res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
  	if (PQresultStatus(res) != PGRES_TUPLES_OK)
  	{
  		PQclear(res);
***************
*** 149,159 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = PQexec(streamConn, cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
  		ereport(ERROR,
  				(errmsg("could not start XLOG streaming: %s",
  						PQerrorMessage(streamConn))));
  	PQclear(res);
  
  	justconnected = true;
--- 244,257 ----
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
! 	res = libpqrcv_PQexec(cmd);
  	if (PQresultStatus(res) != PGRES_COPY_OUT)
+ 	{
+ 		PQclear(res);
  		ereport(ERROR,
  				(errmsg("could not start XLOG streaming: %s",
  						PQerrorMessage(streamConn))));
+ 	}
  	PQclear(res);
  
  	justconnected = true;
***************
*** 162,176 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  }
  
  /*
!  * Wait until we can read WAL stream, or timeout.
   *
!  * Returns true if data has become available for reading, false if timed out
!  * or interrupted by signal.
   *
   * This is based on pqSocketCheck.
   */
! static bool
! libpq_select(int timeout_ms)
  {
  	int			ret;
  
--- 260,275 ----
  }
  
  /*
!  * Wait until we can read or write the connection socket
   *
!  * Returns >0 if the socket is ready to be read, written,
!  * or both, 0 if timed out, -1 if interrupted by signal.
!  * Throws an error if an error occurred.
   *
   * This is based on pqSocketCheck.
   */
! static int
! libpq_select(bool forRead, bool forWrite, int timeout_ms)
  {
  	int			ret;
  
***************
*** 180,203 **** libpq_select(int timeout_ms)
  				(errcode_for_socket_access(),
  				 errmsg("socket not open")));
  
  	/* We use poll(2) if available, otherwise select(2) */
  	{
  #ifdef HAVE_POLL
  		struct pollfd input_fd;
  
  		input_fd.fd = PQsocket(streamConn);
! 		input_fd.events = POLLIN | POLLERR;
  		input_fd.revents = 0;
  
  		ret = poll(&input_fd, 1, timeout_ms);
  #else							/* !HAVE_POLL */
  
  		fd_set		input_mask;
  		struct timeval timeout;
  		struct timeval *ptr_timeout;
  
  		FD_ZERO(&input_mask);
! 		FD_SET		(PQsocket(streamConn), &input_mask);
  
  		if (timeout_ms < 0)
  			ptr_timeout = NULL;
--- 279,314 ----
  				(errcode_for_socket_access(),
  				 errmsg("socket not open")));
  
+ 	Assert(forRead || forWrite);
+ 
  	/* We use poll(2) if available, otherwise select(2) */
  	{
  #ifdef HAVE_POLL
  		struct pollfd input_fd;
  
  		input_fd.fd = PQsocket(streamConn);
! 		input_fd.events = POLLERR;
  		input_fd.revents = 0;
  
+ 		if (forRead)
+ 			input_fd.events |= POLLIN;
+ 		if (forWrite)
+ 			input_fd.events |= POLLOUT;
+ 
  		ret = poll(&input_fd, 1, timeout_ms);
  #else							/* !HAVE_POLL */
  
  		fd_set		input_mask;
+ 		fd_set		output_mask;
  		struct timeval timeout;
  		struct timeval *ptr_timeout;
  
  		FD_ZERO(&input_mask);
! 		FD_ZERO(&output_mask);
! 		if (forRead)
! 			FD_SET		(PQsocket(streamConn), &input_mask);
! 		if (forWrite)
! 			FD_SET		(PQsocket(streamConn), &output_mask);
  
  		if (timeout_ms < 0)
  			ptr_timeout = NULL;
***************
*** 209,225 **** libpq_select(int timeout_ms)
  		}
  
  		ret = select(PQsocket(streamConn) + 1, &input_mask,
! 					 NULL, NULL, ptr_timeout);
  #endif   /* HAVE_POLL */
  	}
  
! 	if (ret == 0 || (ret < 0 && errno == EINTR))
! 		return false;
  	if (ret < 0)
  		ereport(ERROR,
  				(errcode_for_socket_access(),
  				 errmsg("select() failed: %m")));
! 	return true;
  }
  
  /*
--- 320,397 ----
  		}
  
  		ret = select(PQsocket(streamConn) + 1, &input_mask,
! 					 &output_mask, NULL, ptr_timeout);
  #endif   /* HAVE_POLL */
  	}
  
! 	if (ret == 0)	/* timeout */
! 		return 0;
! 	if (ret < 0 && errno == EINTR)	/* interrupted */
! 		return -1;
  	if (ret < 0)
  		ereport(ERROR,
  				(errcode_for_socket_access(),
  				 errmsg("select() failed: %m")));
! 	return ret;
! }
! 
! /*
!  * Send a query and wait for the result by using the asynchronous libpq
!  * functions and the backend version of select().
!  *
!  * On Windows, walreceiver must use this function instead of the blocking
!  * libpq functions like PQexec() since they use the vanilla select() and
!  * are uninterruptible by our emulated signals. On the other hand, this
!  * function is interruptible since it uses the signal emulation layer
!  * compatible select().
!  */
! static PGresult *
! libpqrcv_PQexec(const char *query)
! {
! 	PGresult   *res = NULL;
! 
! 	/*
! 	 * Submit a query. Since we don't use non-blocking mode, this can
! 	 * also block, but the risk is relatively small, so we ignore it
! 	 * for now.
! 	 */
! 	if (!PQsendQuery(streamConn, query))
! 		return NULL;
! 
! 	for (;;)
! 	{
! 		PGresult   *next;
! 
! 		/*
! 		 * Receive data until PQgetResult is ready to return the
! 		 * result without blocking.
! 		 */
! 		while (PQisBusy(streamConn))
! 		{
! 			if (libpq_select(true, false, -1) < 0)
! 				continue;			/* interrupted */
! 			if (PQconsumeInput(streamConn) == 0)
! 				return NULL;		/* trouble */
! 		}
! 
! 		/*
! 		 * Don't emulate PQexec()'s behavior of returning the last
! 		 * result when there are several, since walreceiver never sends
! 		 * a query returning multiple results.
! 		 */
! 		if ((next = PQgetResult(streamConn)) == NULL)
! 			break;				/* query is complete */
! 		if (PQresultStatus(next) == PGRES_FATAL_ERROR)
! 			return next;
! 		PQclear(res);
! 		res = next;
! 		if (PQresultStatus(res) == PGRES_COPY_IN ||
! 			PQresultStatus(res) == PGRES_COPY_OUT ||
! 			PQstatus(streamConn) == CONNECTION_BAD)
! 			break;
! 	}
! 
! 	return res;
  }
  
  /*
***************
*** 268,274 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
  	 */
  	if (timeout > 0 && !justconnected)
  	{
! 		if (!libpq_select(timeout))
  			return false;
  
  		if (PQconsumeInput(streamConn) == 0)
--- 440,446 ----
  	 */
  	if (timeout > 0 && !justconnected)
  	{
! 		if (libpq_select(true, false, timeout) <= 0)
  			return false;
  
  		if (PQconsumeInput(streamConn) == 0)
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to