On 04.10.2012 13:12, Amit kapila wrote:
Following changes are done to support replication timeout in sender as well as 
receiver:

1. One new configuration parameter wal_receiver_timeout is added to detect 
timeout at receiver task.
2. Existing parameter replication_timeout is renamed to wal_sender_timeout.

Ok. The other option would be to have just one GUC, I'm open to bikeshedding on this one. On one hand, there's no reason the timeouts have to the same, so it would be nice to have separate settings, but on the other hand, I can't imagine a case where a single setting wouldn't work just as well.

3. Now PrimaryKeepaliveMessage structure is modified to add one more field to 
indicate whether keep-alive is of type 'r' (i.e.
     reply) or 'h' (i.e. heart-beat).
4. Now the keep-alive message from sender will be sent to standby if it was 
idle for more than or equal to half of wal_sender_timeout.
     In this case it will send keep-alive of type 'h'.
5. Once the standby receiver a keep-alive, it needs to send an immediate reply 
to primary to indicate connection is alive.
6. Now Reply message to send wal offset and Feedback message to send oldest 
transaction are merged into single Reply message.
     So now the structure StandbyReplyMessage is changed to add two more fields 
as xmin and epoch. Also StandbyHSFeedbackMessage
     structure is changed to remove xmin and epoch fields (as these are moved 
to StandbyReplyMessage).
7. Because of changes as in step-6, once receiver task receives some data from 
primary then it will only send Reply Message.

Oh I see. That's not what I meant by combining the keep-alive and hs feedback messages, I imagined that the hearbeats would *also* use the same message type. Ie. there would be only a single message type from standby to primary, used for:

1. updating the receive/apply pointer
2. HS feedback
3. for pinging the server when wal_receiver_timeout is approaching
4. to reply to to pings from the server.

Since we didn't quite achieve that, it seems best leave out this merging of reply and HS feedback message types, to keep the patch small. We might still want to do that, but better do that as a separate patch.

8. Same Reply message is sent in step-5 and step-7 but incase of step-5, then 
reply is sent immediately but incase of step-7, reply is sent
      if wal_receiver_status_interval has lapsed (this part is same as earlier).
9. Similar to sender, if receiver finds itself idle for more than or equal to 
half of configured wal_receiver_timeout, then it will send the
      hot-standby heartbeat. This heart-beat has been modified to send only 
sendTime.
10. Once sender task receiver heart-beat message from standby then it sends 
back the reply immediately. In this keep-alive message is
        sent of type 'r'.
11. If even after wal_sender_timeout no message received from standby then it 
will be considered as network break at sender task.
12. If even after wal_receiver_timeout no message received from primary then it 
will be considered as network break at receiver task.

Attached is an updated patch. I reverted the merging of message types and fixed a bunch of cosmetic issues. There was one bug: in the main loop of walreceiver, you send the "ping" message on every wakeup after enough time has passed since last reception. That means that if the server doesn't reply promptly, you send a new ping message every 100 ms (NAPTIME_PER_CYCLE), until it gets a reply. Walsender had the same issue, but it was not quite as sever there because the naptime was longer. Fixed that.

How does this look now?

- Heikki
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2236,2245 **** include 'filename'
         </listitem>
        </varlistentry>
  
!      <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
!       <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
        <indexterm>
!        <primary><varname>replication_timeout</> configuration parameter</primary>
        </indexterm>
        <listitem>
         <para>
--- 2236,2245 ----
         </listitem>
        </varlistentry>
  
!      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
!       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)</term>
        <indexterm>
!        <primary><varname>wal_sender_timeout</> configuration parameter</primary>
        </indexterm>
        <listitem>
         <para>
***************
*** 2251,2262 **** include 'filename'
          the <filename>postgresql.conf</> file or on the server command line.
          The default value is 60 seconds.
         </para>
-        <para>
-         To prevent connections from being terminated prematurely,
-         <xref linkend="guc-wal-receiver-status-interval">
-         must be enabled on the standby, and its value must be less than the
-         value of <varname>replication_timeout</>.
-        </para>
        </listitem>
       </varlistentry>
  
--- 2251,2256 ----
***************
*** 2474,2484 **** include 'filename'
         the <filename>postgresql.conf</> file or on the server command line.
         The default value is 10 seconds.
        </para>
