*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 367,372 **** typedef struct XLogCtlData
--- 367,374 ----
  	XLogRecPtr	asyncCommitLSN; /* LSN of newest async commit */
  	uint32		lastRemovedLog;	/* latest removed/recycled XLOG segment */
  	uint32		lastRemovedSeg;
+ 	XLogRecPtr	oldestCacheLSN;	/* start of oldest block in cache */
+ 	int			sendidx;		/* cache index of next block to send */
  
  	/* Protected by WALWriteLock: */
  	XLogCtlWrite Write;
***************
*** 550,555 **** static void XLogReportParameters(void);
--- 552,559 ----
  static void LocalSetXLogInsertAllowed(void);
  static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
  
+ static bool XLByteInBuf(XLogRecPtr recptr);
+ 
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
  static bool AdvanceXLInsertBuffer(bool new_segment);
***************
*** 956,961 **** begin:;
--- 960,971 ----
  			RecPtr.xrecoff = XLogFileSize;
  		}
  
+ 		/*
+ 		 * Report the end of the prior segment, so that the walsenders know
+ 		 * to send WAL up to that
+ 		 */
+ 		SetXLogSendRqst(RecPtr);
+ 
  		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  		LogwrtResult = XLogCtl->Write.LogwrtResult;
  		if (!XLByteLE(RecPtr, LogwrtResult.Flush))
***************
*** 1083,1093 **** begin:;
  
  		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  
  		/*
  		 * Flush through the end of the page containing XLOG_SWITCH, and
  		 * perform end-of-segment actions (eg, notifying archiver).
  		 */
- 		WriteRqst = XLogCtl->xlblocks[curridx];
  		FlushRqst.Write = WriteRqst;
  		FlushRqst.Flush = WriteRqst;
  		XLogWrite(FlushRqst, false, true);
--- 1093,1110 ----
  
  		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  
+ 		WriteRqst = XLogCtl->xlblocks[curridx];
+ 
+ 		/*
+ 		 * Report the end of the page containing XLOG_SWITCH, so that the
+ 		 * walsenders know to send WAL up to that
+ 		 */
+ 		SetXLogSendRqst(WriteRqst);
+ 
  		/*
  		 * Flush through the end of the page containing XLOG_SWITCH, and
  		 * perform end-of-segment actions (eg, notifying archiver).
  		 */
  		FlushRqst.Write = WriteRqst;
  		FlushRqst.Flush = WriteRqst;
  		XLogWrite(FlushRqst, false, true);
***************
*** 1156,1161 **** begin:;
--- 1173,1185 ----
  
  	LWLockRelease(WALInsertLock);
  
