I've been looking at various ways to make WALInsertLock less of a bottleneck on multi-CPU servers. The key is going to be to separate the two things that are done while holding the WALInsertLock: a) allocating the required space in the WAL, and b) calculating the CRC of the record header and copying the data to the WAL page. a) needs to be serialized, but b) could be done in parallel.

I've been experimenting with different approaches to do that, but one thing is common among all of them: you need to know the total amount of WAL space needed for the record, including backup blocks, before you take the lock. So, here's a patch to move things around in XLogInsert() a bit, to accomplish that.

This patch doesn't seem to have any performance or scalability impact. I must admit I expected it to give a tiny gain in scalability by shortening the time WALInsertLock is held by a few instructions, but I can't measure any. But IMO it makes the code more readable, so this is worthwhile for that reason alone.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 690,695 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
--- 690,698 ----
  	uint32		freespace;
  	int			curridx;
  	XLogRecData *rdt;
+ 	XLogRecData *rdt_cur;
+ 	XLogRecData	*rdt_bkp_first;
+ 	XLogRecData *rdt_bkp_last;
  	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];
  	bool		dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
  	BkpBlock	dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
***************
*** 704,709 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
--- 707,713 ----
  	bool		updrqst;
  	bool		doPageWrites;
  	bool		isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+ 	uint8		info_final;
  
  	/* cross-check on whether we should be here or not */
  	if (!XLogInsertAllowed())
***************
*** 727,738 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
  	}
  
  	/*
! 	 * Here we scan the rdata chain, determine which buffers must be backed
! 	 * up, and compute the CRC values for the data.  Note that the record
! 	 * header isn't added into the CRC initially since we don't know the final
! 	 * length or info bits quite yet.  Thus, the CRC will represent the CRC of
! 	 * the whole record in the order "rdata, then backup blocks, then record
! 	 * header".
  	 *
  	 * We may have to loop back to here if a race condition is detected below.
  	 * We could prevent the race by doing all this work while holding the
--- 731,738 ----
  	}
  
  	/*
! 	 * Here we scan the rdata chain to determine which buffers must be backed
! 	 * up.
  	 *
  	 * We may have to loop back to here if a race condition is detected below.
  	 * We could prevent the race by doing all this work while holding the
***************
*** 746,751 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
--- 746,752 ----
  	 * over the chain later.
  	 */
  begin:;
+ 	info_final = info;
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
  		dtbuf[i] = InvalidBuffer;
***************
*** 760,766 **** begin:;
  	 */
  	doPageWrites = fullPageWrites || Insert->forcePageWrites;
  
- 	INIT_CRC32(rdata_crc);
  	len = 0;
  	for (rdt = rdata;;)
  	{
--- 761,766 ----
***************
*** 768,774 **** begin:;
  		{
  			/* Simple data, just include it */
  			len += rdt->len;
- 			COMP_CRC32(rdata_crc, rdt->data, rdt->len);
  		}
  		else
  		{
--- 768,773 ----
***************
*** 779,790 **** begin:;
  				{
  					/* Buffer already referenced by earlier chain item */
  					if (dtbuf_bkp[i])
  						rdt->data = NULL;
  					else if (rdt->data)
- 					{
  						len += rdt->len;
- 						COMP_CRC32(rdata_crc, rdt->data, rdt->len);
- 					}
  					break;
  				}
  				if (dtbuf[i] == InvalidBuffer)
--- 778,789 ----
  				{
  					/* Buffer already referenced by earlier chain item */
  					if (dtbuf_bkp[i])
+ 					{
  						rdt->data = NULL;
+ 						rdt->len = 0;
+ 					}
  					else if (rdt->data)
  						len += rdt->len;
  					break;
  				}
  				if (dtbuf[i] == InvalidBuffer)
***************
*** 796,807 **** begin:;
  					{
  						dtbuf_bkp[i] = true;
  						rdt->data = NULL;
  					}
  					else if (rdt->data)
- 					{
  						len += rdt->len;
- 						COMP_CRC32(rdata_crc, rdt->data, rdt->len);
- 					}
  					break;
  				}
  			}