-       <para>
-        When <xref linkend="guc-replication-timeout"> is enabled on a sending server,
-        <varname>wal_receiver_status_interval</> must be enabled, and its value
-        must be less than the value of <varname>replication_timeout</>.
-       </para>
        </listitem>
       </varlistentry>
  
--- 2468,2473 ----
***************
*** 2507,2512 **** include 'filename'
--- 2496,2520 ----
        </listitem>
       </varlistentry>
  
+      <varlistentry id="guc-wal-receiver-timeout" xreflabel="wal_receiver_timeout">
+       <term><varname>wal_receiver_timeout</varname> (<type>integer</type>)</term>
+       <indexterm>
+        <primary><varname>wal_receiver_timeout</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         Terminate replication connections that are inactive longer
+         than the specified number of milliseconds. This is useful for
+         the receiving standby server to detect a primary node crash or network
+         outage.
+         A value of zero disables the timeout mechanism.  This parameter
+         can only be set in
+         the <filename>postgresql.conf</> file or on the server command line.
+         The default value is 60 seconds.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
       </variablelist>
      </sect2>
     </sect1>
*** a/doc/src/sgml/release-9.1.sgml
--- b/doc/src/sgml/release-9.1.sgml
***************
*** 3322,3328 ****
       <listitem>
        <para>
         Add
!        <link linkend="guc-replication-timeout"><varname>replication_timeout</></link>
         setting (Fujii Masao, Heikki Linnakangas)
        </para>
  
--- 3322,3328 ----
       <listitem>
        <para>
         Add
!        <varname>replication_timeout</>
         setting (Fujii Masao, Heikki Linnakangas)
        </para>
  
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 55,60 ****
--- 55,61 ----
  
  /* GUC variables */
  int			wal_receiver_status_interval;
+ int			wal_receiver_timeout;
  bool		hot_standby_feedback;
  
  /* libpqreceiver hooks to these when loaded */
***************
*** 121,127 **** static void WalRcvDie(int code, Datum arg);
  static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
  static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
  static void XLogWalRcvFlush(bool dying);
! static void XLogWalRcvSendReply(void);
  static void XLogWalRcvSendHSFeedback(void);
  static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
  
--- 122,128 ----
  static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
  static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
  static void XLogWalRcvFlush(bool dying);
! static void XLogWalRcvSendReply(bool force, bool requestReply);
  static void XLogWalRcvSendHSFeedback(void);
  static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
  
