Heikki Linnakangas wrote:
> Simon Riggs wrote:
>> WALSender sleeps even when it might have more WAL to send, it doesn't
>> check it just unconditionally sleeps. At least WALReceiver loops until
>> it has no more to receive. I just can't imagine why that's useful
>> behaviour.
> 
> Good catch. That should be fixed.
> 
> I also note that walsender doesn't respond to signals, while it's
> sending a large batch. That's analogous to the issue that was addressed
> recently in the archiver process.

The attached patch rearranges the walsender loops slightly to fix both issues.
XLogSend() now sends at most MAX_SEND_SIZE bytes (== XLOG_SEG_SIZE / 2)
per call and then returns to the main loop, even if unsent WAL remains;
the main loop no longer sleeps while there is unsent WAL. That way the
main loop can respond to signals promptly, and the shared memory status
and PS display are updated more often when there's a lot of catching up
to do.

Comments? Have I screwed anything up?

-- 
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 100,106 **** static void InitWalSnd(void);
  static void WalSndHandshake(void);
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
! static bool XLogSend(StringInfo outMsg);
  static void CheckClosedConnection(void);
  
  /*
--- 100,106 ----
  static void WalSndHandshake(void);
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
! static bool XLogSend(StringInfo outMsg, bool *caughtup);
  static void CheckClosedConnection(void);
  
  /*
***************
*** 360,365 **** static int
--- 360,366 ----
  WalSndLoop(void)
  {
  	StringInfoData output_message;
+ 	bool		caughtup = false;
  
  	initStringInfo(&output_message);
  
***************
*** 387,393 **** WalSndLoop(void)
  		 */
  		if (ready_to_stop)
  		{
! 			XLogSend(&output_message);
! 			shutdown_requested = true;
  		}

--- 388,394 ----
  		 */
  		if (ready_to_stop)
  		{
! 			if (XLogSend(&output_message, &caughtup) && caughtup)
! 				shutdown_requested = true;
  		}
  
***************
*** 402,432 **** WalSndLoop(void)
  		}
  
  		/*
! 		 * Nap for the configured time or until a message arrives.
  		 *
  		 * On some platforms, signals won't interrupt the sleep.  To ensure we
  		 * respond reasonably promptly when someone signals us, break down the
  		 * sleep into NAPTIME_PER_CYCLE increments, and check for
  		 * interrupts after each nap.
  		 */
! 		remain = WalSndDelay * 1000L;
! 		while (remain > 0)
  		{
! 			if (got_SIGHUP || shutdown_requested || ready_to_stop)
! 				break;
  
! 			/*
! 			 * Check to see whether a message from the standby or an interrupt
! 			 * from other processes has arrived.
! 			 */
! 			pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! 			CheckClosedConnection();
  
! 			remain -= NAPTIME_PER_CYCLE;
  		}
- 
  		/* Attempt to send the log once every loop */
! 		if (!XLogSend(&output_message))
  			goto eof;
  	}
  
--- 403,434 ----
  		}
  
  		/*
! 		 * If we had sent all accumulated WAL in last round, nap for the
! 		 * configured time before retrying.
  		 *
  		 * On some platforms, signals won't interrupt the sleep.  To ensure we
  		 * respond reasonably promptly when someone signals us, break down the
  		 * sleep into NAPTIME_PER_CYCLE increments, and check for
  		 * interrupts after each nap.
  		 */
! 		if (caughtup)
  		{
! 			remain = WalSndDelay * 1000L;
! 			while (remain > 0)
! 			{
! 				/* Check for interrupts */
! 				if (got_SIGHUP || shutdown_requested || ready_to_stop)
! 					break;
  
! 				/* Sleep and check that the connection is still alive */
! 				pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! 				CheckClosedConnection();
  
! 				remain -= NAPTIME_PER_CYCLE;
! 			}
  		}
  		/* Attempt to send the log once every loop */
! 		if (!XLogSend(&output_message, &caughtup))
  			goto eof;
  	}
  
***************
*** 623,637 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  }
  
  /*
!  * Read all WAL that's been written (and flushed) since last cycle, and send
!  * it to client.
   *
   * Returns true if OK, false if trouble.
   */
  static bool
! XLogSend(StringInfo outMsg)
  {
  	XLogRecPtr	SendRqstPtr;
  	char		activitymsg[50];
  
  	/* use volatile pointer to prevent code rearrangement */
--- 625,644 ----
  }
  
  /*
!  * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
!  * but not yet sent to the client, and send it. If there is no unsent WAL,
!  * *caughtup is set to true and nothing is sent, otherwise *caughtup is set
!  * to false.
   *
   * Returns true if OK, false if trouble.
   */
  static bool