--- 795,804 ----
  					{
  						dtbuf_bkp[i] = true;
  						rdt->data = NULL;
+ 						rdt->len = 0;
  					}
  					else if (rdt->data)
  						len += rdt->len;
  					break;
  				}
  			}
***************
*** 816,854 **** begin:;
  	}
  
  	/*
! 	 * Now add the backup block headers and data into the CRC
  	 */
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
! 		if (dtbuf_bkp[i])
  		{
! 			BkpBlock   *bkpb = &(dtbuf_xlg[i]);
! 			char	   *page;
! 
! 			COMP_CRC32(rdata_crc,
! 					   (char *) bkpb,
! 					   sizeof(BkpBlock));
! 			page = (char *) BufferGetBlock(dtbuf[i]);
! 			if (bkpb->hole_length == 0)
! 			{
! 				COMP_CRC32(rdata_crc,
! 						   page,
! 						   BLCKSZ);
! 			}
! 			else
! 			{
! 				/* must skip the hole */
! 				COMP_CRC32(rdata_crc,
! 						   page,
! 						   bkpb->hole_offset);
! 				COMP_CRC32(rdata_crc,
! 						   page + (bkpb->hole_offset + bkpb->hole_length),
! 						   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
! 			}
  		}
  	}
  
  	/*
  	 * NOTE: We disallow len == 0 because it provides a useful bit of extra
  	 * error checking in ReadRecord.  This means that all callers of
  	 * XLogInsert must supply at least some not-in-a-buffer data.  However, we
--- 813,898 ----
  	}
  
  	/*
! 	 * Make additional rdata chain entries for the backup blocks, so that we
! 	 * don't need to special-case them in the write loop.  They are kept in
! 	 * a separate rdata chain until we have the lock, and know that we won't
! 	 * need to restart from scratch anymore. The real rdata chain is kept
! 	 * unmodified until then.
! 	 *
! 	 * At the exit of this loop, write_len includes the backup block data.
! 	 *
! 	 * Also set the appropriate info bits to show which buffers were backed
! 	 * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
! 	 * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
  	 */
+ 	write_len = len;
+ 	rdt_bkp_first = rdt_bkp_last = NULL;
  	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
  	{
! 		BkpBlock   *bkpb;
! 		char	   *page;
! 
! 		if (!dtbuf_bkp[i])
! 			continue;
! 
! 		info_final |= XLR_SET_BKP_BLOCK(i);
! 
! 		bkpb = &(dtbuf_xlg[i]);
! 		page = (char *) BufferGetBlock(dtbuf[i]);
! 
! 		if (rdt_bkp_first == NULL)
! 			rdt_bkp_first = &(dtbuf_rdt1[i]);
! 		else
! 			rdt_bkp_last->next = &(dtbuf_rdt1[i]);
! 
! 		rdt_bkp_last = &(dtbuf_rdt1[i]);
! 
! 		rdt_bkp_last->data = (char *) bkpb;
! 		rdt_bkp_last->len = sizeof(BkpBlock);
! 		write_len += sizeof(BkpBlock);
! 
! 		rdt_bkp_last->next = &(dtbuf_rdt2[i]);
! 		rdt_bkp_last = rdt_bkp_last->next;
! 
! 		if (bkpb->hole_length == 0)
  		{
! 			rdt_bkp_last->data = page;
! 			rdt_bkp_last->len = BLCKSZ;
! 			write_len += BLCKSZ;
! 			rdt_bkp_last->next = NULL;
! 		}
! 		else
! 		{
! 			/* must skip the hole */
! 			rdt_bkp_last->data = page;
! 			rdt_bkp_last->len = bkpb->hole_offset;
! 			write_len += bkpb->hole_offset;
! 
! 			rdt_bkp_last->next = &(dtbuf_rdt3[i]);
! 			rdt_bkp_last = rdt_bkp_last->next;
! 
! 			rdt_bkp_last->data = page + (bkpb->hole_offset + bkpb->hole_length);
! 			rdt_bkp_last->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
! 			write_len += rdt_bkp_last->len;
! 			rdt_bkp_last->next = NULL;
  		}
  	}
  
  	/*
+ 	 * Calculate CRC of the data, including all the backup blocks
+ 	 *
+ 	 * Note that the record header isn't added into the CRC initially since
+ 	 * we don't know the prev-link yet.  Thus, the CRC will represent the CRC
+ 	 * of the whole record in the order "rdata, then backup blocks, then
+ 	 * record header".
+ 	 */
+ 	INIT_CRC32(rdata_crc);
+ 	for (rdt_cur = rdt; rdt_cur != NULL; rdt_cur = rdt_cur->next)
+ 		COMP_CRC32(rdata_crc, rdt_cur->data, rdt_cur->len);
+ 	for (rdt_cur = rdt_bkp_first; rdt_cur != NULL; rdt_cur = rdt_cur->next)
+ 		COMP_CRC32(rdata_crc, rdt_cur->data, rdt_cur->len);
+ 
+ 	/*
  	 * NOTE: We disallow len == 0 because it provides a useful bit of extra
  	 * error checking in ReadRecord.  This means that all callers of
  	 * XLogInsert must supply at least some not-in-a-buffer data.  However, we
***************
*** 913,974 **** begin:;
  	}
  
  	/*
! 	 * Make additional rdata chain entries for the backup blocks, so that we
! 	 * don't need to special-case them in the write loop.  Note that we have
! 	 * now irrevocably changed the input rdata chain.  At the exit of this
! 	 * loop, write_len includes the backup block data.
! 	 *
! 	 * Also set the appropriate info bits to show which buffers were backed
! 	 * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
! 	 * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
  	 */