***************
*** 170,178 **** WalReceiverMain(void)
  {
  	char		conninfo[MAXCONNINFO];
  	XLogRecPtr	startpoint;
- 
  	/* use volatile pointer to prevent code rearrangement */
  	volatile WalRcvData *walrcv = WalRcv;
  
  	/*
  	 * WalRcv should be set up already (if we are a backend, we inherit this
--- 171,180 ----
  {
  	char		conninfo[MAXCONNINFO];
  	XLogRecPtr	startpoint;
  	/* use volatile pointer to prevent code rearrangement */
  	volatile WalRcvData *walrcv = WalRcv;
+ 	TimestampTz last_recv_timestamp;
+ 	bool		ping_sent;
  
  	/*
  	 * WalRcv should be set up already (if we are a backend, we inherit this
***************
*** 282,287 **** WalReceiverMain(void)
--- 284,293 ----
  	MemSet(&reply_message, 0, sizeof(reply_message));
  	MemSet(&feedback_message, 0, sizeof(feedback_message));
  
+ 	/* Initialize the last recv timestamp */
+ 	last_recv_timestamp = GetCurrentTimestamp();
+ 	ping_sent = false;
+ 
  	/* Loop until end-of-streaming or error */
  	for (;;)
  	{
***************
*** 316,330 **** WalReceiverMain(void)
  		/* Wait a while for data to arrive */
  		if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
  		{
  			/* Accept the received data, and process it */
  			XLogWalRcvProcessMsg(type, buf, len);
  
  			/* Receive any more data we can without sleeping */
  			while (walrcv_receive(0, &type, &buf, &len))
  				XLogWalRcvProcessMsg(type, buf, len);
  
  			/* Let the master know that we received some data. */
! 			XLogWalRcvSendReply();
  
  			/*
  			 * If we've written some records, flush them to disk and let the
--- 322,344 ----
  		/* Wait a while for data to arrive */
  		if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
  		{
+ 			/* Something was received from master, so reset timeout */
+ 			last_recv_timestamp = GetCurrentTimestamp();
+ 			ping_sent = false;
+ 
  			/* Accept the received data, and process it */
  			XLogWalRcvProcessMsg(type, buf, len);
  
  			/* Receive any more data we can without sleeping */
  			while (walrcv_receive(0, &type, &buf, &len))
+ 			{
+ 				last_recv_timestamp = GetCurrentTimestamp();
+ 				ping_sent = false;
  				XLogWalRcvProcessMsg(type, buf, len);
+ 			}
  
  			/* Let the master know that we received some data. */
! 			XLogWalRcvSendReply(false, false);
  
  			/*
  			 * If we've written some records, flush them to disk and let the
***************
*** 335,344 **** WalReceiverMain(void)
  		else
  		{
  			/*
! 			 * We didn't receive anything new, but send a status update to the
! 			 * master anyway, to report any progress in applying WAL.
  			 */
! 			XLogWalRcvSendReply();
  			XLogWalRcvSendHSFeedback();
  		}
  	}
--- 349,396 ----
  		else
  		{
  			/*
! 			 * We didn't receive anything new. If we haven't heard anything
! 			 * from the server for more than wal_receiver_timeout / 2,
! 			 * ping the server. Also, if it's been longer than
! 			 * wal_receiver_status_interval since the last update we sent,
! 			 * send a status update to the master anyway, to report any
! 			 * progress in applying WAL.
! 			 */
! 			bool requestReply = false;
! 
! 			/*
! 			 * Check if time since last receive from standby has reached the
! 			 * configured limit.
  			 */
! 			if (wal_receiver_timeout > 0)
! 			{
! 				TimestampTz now = GetCurrentTimestamp();
! 				TimestampTz timeout;
! 
! 				timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
! 													  wal_receiver_timeout);
! 
! 				if (now >= timeout)
! 					ereport(ERROR,
! 							(errmsg("terminating walreceiver due to timeout")));
! 
! 				/*
! 				 * We didn't receive anything new, for half of receiver
! 				 * replication timeout. Ping the server.
! 				 */
! 				if (!ping_sent)
! 				{
! 					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
! 														  (wal_receiver_timeout/2));
! 					if (now >= timeout)
! 					{
! 						requestReply = true;
! 						ping_sent = true;
! 					}
! 				}
! 			}
! 
! 			XLogWalRcvSendReply(requestReply, requestReply);
  			XLogWalRcvSendHSFeedback();
  		}
  	}
***************
*** 460,465 **** XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
--- 512,521 ----
  				memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
  
  				ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+ 
+ 				/* If the primary requested a reply, send one immediately */
+ 				if (keepalive.replyRequested)
+ 					XLogWalRcvSendReply(true, false);
  				break;
  			}
  		default:
***************
*** 609,627 **** XLogWalRcvFlush(bool dying)
  
  		/* Also let the master know that we made some progress */
  		if (!dying)
! 		{
! 			XLogWalRcvSendReply();
! 			XLogWalRcvSendHSFeedback();
! 		}
  	}
  }
  
  /*
!  * Send reply message to primary, indicating our current XLOG positions and
!  * the current time.
   */
  static void
! XLogWalRcvSendReply(void)
  {
  	char		buf[sizeof(StandbyReplyMessage) + 1];
  	TimestampTz now;
--- 665,688 ----
  
  		/* Also let the master know that we made some progress */
  		if (!dying)
! 			XLogWalRcvSendReply(false, false);
  	}
  }
  
  /*
!  * Send reply message to primary, indicating our current XLOG positions, oldest
!  * xmin and the current time.
!  *
!  * If 'force' is not true, the message is not sent unless enough time has
!  * passed since last status update to reach wal_receiver_status_internal (or
!  * if wal_receiver_status_interval is disabled altogether).
!  *
!  * If 'requestReply' is true, requests the server to reply immediately upon
!  * receiving this message. This is used for heartbearts, when approaching
!  * wal_receiver_timeout.
   */
  static void