+ 	/*
+ 	 * Report the current WAL insert location, so that the walsenders
+ 	 * know to send WAL up to that.
+ 	 */
+ 	if (!isLogSwitch)
+ 		SetXLogSendRqst(RecPtr);
+ 
  	if (updrqst)
  	{
  		/* use volatile pointer to prevent code rearrangement */
***************
*** 1239,1244 **** XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
--- 1263,1413 ----
  }
  
  /*
+  * Read bytes in WAL buffers into 'buf', up to 'endptr', starting at 'recptr'.
+  *
+  * If at least one byte was read, sets *sendidx to the cache index of next
+  * block to read, sets *ReadResult to the byte position we've already read,
+  * and returns the number of the read bytes. If nothing was read, returns zero.
+  *
+  * If we could read the log up to 'endptr', we can expect that the next block
+  * to send is available in WAL buffers, so sets *frombuf to true. Otherwise,
+  * false.
+  */
+ Size
+ XLogBufRead(char *buf, XLogRecPtr recptr, XLogRecPtr endptr, int *sendidx,
+ 			XLogRecPtr *ReadResult, bool *frombuf)
+ {
+ 	bool		last_iteration;
+ 	char	   *from;
+ 	Size		nbytes;
+ 	Size		readbytes = 0;
+ 	uint32		startpos;
+ 
+ 	Assert(frombuf);
+ 	Assert(sendidx != -1);
+ 
+ 	/* Calculate the starting position of reading in the first page */
+ 	startpos = recptr.xrecoff % XLOG_BLCKSZ;
+ 	from = XLogCtl->pages + *sendidx * (Size) XLOG_BLCKSZ + startpos;
+ 	nbytes = (Size) XLOG_BLCKSZ - startpos;
+ 
+ 	/*
+ 	 * Within the loop, read one block at a time. Instead, like XLogWrite(),
+ 	 * we can gather multiple blocks together and issue just one memcpy() call.
+ 	 * But since we don't hold WALInsertLock here to avoid lock contention,
+ 	 * some earlier blocks might be purged from the cache during being gathered.
+ 	 * If this happens, we must start reading the log from the disk over again,
+ 	 * which would degrade the performance of walsender.
+ 	 *
+ 	 * To avoid that redo, we read the block available in the cache as soon as
+ 	 * possible.
+ 	 */
+ 	while (XLByteLT(recptr, endptr))
+ 	{
+ 		XLogRecPtr		prevptr;
+ 
+ 		/*
+ 		 * Advance recptr to end of current block. We read XLogCtl->xlblocks
+ 		 * without holding neither WALInsertLock nor WALWriteLock to avoid lock
+ 		 * contention. So it might be changed because of buffer replacement
+ 		 * while being read. We check whether current block is in the cache,
+ 		 * and if not, we give up advancing recptr and reading the log anymore.
+ 		 *
+ 		 * prevptr indicates the end + 1 of previous block, i.e., the start of
+ 		 * current one. The check is done by comparing prevptr with the start
+ 		 * of oldest block in the cache.
+ 		 */
+ 		prevptr = recptr;
+ 		recptr = XLogCtl->xlblocks[*sendidx];
+ 		if (!XLByteInBuf(prevptr))
+ 		{
+ 			if (readbytes > 0)
+ 				*ReadResult = prevptr;
+ 
+ 			/*
+ 			 * Since current block is not in the cache, we must read it from
+ 			 * the disk
+ 			 */
+ 			*frombuf = false;
+ 			break;
+ 		}
+ 
+ 		/*
+ 		 * If XLOG_SWITCH is in the previous block, we send all the read data,
+ 		 * and then forcibly advance to the start of the next segment
+ 		 */
+ 		if ((recptr.xrecoff - XLOG_BLCKSZ) % XLogSegSize == 0 &&
+ 			prevptr.xrecoff % XLogSegSize != 0)
+ 		{
+ 			recptr.xrecoff -= XLOG_BLCKSZ;
+ 			if (recptr.xrecoff == 0)
+ 			{
+ 				recptr.xlogid -= 1;
+ 				recptr.xrecoff = XLogSegSize;
+ 			}
+ 			*ReadResult = recptr;
+ 			break;
+ 		}
+ 
+ 		/* Is this the last loop iteration? */
+ 		last_iteration = XLByteLE(endptr, recptr);
+ 		if (last_iteration)
+ 		{
+ 			uint32		endpos;
+ 
+ 			if ((endpos = endptr.xrecoff % XLOG_BLCKSZ) != 0)
+ 				nbytes += endpos - (Size) XLOG_BLCKSZ;
+ 		}
+ 
+ 		/*
+ 		 * Attempt to read WAL buffers without holding any locks. So since the
+ 		 * current block might be replaced during being read, we need to check
+ 		 * whether it's still in the cache later.
+ 		 */
+ 		Assert(nbytes <= XLOG_BLCKSZ);
+ 		memcpy(buf, from, nbytes);
+ 		if (!XLByteInBuf(prevptr))
+ 		{
+ 			if (readbytes > 0)
+ 				*ReadResult = prevptr;
+ 
+ 			/*
+ 			 * Since current block is not in the cache, we must read it from
+ 			 * the disk
+ 			 */
+ 			*frombuf = false;
+ 			break;
+ 		}
+ 		readbytes += nbytes;
+ 
+ 		if (last_iteration)
+ 		{
+ 			/* If we went beyond endptr, back off */
+ 			if (XLByteLT(endptr, recptr))
+ 				recptr = endptr;
+ 			else
+ 				*sendidx = NextBufIdx(*sendidx);
+ 			*ReadResult = recptr;
+ 
+ 			/*
+ 			 * We can expect that the next block to read is in WAL buffers
+ 			 * since we could read all the data up to endptr from there
+ 			 */
+ 			*frombuf = true;
+ 			break;
+ 		}
+ 
+ 		/* Update states for next read */
+ 		*sendidx = NextBufIdx(*sendidx);
+ 		from = XLogCtl->pages + *sendidx * (Size) XLOG_BLCKSZ;
+ 		buf += nbytes;
+ 		nbytes = (Size) XLOG_BLCKSZ;
+ 	}
+ 
+ 	return readbytes;
+ }
+ 
+ /*
   * XLogArchiveNotify
   *
   * Create an archive notification file
***************
*** 1493,1498 **** AdvanceXLInsertBuffer(bool new_segment)
--- 1662,1681 ----
  	}
  
  	/*
+ 	 * Update the starting location of the oldest block in WAL buffers
+ 	 * if streaming replication is enabled
+ 	 */
+ 	if (max_wal_senders > 0)
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile XLogCtlData *xlogctl = XLogCtl;
+ 
+ 		SpinLockAcquire(&xlogctl->info_lck);
+ 		xlogctl->oldestCacheLSN = XLogCtl->xlblocks[nextidx];
+ 		SpinLockRelease(&xlogctl->info_lck);
+ 	}
+ 
+ 	/*
  	 * Now the next buffer slot is free and we can set it up to be the next
  	 * output page.
  	 */