! 	write_len = len;
! 	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
! 	{
! 		BkpBlock   *bkpb;
! 		char	   *page;
! 
! 		if (!dtbuf_bkp[i])
! 			continue;
! 
! 		info |= XLR_SET_BKP_BLOCK(i);
! 
! 		bkpb = &(dtbuf_xlg[i]);
! 		page = (char *) BufferGetBlock(dtbuf[i]);
! 
! 		rdt->next = &(dtbuf_rdt1[i]);
! 		rdt = rdt->next;
! 
! 		rdt->data = (char *) bkpb;
! 		rdt->len = sizeof(BkpBlock);
! 		write_len += sizeof(BkpBlock);
! 
! 		rdt->next = &(dtbuf_rdt2[i]);
! 		rdt = rdt->next;
! 
! 		if (bkpb->hole_length == 0)
! 		{
! 			rdt->data = page;
! 			rdt->len = BLCKSZ;
! 			write_len += BLCKSZ;
! 			rdt->next = NULL;
! 		}
! 		else
! 		{
! 			/* must skip the hole */
! 			rdt->data = page;
! 			rdt->len = bkpb->hole_offset;
! 			write_len += bkpb->hole_offset;
! 
! 			rdt->next = &(dtbuf_rdt3[i]);
! 			rdt = rdt->next;
! 
! 			rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
! 			rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
! 			write_len += rdt->len;
! 			rdt->next = NULL;
! 		}
! 	}
  
  	/*
  	 * If there isn't enough space on the current XLOG page for a record
--- 957,966 ----
  	}
  
  	/*
! 	 * We can now link the backup block rdata entries into the chain. This
! 	 * changes the rdata chain irrevocably.
  	 */
! 	rdt->next = rdt_bkp_first;
  
  	/*
  	 * If there isn't enough space on the current XLOG page for a record
***************
*** 1031,1037 **** begin:;
  	record->xl_xid = GetCurrentTransactionIdIfAny();
  	record->xl_tot_len = SizeOfXLogRecord + write_len;
  	record->xl_len = len;		/* doesn't include backup blocks */
! 	record->xl_info = info;
  	record->xl_rmid = rmid;
  
  	/* Now we can finish computing the record's CRC */
--- 1023,1029 ----
  	record->xl_xid = GetCurrentTransactionIdIfAny();
  	record->xl_tot_len = SizeOfXLogRecord + write_len;
  	record->xl_len = len;		/* doesn't include backup blocks */
! 	record->xl_info = info_final;
  	record->xl_rmid = rmid;
  
  	/* Now we can finish computing the record's CRC */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to