On Thu, Oct 13, 2011 at 10:08 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Wed, Oct 12, 2011 at 10:29 PM, Robert Haas <robertmh...@gmail.com> wrote:
>> On Wed, Oct 12, 2011 at 5:45 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
>>> In 9.2dev and 9.1, when walreceiver detects an error while sending data to
>>> WAL stream, it always emits ERROR even if there are data available in the
>>> receive buffer. This might lead to loss of transactions because such
>>> remaining data are not received by walreceiver :(
>>
>> Won't it just reconnect?
>
> Yes, if the master is running normally. OTOH, if the master is not running
> (i.e., failover case), the standby cannot receive again the data which it
> failed to
> receive.
>
> I found this issue when I shut down the master. When the master shuts down,
> it sends the shutdown checkpoint record, but I found that the standby failed
> to receive it.

Patch attached.

The patch changes walreceiver so that it does not emit ERROR immediately when
it fails to send data to the WAL stream. Instead, it emits ERROR only after all
available data have been received and flushed to disk.

If the patch is OK, it should be backported to v9.1.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 49,55 **** static char *recvBuf = NULL;
  static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
! static void libpqrcv_send(const char *buffer, int nbytes);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
--- 49,55 ----
  static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
! static bool libpqrcv_send(const char *buffer, int nbytes);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
***************
*** 404,417 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
  /*
   * Send a message to XLOG stream.
   *
!  * ereports on error.
   */
! static void
  libpqrcv_send(const char *buffer, int nbytes)
  {
  	if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
  		PQflush(streamConn))
! 		ereport(ERROR,
! 				(errmsg("could not send data to WAL stream: %s",
! 						PQerrorMessage(streamConn))));
  }
--- 404,428 ----
  /*
   * Send a message to XLOG stream.
   *
!  * Return true if successful, false otherwise.
   */
! static bool
  libpqrcv_send(const char *buffer, int nbytes)
  {
+ 	/*
+ 	 * Even if we could not send data to the WAL stream, we don't emit ERROR
+ 	 * just yet, because there might still be data available in the receive
+ 	 * buffer. walreceiver emits ERROR only after all available data have been
+ 	 * received and flushed to disk; otherwise, such outstanding data would be lost.
+ 	 *
+ 	 * XXX: Should the result of PQerrorMessage() be returned so that the
+ 	 * caller can report it? This doesn't seem worthwhile because in most cases,
+ 	 * before reporting that, we will get another error and emit ERROR while
+ 	 * trying to process all available data.
+ 	 */
  	if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
  		PQflush(streamConn))
! 		return false;
! 
! 	return true;
  }
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 96,101 **** static struct
--- 96,104 ----
  static StandbyReplyMessage reply_message;
  static StandbyHSFeedbackMessage feedback_message;
  
+ /* Did a previous attempt to send data to the WAL stream fail? */
+ static bool walrcv_send_error = false;
+ 
  /*
   * About SIGTERM handling:
   *
***************
*** 328,333 **** WalReceiverMain(void)
--- 331,345 ----
  		else
  		{
  			/*
+ 			 * If we didn't receive anything new after a failed attempt to send
+ 			 * data to the WAL stream, we can guarantee that there is no data left
+ 			 * in the receive buffer, so we finally emit the deferred ERROR.
+ 			 */
+ 			if (walrcv_send_error)
+ 				ereport(ERROR,
+ 						(errmsg("could not send data to WAL stream")));
+ 
+ 			/*
  			 * We didn't receive anything new, but send a status update to the
  			 * master anyway, to report any progress in applying WAL.
  			 */
***************
*** 627,632 **** XLogWalRcvSendReply(void)
--- 639,651 ----
  									   wal_receiver_status_interval * 1000))
  		return;
  
+ 	/*
+ 	 * If a previous attempt to send data to the WAL stream has failed, there
+ 	 * is nothing to do here because this attempt would fail again.
+ 	 */
+ 	if (walrcv_send_error)
+ 		return;
+ 
  	/* Construct a new message */
  	reply_message.write = LogstreamResult.Write;
  	reply_message.flush = LogstreamResult.Flush;
***************
*** 641,647 **** XLogWalRcvSendReply(void)
  	/* Prepend with the message type and send it. */
  	buf[0] = 'r';
  	memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
! 	walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
  }
  
  /*
--- 660,666 ----
  	/* Prepend with the message type and send it. */
  	buf[0] = 'r';
  	memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
! 	walrcv_send_error = !walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
  }
  
  /*
***************
*** 682,687 **** XLogWalRcvSendHSFeedback(void)
--- 701,714 ----
  		return;
  
  	/*
+ 	 * If a previous attempt to send data to the WAL stream has failed, there
+ 	 * is nothing to do here because this attempt would fail again. Check this
+ 	 * after the interval has expired to reduce the number of calls.
+ 	 */
+ 	if (walrcv_send_error)
+ 		return;
+ 
+ 	/*
  	 * Make the expensive call to get the oldest xmin once we are certain
  	 * everything else has been checked.
  	 */
***************
*** 709,713 **** XLogWalRcvSendHSFeedback(void)
  	/* Prepend with the message type and send it. */
  	buf[0] = 'h';
  	memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
! 	walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
  }
--- 736,740 ----
  	/* Prepend with the message type and send it. */
  	buf[0] = 'h';
  	memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
! 	walrcv_send_error = !walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
  }
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 96,102 **** typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
! typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
  extern PGDLLIMPORT walrcv_send_type walrcv_send;
  
  typedef void (*walrcv_disconnect_type) (void);
--- 96,102 ----
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
! typedef bool (*walrcv_send_type) (const char *buffer, int nbytes);
  extern PGDLLIMPORT walrcv_send_type walrcv_send;
  
  typedef void (*walrcv_disconnect_type) (void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to