***************
*** 1845,1850 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
--- 2028,2037 ----
  	 * We make sure that the shared 'request' values do not fall behind the
  	 * 'result' values.  This is not absolutely essential, but it saves some
  	 * code in a couple of places.
+ 	 *
+ 	 * Set XLogCtl->sendidx to the cache index of next block to write. This
+ 	 * can be used as the index of next block to send after sending all WAL
+ 	 * written to the disk before now.
  	 */
  	{
  		/* use volatile pointer to prevent code rearrangement */
***************
*** 1852,1857 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
--- 2039,2045 ----
  
  		SpinLockAcquire(&xlogctl->info_lck);
  		xlogctl->LogwrtResult = LogwrtResult;
+ 		xlogctl->sendidx = Write->curridx;
  		if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
  			xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
  		if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
***************
*** 6313,6318 **** StartupXLOG(void)
--- 6501,6513 ----
  		((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
  
  	/*
+ 	 * We don't need to calculate the accurate starting location of oldest
+ 	 * block in the cache since we will never attempt to read the older data
+ 	 * than EndOfLog.
+ 	 */
+ 	XLogCtl->oldestCacheLSN = EndOfLog;
+ 
+ 	/*
  	 * Tricky point here: readBuf contains the *last* block that the LastRec
  	 * record spans, not the one it starts in.	The last block is indeed the
  	 * one we want to use.
***************
*** 6803,6822 **** GetInsertRecPtr(void)
  }
  
  /*
!  * GetWriteRecPtr -- Returns the current write position.
   */
! XLogRecPtr
! GetWriteRecPtr(void)
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
- 	XLogRecPtr	recptr;
  
  	SpinLockAcquire(&xlogctl->info_lck);
! 	recptr = xlogctl->LogwrtResult.Write;
  	SpinLockRelease(&xlogctl->info_lck);
  
! 	return recptr;
  }
  
  /*
--- 6998,7030 ----
  }
  
  /*
!  * GetWriteRecPtrAndIndex -- Gets the current write position and associated index.
   */
! void
! GetWriteRecPtrAndIndex(XLogRecPtr *recptr, int *index)
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
  
  	SpinLockAcquire(&xlogctl->info_lck);
! 	*recptr = xlogctl->LogwrtResult.Write;
! 	*index = xlogctl->sendidx;
  	SpinLockRelease(&xlogctl->info_lck);
+ }
  
! /* XLByteInBuf -- Is the LSN within WAL buffers? */
! static bool
! XLByteInBuf(XLogRecPtr recptr)
! {
! 	/* use volatile pointer to prevent code rearrangement */
! 	volatile XLogCtlData *xlogctl = XLogCtl;
! 	XLogRecPtr oldestCacheLSN;
! 
! 	SpinLockAcquire(&xlogctl->info_lck);
! 	oldestCacheLSN = xlogctl->oldestCacheLSN;
! 	SpinLockRelease(&xlogctl->info_lck);
! 
! 	return XLByteLE(oldestCacheLSN, recptr);
  }
  
  /*
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 529,537 **** WalSndKill(int code, Datum arg)
  
  /*
   * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
-  *
-  * XXX probably this should be improved to suck data directly from the
-  * WAL buffers when possible.
   */
  static void
  XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
--- 529,534 ----
***************
*** 641,648 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  }
  
  /*
!  * Read up to MAX_SEND_SIZE bytes of WAL that's been written to disk,
!  * but not yet sent to the client, and send it.
   *
   * msgbuf is a work area in which the output message is constructed.  It's
   * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
--- 638,645 ----
  }
  
  /*
!  * Read up to MAX_SEND_SIZE bytes of WAL that's been inserted to WAL buffers
!  * or written to disk, but not yet sent to the client, and send it.
   *
   * msgbuf is a work area in which the output message is constructed.  It's
   * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
***************
*** 661,677 **** XLogSend(char *msgbuf, bool *caughtup)
  	XLogRecPtr	endptr;
  	Size		nbytes;
  	WalDataMessageHeader msghdr;
  
  	/*
! 	 * Attempt to send all data that's already been written out from WAL
! 	 * buffers (note it might not yet be fsync'd to disk).  We cannot go
! 	 * further than that given the current implementation of XLogRead().
  	 */