! XLogWalRcvSendReply(bool force, bool requestReply)
  {
  	char		buf[sizeof(StandbyReplyMessage) + 1];
  	TimestampTz now;
***************
*** 630,636 **** XLogWalRcvSendReply(void)
  	 * If the user doesn't want status to be reported to the master, be sure
  	 * to exit before doing anything at all.
  	 */
! 	if (wal_receiver_status_interval <= 0)
  		return;
  
  	/* Get current timestamp. */
--- 691,697 ----
  	 * If the user doesn't want status to be reported to the master, be sure
  	 * to exit before doing anything at all.
  	 */
! 	if (!force && wal_receiver_status_interval <= 0)
  		return;
  
  	/* Get current timestamp. */
***************
*** 645,651 **** XLogWalRcvSendReply(void)
  	 * this is only for reporting purposes and only on idle systems, that's
  	 * probably OK.
  	 */
! 	if (XLByteEQ(reply_message.write, LogstreamResult.Write)
  		&& XLByteEQ(reply_message.flush, LogstreamResult.Flush)
  		&& !TimestampDifferenceExceeds(reply_message.sendTime, now,
  									   wal_receiver_status_interval * 1000))
--- 706,713 ----
  	 * this is only for reporting purposes and only on idle systems, that's
  	 * probably OK.
  	 */
! 	if (!force
! 		&& XLByteEQ(reply_message.write, LogstreamResult.Write)
  		&& XLByteEQ(reply_message.flush, LogstreamResult.Flush)
  		&& !TimestampDifferenceExceeds(reply_message.sendTime, now,
  									   wal_receiver_status_interval * 1000))
***************
*** 656,661 **** XLogWalRcvSendReply(void)
--- 718,724 ----
  	reply_message.flush = LogstreamResult.Flush;
  	reply_message.apply = GetXLogReplayRecPtr(NULL);
  	reply_message.sendTime = now;
+ 	reply_message.replyRequested = requestReply;
  
  	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
  		 (uint32) (reply_message.write >> 32), (uint32) reply_message.write,
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 82,88 **** static bool	replication_started = false; /* Started streaming yet? */
  
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
! int			replication_timeout = 60 * 1000;	/* maximum time to send one
  												 * WAL data message */
  /*
   * State for WalSndWakeupRequest
--- 82,88 ----
  
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
! int			wal_sender_timeout = 60 * 1000;	/* maximum time to send one
  												 * WAL data message */
  /*
   * State for WalSndWakeupRequest
***************
*** 103,117 **** static uint32 sendOff = 0;
   */
  static XLogRecPtr sentPtr = 0;
  
  /*
!  * Buffer for processing reply messages.
   */
! static StringInfoData reply_message;
  
  /*
   * Timestamp of the last receipt of the reply from the standby.
   */
  static TimestampTz last_reply_timestamp;
  
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
--- 103,122 ----
   */
  static XLogRecPtr sentPtr = 0;
  
+ /* Buffer for processing reply messages. */
+ static StringInfoData reply_message;
  /*
!  * Buffer for constructing outgoing messages
!  * (1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE bytes)
   */
! static char *output_message;
  
  /*
   * Timestamp of the last receipt of the reply from the standby.
   */
  static TimestampTz last_reply_timestamp;
+ /* Have we sent a heartbeat message asking for reply, since last reply? */
+ static bool	ping_sent = false;
  
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
***************
*** 126,139 **** static void WalSndLastCycleHandler(SIGNAL_ARGS);
  static void WalSndLoop(void) __attribute__((noreturn));
  static void InitWalSenderSlot(void);
  static void WalSndKill(int code, Datum arg);
! static void XLogSend(char *msgbuf, bool *caughtup);
  static void IdentifySystem(void);
  static void StartReplication(StartReplicationCmd *cmd);
  static void ProcessStandbyMessage(void);
  static void ProcessStandbyReplyMessage(void);
  static void ProcessStandbyHSFeedbackMessage(void);
  static void ProcessRepliesIfAny(void);
! static void WalSndKeepalive(char *msgbuf);
  
  
  /* Initialize walsender process before entering the main command loop */
--- 131,144 ----
  static void WalSndLoop(void) __attribute__((noreturn));
  static void InitWalSenderSlot(void);
  static void WalSndKill(int code, Datum arg);
! static void XLogSend(bool *caughtup);
  static void IdentifySystem(void);
  static void StartReplication(StartReplicationCmd *cmd);
  static void ProcessStandbyMessage(void);
  static void ProcessStandbyReplyMessage(void);
  static void ProcessStandbyHSFeedbackMessage(void);
  static void ProcessRepliesIfAny(void);
! static void WalSndKeepalive(bool requestReply);
  
  
  /* Initialize walsender process before entering the main command loop */
***************
*** 465,471 **** ProcessRepliesIfAny(void)
--- 470,479 ----
  	 * Save the last reply timestamp if we've received at least one reply.
  	 */
  	if (received)
+ 	{
  		last_reply_timestamp = GetCurrentTimestamp();
+ 		ping_sent = false;
+ 	}
  }
  
  /*
***************
*** 527,532 **** ProcessStandbyReplyMessage(void)
--- 535,544 ----
  		 (uint32) (reply.flush >> 32), (uint32) reply.flush,
  		 (uint32) (reply.apply >> 32), (uint32) reply.apply);
  
+ 	/* Send a reply if the standby requested one. */
+ 	if (reply.replyRequested)
+ 		WalSndKeepalive(false);
+ 
  	/*
  	 * Update shared state for this WalSender process based on reply data from
  	 * standby.
***************
*** 620,626 **** ProcessStandbyHSFeedbackMessage(void)
  static void
  WalSndLoop(void)
  {
- 	char	   *output_message;
  	bool		caughtup = false;
  
  	/*
--- 632,637 ----
***************
*** 638,643 **** WalSndLoop(void)
--- 649,655 ----
  
  	/* Initialize the last reply timestamp */
  	last_reply_timestamp = GetCurrentTimestamp();