! XLogSend(StringInfo outMsg, bool *caughtup)
  {
  	XLogRecPtr	SendRqstPtr;
+ 	XLogRecPtr	startptr;
+ 	XLogRecPtr	endptr;
+ 	Size		nbytes;
  	char		activitymsg[50];
  
  	/* use volatile pointer to prevent code rearrangement */
***************
*** 642,725 **** XLogSend(StringInfo outMsg)
  
  	/* Quick exit if nothing to do */
  	if (!XLByteLT(sentPtr, SendRqstPtr))
  		return true;
  
  	/*
! 	 * We gather multiple records together by issuing just one XLogRead() of a
! 	 * suitable size, and send them as one CopyData message. Repeat until
! 	 * we've sent everything we can.
  	 */
! 	while (XLByteLT(sentPtr, SendRqstPtr))
  	{
- 		XLogRecPtr	startptr;
- 		XLogRecPtr	endptr;
- 		Size		nbytes;
- 
  		/*
! 		 * Figure out how much to send in one message. If there's less than
! 		 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
! 		 * MAX_SEND_SIZE bytes, but round to page boundary.
! 		 *
! 		 * The rounding is not only for performance reasons. Walreceiver
! 		 * relies on the fact that we never split a WAL record across two
! 		 * messages. Since a long WAL record is split at page boundary into
! 		 * continuation records, page boundary is always a safe cut-off point.
! 		 * We also assume that SendRqstPtr never points in the middle of a WAL
! 		 * record.
  		 */
! 		startptr = sentPtr;
! 		if (startptr.xrecoff >= XLogFileSize)
! 		{
! 			/*
! 			 * crossing a logid boundary, skip the non-existent last log
! 			 * segment in previous logical log file.
! 			 */
! 			startptr.xlogid += 1;
! 			startptr.xrecoff = 0;
! 		}
  
! 		endptr = startptr;
! 		XLByteAdvance(endptr, MAX_SEND_SIZE);
! 		/* round down to page boundary. */
! 		endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
! 		/* if we went beyond SendRqstPtr, back off */
! 		if (XLByteLT(SendRqstPtr, endptr))
! 			endptr = SendRqstPtr;
  
! 		/*
! 		 * OK to read and send the slice.
! 		 *
! 		 * We don't need to convert the xlogid/xrecoff from host byte order to
! 		 * network byte order because the both server can be expected to have
! 		 * the same byte order. If they have different byte order, we don't
! 		 * reach here.
! 		 */
! 		pq_sendbyte(outMsg, 'w');
! 		pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
  
! 		if (endptr.xlogid != startptr.xlogid)
! 		{
! 			Assert(endptr.xlogid == startptr.xlogid + 1);
! 			nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
! 		}
! 		else
! 			nbytes = endptr.xrecoff - startptr.xrecoff;
  
! 		sentPtr = endptr;
  
! 		/*
! 		 * Read the log directly into the output buffer to prevent extra
! 		 * memcpy calls.
! 		 */
! 		enlargeStringInfo(outMsg, nbytes);
  
! 		XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
! 		outMsg->len += nbytes;
! 		outMsg->data[outMsg->len] = '\0';
  
! 		pq_putmessage('d', outMsg->data, outMsg->len);
! 		resetStringInfo(outMsg);
! 	}
  
  	/* Update shared memory status */
  	SpinLockAcquire(&walsnd->mutex);
--- 649,730 ----
  
  	/* Quick exit if nothing to do */
  	if (!XLByteLT(sentPtr, SendRqstPtr))
+ 	{
+ 		*caughtup = true;
  		return true;
+ 	}
+ 	/*
+ 	 * Otherwise let the caller know that we're not fully caught up. Unless
+ 	 * the backlog exceeds MAX_SEND_SIZE, everything up to SendRqstPtr is sent
+ 	 * below, but more WAL could accumulate meanwhile; the next call finds out
+ 	 * and reports *caughtup = true via the quick exit above if nothing's left.
+ 	 */
+ 	*caughtup = false;
  
  	/*
! 	 * Figure out how much to send in one message. If there's less than
! 	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
! 	 * MAX_SEND_SIZE bytes, but round to page boundary.
! 	 *
! 	 * The rounding is not only for performance reasons. Walreceiver
! 	 * relies on the fact that we never split a WAL record across two
! 	 * messages. Since a long WAL record is split at page boundary into
! 	 * continuation records, page boundary is always a safe cut-off point.
! 	 * We also assume that SendRqstPtr never points in the middle of a WAL
! 	 * record.
  	 */
! 	startptr = sentPtr;
! 	if (startptr.xrecoff >= XLogFileSize)
  	{
  		/*
! 		 * crossing a logid boundary, skip the non-existent last log
! 		 * segment in previous logical log file.
  		 */
! 		startptr.xlogid += 1;
! 		startptr.xrecoff = 0;
! 	}
  
! 	endptr = startptr;
! 	XLByteAdvance(endptr, MAX_SEND_SIZE);
! 	/* round down to page boundary. */
! 	endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
! 	/* if we went beyond SendRqstPtr, back off */
! 	if (XLByteLT(SendRqstPtr, endptr))
! 		endptr = SendRqstPtr;
  
! 	/*
! 	 * OK to read and send the slice.
! 	 *
! 	 * We don't need to convert the xlogid/xrecoff from host byte order to
! 	 * network byte order because the both server can be expected to have
! 	 * the same byte order. If they have different byte order, we don't
! 	 * reach here.
! 	 */
! 	pq_sendbyte(outMsg, 'w');
! 	pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
  
! 	if (endptr.xlogid != startptr.xlogid)
! 	{
! 		Assert(endptr.xlogid == startptr.xlogid + 1);
! 		nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
! 	}
! 	else
! 		nbytes = endptr.xrecoff - startptr.xrecoff;
  
! 	sentPtr = endptr;
  
! 	/*
! 	 * Read the log directly into the output buffer to prevent extra
! 	 * memcpy calls.
! 	 */
! 	enlargeStringInfo(outMsg, nbytes);
  
! 	XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
! 	outMsg->len += nbytes;
! 	outMsg->data[outMsg->len] = '\0';
  
! 	pq_putmessage('d', outMsg->data, outMsg->len);
! 	resetStringInfo(outMsg);
  
  	/* Update shared memory status */
  	SpinLockAcquire(&walsnd->mutex);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to