! 	SendRqstPtr = GetWriteRecPtr();
  
  	/* Quick exit if nothing to do */
  	if (XLByteLE(SendRqstPtr, sentPtr))
  	{
  		*caughtup = true;
  		return true;
  	}
--- 658,701 ----
  	XLogRecPtr	endptr;
  	Size		nbytes;
  	WalDataMessageHeader msghdr;
+ 	static int	sendidx = -1;
+ 	static bool	frombuf = false;
  
+ retry:
  	/*
! 	 * Attempt to send all data that's already been inserted into or written
! 	 * out from WAL buffers (note it might not yet be fsync'd to disk)
  	 */
! 	if (frombuf)
! 	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		WalSndCtlData *walsndctl = WalSndCtl;
! 
! 		SpinLockAcquire(&walsndctl->info_lck);
! 		SendRqstPtr = walsndctl->sendRqst;
! 		SpinLockRelease(&walsndctl->info_lck);
! 	}
! 	else
! 		/*
! 		 * XXX: if we've gotten within XLOG_BLCKSZ bytes of the current WAL
! 		 * write location, we should attempt to read data from WAL buffers
! 		 * instead of the disk?
! 		 */
! 		GetWriteRecPtrAndIndex(&SendRqstPtr, &sendidx);
  
  	/* Quick exit if nothing to do */
  	if (XLByteLE(SendRqstPtr, sentPtr))
  	{
+ 		/*
+ 		 * Attempt to read data from WAL buffers and send it, if we've
+ 		 * already sent all WAL written to the disk
+ 		 */
+ 		if (!frombuf)
+ 		{
+ 			frombuf = true;
+ 			goto retry;
+ 		}
+ 
  		*caughtup = true;
  		return true;
  	}
***************
*** 722,740 **** XLogSend(char *msgbuf, bool *caughtup)
  		*caughtup = false;
  	}
  
- 	nbytes = endptr.xrecoff - startptr.xrecoff;
- 	Assert(nbytes <= MAX_SEND_SIZE);
- 
  	/*
  	 * OK to read and send the slice.
  	 */
  	msgbuf[0] = 'w';
  
! 	/*
! 	 * Read the log directly into the output buffer to avoid extra memcpy
! 	 * calls.
! 	 */
! 	XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
  
  	/*
  	 * We fill the message header last so that the send timestamp is taken
--- 746,798 ----
  		*caughtup = false;
  	}
  
  	/*
  	 * OK to read and send the slice.
  	 */
  	msgbuf[0] = 'w';
  