+ 	ping_sent = false;
  
  	/* Loop forever, unless we get an error */
  	for (;;)
***************
*** 672,678 **** WalSndLoop(void)
  		 * caught up.
  		 */
  		if (!pq_is_send_pending())
! 			XLogSend(output_message, &caughtup);
  		else
  			caughtup = false;
  
--- 684,690 ----
  		 * caught up.
  		 */
  		if (!pq_is_send_pending())
! 			XLogSend(&caughtup);
  		else
  			caughtup = false;
  
***************
*** 708,714 **** WalSndLoop(void)
  			if (walsender_ready_to_stop)
  			{
  				/* ... let's just be real sure we're caught up ... */
! 				XLogSend(output_message, &caughtup);
  				if (caughtup && !pq_is_send_pending())
  				{
  					/* Inform the standby that XLOG streaming is done */
--- 720,726 ----
  			if (walsender_ready_to_stop)
  			{
  				/* ... let's just be real sure we're caught up ... */
! 				XLogSend(&caughtup);
  				if (caughtup && !pq_is_send_pending())
  				{
  					/* Inform the standby that XLOG streaming is done */
***************
*** 738,760 **** WalSndLoop(void)
  
  			if (pq_is_send_pending())
  				wakeEvents |= WL_SOCKET_WRITEABLE;
! 			else if (MyWalSnd->sendKeepalive)
  			{
! 				WalSndKeepalive(output_message);
! 				/* Try to flush pending output to the client */
! 				if (pq_flush_if_writable() != 0)
! 					break;
  			}
  
  			/* Determine time until replication timeout */
! 			if (replication_timeout > 0)
  			{
  				timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
! 													  replication_timeout);
! 				sleeptime = 1 + (replication_timeout / 10);
  			}
  
! 			/* Sleep until something happens or replication timeout */
  			ImmediateInterruptOK = true;
  			CHECK_FOR_INTERRUPTS();
  			WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
--- 750,783 ----
  
  			if (pq_is_send_pending())
  				wakeEvents |= WL_SOCKET_WRITEABLE;
! 			else if (wal_sender_timeout > 0 && !ping_sent)
  			{
! 				/*
! 				 * If half of wal_sender_timeout has lapsed without receiving
! 				 * any reply from standby, send a keep-alive message to standby
! 				 * requesting an immediate reply.
! 				 */
! 				timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
! 													  wal_sender_timeout / 2);
! 				if (GetCurrentTimestamp() >= timeout)
! 				{
! 					WalSndKeepalive(true);
! 					ping_sent = true;
! 					/* Try to flush pending output to the client */
! 					if (pq_flush_if_writable() != 0)
! 						break;
! 				}
  			}
  
  			/* Determine time until replication timeout */
! 			if (wal_sender_timeout > 0)
  			{
  				timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
! 													  wal_sender_timeout);
! 				sleeptime = 1 + (wal_sender_timeout / 10);
  			}
  
! 			/* Sleep until something happens or we time out */
  			ImmediateInterruptOK = true;
  			CHECK_FOR_INTERRUPTS();
  			WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
***************
*** 766,773 **** WalSndLoop(void)
  			 * possibility that the client replied just as we reached the
  			 * timeout ... he's supposed to reply *before* that.
  			 */
! 			if (replication_timeout > 0 &&
! 				GetCurrentTimestamp() >= timeout)
  			{
  				/*
  				 * Since typically expiration of replication timeout means
--- 789,795 ----
  			 * possibility that the client replied just as we reached the
  			 * timeout ... he's supposed to reply *before* that.
  			 */
! 			if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
  			{
  				/*
  				 * Since typically expiration of replication timeout means
***************
*** 1016,1030 **** retry:
   * but not yet sent to the client, and buffer it in the libpq output
   * buffer.
   *
-  * msgbuf is a work area in which the output message is constructed.  It's
-  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
-  * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
-  *
   * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
   * *caughtup is set to false.
   */
  static void
! XLogSend(char *msgbuf, bool *caughtup)
  {
  	XLogRecPtr	SendRqstPtr;
  	XLogRecPtr	startptr;
--- 1038,1048 ----
   * but not yet sent to the client, and buffer it in the libpq output
   * buffer.
   *
   * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
   * *caughtup is set to false.
   */
  static void
! XLogSend(bool *caughtup)
  {
  	XLogRecPtr	SendRqstPtr;
  	XLogRecPtr	startptr;
***************
*** 1107,1119 **** XLogSend(char *msgbuf, bool *caughtup)
  	/*
  	 * OK to read and send the slice.
  	 */
! 	msgbuf[0] = 'w';
  
  	/*
  	 * Read the log directly into the output buffer to avoid extra memcpy
  	 * calls.
  	 */
! 	XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
  
  	/*
  	 * We fill the message header last so that the send timestamp is taken as
--- 1125,1137 ----
  	/*
  	 * OK to read and send the slice.
  	 */
! 	output_message[0] = 'w';
  
  	/*
  	 * Read the log directly into the output buffer to avoid extra memcpy
  	 * calls.
  	 */
! 	XLogRead(output_message + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
  
  	/*
  	 * We fill the message header last so that the send timestamp is taken as
***************
*** 1123,1131 **** XLogSend(char *msgbuf, bool *caughtup)
  	msghdr.walEnd = SendRqstPtr;
  	msghdr.sendTime = GetCurrentTimestamp();
  
! 	memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
  
! 	pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
  
  	sentPtr = endptr;
  
--- 1141,1149 ----
  	msghdr.walEnd = SendRqstPtr;
  	msghdr.sendTime = GetCurrentTimestamp();
  
! 	memcpy(output_message + 1, &msghdr, sizeof(WalDataMessageHeader));
  
! 	pq_putmessage_noblock('d', output_message, 1 + sizeof(WalDataMessageHeader) + nbytes);
  
  	sentPtr = endptr;
  
***************
*** 1492,1512 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	return (Datum) 0;
  }
  
  static void
! WalSndKeepalive(char *msgbuf)
  {
  	PrimaryKeepaliveMessage keepalive_message;
  
  	/* Construct a new message */
  	keepalive_message.walEnd = sentPtr;
  	keepalive_message.sendTime = GetCurrentTimestamp();
  
  	elog(DEBUG2, "sending replication keepalive");
  
  	/* Prepend with the message type and send it. */
! 	msgbuf[0] = 'k';
! 	memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
! 	pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
  }
  
  /*
--- 1510,1536 ----
  	return (Datum) 0;
  }
  
+ /*
+   * This function is used to send keepalive message to standby.
+   * If requestReply is set, sets a flag in the message requesting the standby
+   * to send a message back to us, for heartbeat purposes.
+   */
  static void
! WalSndKeepalive(bool requestReply)
  {
  	PrimaryKeepaliveMessage keepalive_message;
  
  	/* Construct a new message */
  	keepalive_message.walEnd = sentPtr;
  	keepalive_message.sendTime = GetCurrentTimestamp();
+ 	keepalive_message.replyRequested = requestReply;
  
  	elog(DEBUG2, "sending replication keepalive");
  
  	/* Prepend with the message type and send it. */
! 	output_message[0] = 'k';
! 	memcpy(output_message + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
! 	pq_putmessage_noblock('d', output_message, sizeof(PrimaryKeepaliveMessage) + 1);
  }
  
  /*
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1596,1601 **** static struct config_int ConfigureNamesInt[] =
--- 1596,1612 ----
  	},
  
  	{
+ 		{"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY,
+ 			gettext_noop("Sets the maximum wait time to receive data from master."),
+ 			NULL,
+ 			GUC_UNIT_MS
+ 		},
+ 		&wal_receiver_timeout,
+ 		60 * 1000, 0, INT_MAX,
+ 		NULL, NULL, NULL
+ 	},
+ 
+ 	{
  		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
  			gettext_noop("Sets the maximum number of concurrent connections."),
  			NULL
***************
*** 2019,2030 **** static struct config_int ConfigureNamesInt[] =
  	},
  
  	{
! 		{"replication_timeout", PGC_SIGHUP, REPLICATION_SENDING,
  			gettext_noop("Sets the maximum time to wait for WAL replication."),
  			NULL,
  			GUC_UNIT_MS
  		},
! 		&replication_timeout,
  		60 * 1000, 0, INT_MAX,
  		NULL, NULL, NULL
  	},
--- 2030,2041 ----
  	},
  
  	{
! 		{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
  			gettext_noop("Sets the maximum time to wait for WAL replication."),
  			NULL,
  			GUC_UNIT_MS
  		},
! 		&wal_sender_timeout,
  		60 * 1000, 0, INT_MAX,
  		NULL, NULL, NULL
  	},
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 210,216 ****
  #max_wal_senders = 0		# max number of walsender processes
  				# (change requires restart)
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
! #replication_timeout = 60s	# in milliseconds; 0 disables
  
  # - Master Server -
  
--- 210,216 ----
  #max_wal_senders = 0		# max number of walsender processes
  				# (change requires restart)
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
! #wal_sender_timeout = 60s	# in milliseconds; 0 disables
  
  # - Master Server -
  
***************
*** 237,242 ****
--- 237,245 ----
  					# 0 disables
  #hot_standby_feedback = off		# send info from standby to prevent
  					# query conflicts
+ #wal_receiver_timeout = 60s		# time that receiver waits for
+ 					# communication from master
+ 					# in milliseconds; 0 disables
  
  
  #------------------------------------------------------------------------------
*** a/src/include/replication/walprotocol.h
--- b/src/include/replication/walprotocol.h
***************
*** 27,32 **** typedef struct
--- 27,38 ----
  
  	/* Sender's system clock at the time of transmission */
  	TimestampTz sendTime;
+ 
+ 	/*
+ 	 * If replyRequested is set, the client should reply immediately to this
+ 	 * message, to avoid a timeout disconnect.
+ 	 */
+ 	bool		replyRequested;
  } WalSndrMessage;
  
  
***************
*** 80,85 **** typedef struct
--- 86,97 ----
  
  	/* Sender's system clock at the time of transmission */
  	TimestampTz sendTime;
+ 
+ 	/*
+ 	 * If replyRequested is set, the server should reply immediately to this
+ 	 * message, to avoid a timeout disconnect.
+ 	 */
+ 	bool		replyRequested;
  } StandbyReplyMessage;
  
  /*
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 17,23 ****
--- 17,25 ----
  #include "storage/spin.h"
  #include "pgtime.h"
  
+ /* user-settable parameters */
  extern int	wal_receiver_status_interval;
+ extern int	wal_receiver_timeout;
  extern bool hot_standby_feedback;
  
  /*
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 24,30 **** extern bool wake_wal_senders;
  
  /* user-settable parameters */
  extern int	max_wal_senders;
! extern int	replication_timeout;
  
  extern void InitWalSender(void);
  extern void exec_replication_command(const char *query_string);
--- 24,30 ----
  
  /* user-settable parameters */
  extern int	max_wal_senders;
! extern int	wal_sender_timeout;
  
  extern void InitWalSender(void);
  extern void exec_replication_command(const char *query_string);
*** a/src/include/replication/walsender_private.h
--- b/src/include/replication/walsender_private.h
***************
*** 37,43 **** typedef struct WalSnd
  	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
  	bool		needreload;		/* does currently-open file need to be
  								 * reloaded? */
- 	bool		sendKeepalive;	/* do we send keepalives on this connection? */
  
  	/*
  	 * The xlog locations that have been written, flushed, and applied by
--- 37,42 ----
-- 
Sent via pgsql-bugs mailing list (pgsql-bugs@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-bugs

Reply via email to