! 	if (frombuf)
! 	{
! 		/*
! 		 * Read the log from WAL buffers, up to endptr, starting at startptr.
! 		 * If no log could be read, we immediately retry to read it from the
! 		 * disk.
! 		 */
! 		if ((nbytes = XLogBufRead(msgbuf + 1 + sizeof(WalDataMessageHeader),
! 								  startptr, endptr, &sendidx, &sentPtr,
! 								  &frombuf)) <= 0)
! 			goto retry;
! 
! 		/*
! 		 * If we could not reach endptr though at least one byte was read,
! 		 * we retry to read the log from the disk after sending the read data.
! 		 */
! 		if (!frombuf)
! 			*caughtup = false;
! 	}
! 	else
! 	{
! 		nbytes = endptr.xrecoff - startptr.xrecoff;
! 		Assert(nbytes <= MAX_SEND_SIZE);
! 
! 		/*
! 		 * Read the log directly into the output buffer to avoid extra memcpy
! 		 * calls.
! 		 */
! 		XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
! 
! 		sentPtr = endptr;
! 
! 		/*
! 		 * Attempt to read data from WAL buffers and send it, if we've already
! 		 * sent all WAL written to the disk
! 		 */
! 		if (*caughtup)
! 		{
! 			frombuf = true;
! 			*caughtup = false;
! 		}
! 	}
  
  	/*
  	 * We fill the message header last so that the send timestamp is taken
***************
*** 752,759 **** XLogSend(char *msgbuf, bool *caughtup)
  	if (pq_flush())
  		return false;
  
- 	sentPtr = endptr;
- 
  	/* Update shared memory status */
  	{
  		/* use volatile pointer to prevent code rearrangement */
--- 810,815 ----
***************
*** 880,885 **** WalSndShmemInit(void)
--- 936,942 ----
  	{
  		/* First time through, so initialize */
  		MemSet(WalSndCtl, 0, WalSndShmemSize());
+ 		SpinLockInit(&WalSndCtl->info_lck);
  
  		for (i = 0; i < max_wal_senders; i++)
  		{
***************
*** 890,895 **** WalSndShmemInit(void)
--- 947,974 ----
  	}
  }
  
+ /* Record the LSN for walsenders to send WAL up to that */
+ void
+ SetXLogSendRqst(XLogRecPtr recptr)
+ {
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	WalSndCtlData *walsndctl = WalSndCtl;
+ 
+ 	/*
+ 	 * Do nothing if streaming replication is disabled.
+ 	 *
+ 	 * XXX: even if it's enabled, should we skip recording if
+ 	 * there is no walsender in progress?
+ 	 */
+ 	if (max_wal_senders == 0)
+ 		return;
+ 
+ 	SpinLockAcquire(&walsndctl->info_lck);
+ 	if (XLByteLT(walsndctl->sendRqst, recptr))
+ 		walsndctl->sendRqst = recptr;
+ 	SpinLockRelease(&walsndctl->info_lck);
+ }
+ 
  /*
   * This isn't currently used for anything. Monitoring tools might be
   * interested in the future, and we'll need something like this in the
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 266,271 **** extern int XLogFileInit(uint32 log, uint32 seg,
--- 266,273 ----
  			 bool *use_existent, bool use_lock);
  extern int	XLogFileOpen(uint32 log, uint32 seg);
  
+ extern Size XLogBufRead(char *buf, XLogRecPtr recptr, XLogRecPtr endptr,
+ 						int *sendidx, XLogRecPtr *ReadResult, bool *frombuf);
  
  extern void XLogGetLastRemoved(uint32 *log, uint32 *seg);
  extern void XLogSetAsyncCommitLSN(XLogRecPtr record);
***************
*** 294,300 **** extern bool CreateRestartPoint(int flags);
  extern void XLogPutNextOid(Oid nextOid);
  extern XLogRecPtr GetRedoRecPtr(void);
  extern XLogRecPtr GetInsertRecPtr(void);
! extern XLogRecPtr GetWriteRecPtr(void);
  extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
  extern TimeLineID GetRecoveryTargetTLI(void);
  
--- 296,302 ----
  extern void XLogPutNextOid(Oid nextOid);
  extern XLogRecPtr GetRedoRecPtr(void);
  extern XLogRecPtr GetInsertRecPtr(void);
! extern void GetWriteRecPtrAndIndex(XLogRecPtr *recptr, int *index);
  extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
  extern TimeLineID GetRecoveryTargetTLI(void);
  
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 29,34 **** typedef struct WalSnd
--- 29,38 ----
  /* There is one WalSndCtl struct for the whole database cluster */
  typedef struct
  {
+ 	XLogRecPtr	sendRqst;		/* last byte + 1 to send */
+ 
+ 	slock_t		info_lck;		/* protects the above field */
+ 
  	WalSnd		walsnds[1];		/* VARIABLE LENGTH ARRAY */
  } WalSndCtlData;
  
***************
*** 45,50 **** extern int	WalSenderMain(void);
--- 49,55 ----
  extern void WalSndSignals(void);
  extern Size WalSndShmemSize(void);
  extern void WalSndShmemInit(void);
+ extern void SetXLogSendRqst(XLogRecPtr recptr);
  extern XLogRecPtr GetOldestWALSendPointer(void);
  
  #endif   /* _WALSENDER_H */
