On 08.11.2010 15:40, Heikki Linnakangas wrote:
Here's a first draft of this, using the inCommit flag as is. It works,
but suffers from starvation if you have a lot of concurrent
multi-WAL-record actions. I tested that by running INSERTs to a table
with tsvector field with a GiST index on it from five concurrent
sessions, and saw checkpoints regularly busy-waiting for over a minute.

To avoid that, we need something a little bit more complicated than a
boolean flag. I'm thinking of adding a counter beside the inCommit flag
that's incremented every time a new multi-WAL-record action begins, so
that the checkpoint process can distinguish between a new action that
was started after deciding the REDO pointer and an old one that's still
running.

(inCommit is a misnomer now, of course. Will need to find a better name..)

Here's a 2nd version, with an additional counter in PGPROC to avoid starving checkpoint in the face of a constant stream of e.g. GiST inserts.

The new rule is that before you start a multi-WAL-record operation that needs to be completed at end of recovery if you crash in the middle, you call HoldCheckpoint(), and once you're finished, ResumeCheckpoint(). rm_safe_restartpoint() is gone.

This is a pre-existing bug, but given the lack of field reports and the fact that it's pretty darn hard to run into this in real life, I'm inclined to not backpatch this.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/gin/ginbtree.c
--- b/src/backend/access/gin/ginbtree.c
***************
*** 17,22 ****
--- 17,23 ----
  #include "access/gin.h"
  #include "miscadmin.h"
  #include "storage/bufmgr.h"
+ #include "storage/proc.h"
  #include "utils/rel.h"
  
  /*
***************
*** 281,286 **** ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
--- 282,288 ----
  	Page		page,
  				rpage,
  				lpage;
+ 	bool		splitInProgress = false;
  
  	/* remember root BlockNumber */
  	while (parent)
***************
*** 318,324 **** ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
  
  			freeGinBtreeStack(stack);
  
! 			return;
  		}
  		else
  		{
--- 320,326 ----
  
  			freeGinBtreeStack(stack);
  
! 			break; /* don't need to recurse to parent */
  		}
  		else
  		{
***************
*** 326,331 **** ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
--- 328,344 ----
  			Page		newlpage;
  
  			/*
+ 			 * Hold off checkpoints until we've finished this split by
+ 			 * inserting the parent pointer. Replay might miss the incomplete
+ 			 * split otherwise.
+ 			 */
+ 			if (!btree->index->rd_istemp && !splitInProgress)
+ 			{
+ 				/* at most once per insertion: HoldCheckpoint() asserts no nesting */
+ 				HoldCheckpoint();
+ 				splitInProgress = true;
+ 			}
+ 
+ 			/*
  			 * newlpage is a pointer to memory page, it doesn't associate with
  			 * buffer, stack->buffer should be untouched
  			 */
***************
*** 402,408 **** ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
  						buildStats->nEntryPages++;
  				}
  
! 				return;
  			}
  			else
  			{
--- 415,421 ----
  						buildStats->nEntryPages++;
  				}
  
! 				break; /* don't need to recurse to parent */
  			}
  			else
  			{
***************
*** 472,475 **** ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
--- 485,495 ----
  		pfree(stack);
  		stack = parent;
  	}
+ 
+ 	/*
+ 	 * If we had to split a page, let checkpointer know that we're now
+ 	 * finished with it.
+ 	 */
+ 	if (splitInProgress)
+ 		ResumeCheckpoint();
  }
*** a/src/backend/access/gin/ginxlog.c
--- b/src/backend/access/gin/ginxlog.c
***************
*** 866,876 **** gin_xlog_cleanup(void)
  	MemoryContextDelete(opCtx);
  	incomplete_splits = NIL;
  }
- 
- bool
- gin_safe_restartpoint(void)
- {
- 	if (incomplete_splits)
- 		return false;
- 	return true;
- }
--- 866,868 ----
*** a/src/backend/access/gist/gist.c
--- b/src/backend/access/gist/gist.c
***************
*** 20,25 ****
--- 20,26 ----
  #include "miscadmin.h"
  #include "storage/bufmgr.h"
  #include "storage/indexfsm.h"
+ #include "storage/proc.h"
  #include "utils/memutils.h"
  
  const XLogRecPtr XLogRecPtrForTemp = {1, 1};
***************
*** 272,283 **** gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
--- 273,290 ----
  	state.r = r;
  	state.key = itup->t_tid;
  	state.needInsertComplete = true;
+ 	/* Hold off checkpoints until we've completed this insertion. */
+ 	if (!r->rd_istemp)
+ 		HoldCheckpoint();
  
  	state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
  	state.stack->blkno = GIST_ROOT_BLKNO;
  
  	gistfindleaf(&state, giststate);
  	gistmakedeal(&state, giststate);
+ 
+ 	if (!r->rd_istemp)
+ 		ResumeCheckpoint();
  }
  
  static bool
*** a/src/backend/access/gist/gistxlog.c
--- b/src/backend/access/gist/gistxlog.c
***************
*** 838,851 **** gist_xlog_cleanup(void)
  	MemoryContextDelete(insertCtx);
  }
  
- bool
- gist_safe_restartpoint(void)
- {
- 	if (incomplete_inserts)
- 		return false;
- 	return true;
- }
- 
  
  XLogRecData *
  formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
--- 838,843 ----
*** a/src/backend/access/nbtree/nbtinsert.c
--- b/src/backend/access/nbtree/nbtinsert.c
***************
*** 21,26 ****
--- 21,27 ----
  #include "miscadmin.h"
  #include "storage/bufmgr.h"
  #include "storage/lmgr.h"
+ #include "storage/proc.h"
  #include "utils/inval.h"
  #include "utils/tqual.h"
  
***************
*** 63,69 **** static void _bt_insertonpg(Relation rel, Buffer buf,
  			   BTStack stack,
  			   IndexTuple itup,
  			   OffsetNumber newitemoff,
! 			   bool split_only_page);
  static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
  		  OffsetNumber newitemoff, Size newitemsz,
  		  IndexTuple newitem, bool newitemonleft);
--- 64,71 ----
  			   BTStack stack,
  			   IndexTuple itup,
  			   OffsetNumber newitemoff,
! 			   bool split_only_page,
! 			   bool isleaf);
  static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
  		  OffsetNumber newitemoff, Size newitemsz,
  		  IndexTuple newitem, bool newitemonleft);
***************
*** 176,182 **** top:
  	{
  		/* do the insertion */
  		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
! 		_bt_insertonpg(rel, buf, stack, itup, offset, false);
  	}
  	else
  	{
--- 178,184 ----
  	{
  		/* do the insertion */
  		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
! 		_bt_insertonpg(rel, buf, stack, itup, offset, false, true);
  	}
  	else
  	{
***************
*** 660,666 **** _bt_insertonpg(Relation rel,
  			   BTStack stack,
  			   IndexTuple itup,
  			   OffsetNumber newitemoff,
! 			   bool split_only_page)
  {
  	Page		page;
  	BTPageOpaque lpageop;
--- 662,669 ----
  			   BTStack stack,
  			   IndexTuple itup,
  			   OffsetNumber newitemoff,
! 			   bool split_only_page,
! 			   bool isleaf)
  {
  	Page		page;
  	BTPageOpaque lpageop;
***************
*** 714,719 **** _bt_insertonpg(Relation rel,
--- 717,726 ----
  		 *----------
  		 */
  		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
+ 
+ 		/* Finish the "checkpoint critical section" started in _bt_split() */
+ 		if (!rel->rd_istemp && isleaf)
+ 			ResumeCheckpoint();
  	}
  	else
  	{
***************
*** 870,875 **** _bt_insertonpg(Relation rel,
--- 877,886 ----
   *
   *		Returns the new right sibling of buf, pinned and write-locked.
   *		The pin and lock on buf are maintained.
+  *
+  *		If it's not a temporary index, _bt_split() calls HoldCheckpoint(),
+  *		and the caller is responsible for calling ResumeCheckpoint() after
+  *		inserting the parent pointer.
   */
  static Buffer
  _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
***************
*** 1139,1144 **** _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
--- 1150,1167 ----
  	START_CRIT_SECTION();
  
  	/*
+ 	 * Hold off checkpoints until we've inserted the parent pointer. Without
+ 	 * this, it would be possible for the checkpoint to set REDO after the
+ 	 * split WAL record and quickly finish the checkpoint before the insertion
+ 	 * of the parent pointer has been WAL-logged. Incomplete splits are
+ 	 * normally completed at the end of recovery, but if there's no trace of
+ 	 * the split between the redo pointer and checkpoint record, recovery
+ 	 * would not know that the split is incomplete.
+ 	 */
+ 	if (!rel->rd_istemp)
+ 		HoldCheckpoint();
+ 
+ 	/*
  	 * By here, the original data page has been split into two new halves, and
  	 * these are correct.  The algorithm requires that the left page never
  	 * move during a split, so we copy the new left page back on top of the
***************
*** 1683,1689 **** _bt_insert_parent(Relation rel,
  		/* Recursively update the parent */
  		_bt_insertonpg(rel, pbuf, stack->bts_parent,
  					   new_item, stack->bts_offset + 1,
! 					   is_only);
  
  		/* be tidy */
  		pfree(new_item);
--- 1706,1712 ----
  		/* Recursively update the parent */
  		_bt_insertonpg(rel, pbuf, stack->bts_parent,
  					   new_item, stack->bts_offset + 1,
! 					   is_only, false);
  
  		/* be tidy */
  		pfree(new_item);
*** a/src/backend/access/nbtree/nbtxlog.c
--- b/src/backend/access/nbtree/nbtxlog.c
***************
*** 1257,1267 **** btree_xlog_cleanup(void)
  	}
  	incomplete_actions = NIL;
  }
- 
- bool
- btree_safe_restartpoint(void)
- {
- 	if (incomplete_actions)
- 		return false;
- 	return true;
- }
--- 1257,1259 ----
*** a/src/backend/access/transam/README
--- b/src/backend/access/transam/README
***************
*** 542,547 **** replay code has to do the insertion on its own to restore the index to
--- 542,560 ----
  consistency.  Such insertions occur after WAL is operational, so they can
  and should write WAL records for the additional generated actions.
  
+ Finishing incomplete actions at the end of recovery only works if the recovery
+ sees the WAL record starting the action. Because of that, you need to ensure
+ that a checkpoint doesn't happen in the middle of such an operation. To be
+ precise, a checkpoint mustn't begin and finish so that there is a multi-record
+ WAL action in progress, but no WAL record of that action lies between the
+ REDO pointer and the checkpoint record. Recovery from such a checkpoint would
+ not see any evidence of the still incomplete multi-record action, so it would
+ not know that the operation needs to be finished at the end of recovery. To
+ prevent that, you must call HoldCheckpoint() before writing the first WAL
+ record of the operation, and ResumeCheckpoint() after it's finished. The
+ checkpoint code checks for that after deciding the REDO pointer, waiting until
+ all the operations that were in-progress when the checkpoint started have
+ finished, before it writes the checkpoint record.
  
  Write-Ahead Logging for Filesystem Actions
  ------------------------------------------
*** a/src/backend/access/transam/rmgr.c
--- b/src/backend/access/transam/rmgr.c
***************
*** 26,45 ****
  
  
  const RmgrData RmgrTable[RM_MAX_ID + 1] = {
! 	{"XLOG", xlog_redo, xlog_desc, NULL, NULL, NULL},
! 	{"Transaction", xact_redo, xact_desc, NULL, NULL, NULL},
! 	{"Storage", smgr_redo, smgr_desc, NULL, NULL, NULL},
! 	{"CLOG", clog_redo, clog_desc, NULL, NULL, NULL},
! 	{"Database", dbase_redo, dbase_desc, NULL, NULL, NULL},
! 	{"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},
! 	{"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},
! 	{"RelMap", relmap_redo, relmap_desc, NULL, NULL, NULL},
! 	{"Standby", standby_redo, standby_desc, NULL, NULL, NULL},
! 	{"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},
! 	{"Heap", heap_redo, heap_desc, NULL, NULL, NULL},
! 	{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
! 	{"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
! 	{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
! 	{"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint},
! 	{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}
  };
--- 26,45 ----
  
  
  const RmgrData RmgrTable[RM_MAX_ID + 1] = {
! 	{"XLOG", xlog_redo, xlog_desc, NULL, NULL},
! 	{"Transaction", xact_redo, xact_desc, NULL, NULL},
! 	{"Storage", smgr_redo, smgr_desc, NULL, NULL},
! 	{"CLOG", clog_redo, clog_desc, NULL, NULL},
! 	{"Database", dbase_redo, dbase_desc, NULL, NULL},
! 	{"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL},
! 	{"MultiXact", multixact_redo, multixact_desc, NULL, NULL},
! 	{"RelMap", relmap_redo, relmap_desc, NULL, NULL},
! 	{"Standby", standby_redo, standby_desc, NULL, NULL},
! 	{"Heap2", heap2_redo, heap2_desc, NULL, NULL},
! 	{"Heap", heap_redo, heap_desc, NULL, NULL},
! 	{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup},
! 	{"Hash", hash_redo, hash_desc, NULL, NULL},
! 	{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup},
! 	{"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup},
! 	{"Sequence", seq_redo, seq_desc, NULL, NULL}
  };
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 313,319 **** MarkAsPreparing(TransactionId xid, const char *gid,
  	gxact->proc.backendId = InvalidBackendId;
  	gxact->proc.databaseId = databaseid;
  	gxact->proc.roleId = owner;
! 	gxact->proc.inCommit = false;
  	gxact->proc.vacuumFlags = 0;
  	gxact->proc.lwWaiting = false;
  	gxact->proc.lwExclusive = false;
--- 313,320 ----
  	gxact->proc.backendId = InvalidBackendId;
  	gxact->proc.databaseId = databaseid;
  	gxact->proc.roleId = owner;
! 	gxact->proc.holdCheckpoint = false;
! 	gxact->proc.chkptHoldCounter = 0;
  	gxact->proc.vacuumFlags = 0;
  	gxact->proc.lwWaiting = false;
  	gxact->proc.lwExclusive = false;
***************
*** 1018,1024 **** EndPrepare(GlobalTransaction gxact)
  	 */
  	START_CRIT_SECTION();
  
! 	MyProc->inCommit = true;
  
  	gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
  									records.head);
--- 1019,1025 ----
  	 */
  	START_CRIT_SECTION();
  
! 	MyProc->holdCheckpoint = true;
  
  	gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
  									records.head);
***************
*** 1066,1072 **** EndPrepare(GlobalTransaction gxact)
  	 * checkpoint starting after this will certainly see the gxact as a
  	 * candidate for fsyncing.
  	 */
! 	MyProc->inCommit = false;
  
  	END_CRIT_SECTION();
  
--- 1067,1073 ----
  	 * checkpoint starting after this will certainly see the gxact as a
  	 * candidate for fsyncing.
  	 */
! 	MyProc->holdCheckpoint = false;
  
  	END_CRIT_SECTION();
  
***************
*** 1959,1965 **** RecordTransactionCommitPrepared(TransactionId xid,
  	START_CRIT_SECTION();
  
  	/* See notes in RecordTransactionCommit */
! 	MyProc->inCommit = true;
  
  	/* Emit the XLOG commit record */
  	xlrec.xid = xid;
--- 1960,1966 ----
  	START_CRIT_SECTION();
  
  	/* See notes in RecordTransactionCommit */
! 	MyProc->holdCheckpoint = true;
  
  	/* Emit the XLOG commit record */
  	xlrec.xid = xid;
***************
*** 2024,2030 **** RecordTransactionCommitPrepared(TransactionId xid,
  	TransactionIdCommitTree(xid, nchildren, children);
  
  	/* Checkpoint can proceed now */
! 	MyProc->inCommit = false;
  
  	END_CRIT_SECTION();
  }
--- 2025,2031 ----
  	TransactionIdCommitTree(xid, nchildren, children);
  
  	/* Checkpoint can proceed now */
! 	MyProc->holdCheckpoint = false;
  
  	END_CRIT_SECTION();
  }
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 968,975 **** RecordTransactionCommit(void)
  		xlrec.tsId = MyDatabaseTableSpace;
  
  		/*
! 		 * Mark ourselves as within our "commit critical section".	This
! 		 * forces any concurrent checkpoint to wait until we've updated
  		 * pg_clog.  Without this, it is possible for the checkpoint to set
  		 * REDO after the XLOG record but fail to flush the pg_clog update to
  		 * disk, leading to loss of the transaction commit if the system
--- 968,974 ----
  		xlrec.tsId = MyDatabaseTableSpace;
  
  		/*
! 		 * Hold off any concurrent checkpoints until we've updated
  		 * pg_clog.  Without this, it is possible for the checkpoint to set
  		 * REDO after the XLOG record but fail to flush the pg_clog update to
  		 * disk, leading to loss of the transaction commit if the system
***************
*** 979,991 **** RecordTransactionCommit(void)
  		 * RecordTransactionAbort.	That's because loss of a transaction abort
  		 * is noncritical; the presumption would be that it aborted, anyway.
  		 *
! 		 * It's safe to change the inCommit flag of our own backend without
! 		 * holding the ProcArrayLock, since we're the only one modifying it.
! 		 * This makes checkpoint's determination of which xacts are inCommit a
! 		 * bit fuzzy, but it doesn't matter.
  		 */
  		START_CRIT_SECTION();
! 		MyProc->inCommit = true;
  
  		SetCurrentTransactionStopTimestamp();
  		xlrec.xact_time = xactStopTimestamp;
--- 978,992 ----
  		 * RecordTransactionAbort.	That's because loss of a transaction abort
  		 * is noncritical; the presumption would be that it aborted, anyway.
  		 *
! 		 * We don't bother bumping chkptHoldCounter like HoldCheckpoint()
! 		 * does, as the transaction is about to end soon anyway. At worst it
! 		 * means that checkpoint thinks that we're still in some previous
! 		 * multi-record operation that happened earlier in this transaction,
! 		 * and has to wait a bit longer than necessary. Better to avoid any
! 		 * unnecessary overhead in this performance-critical path.
  		 */
  		START_CRIT_SECTION();
! 		MyProc->holdCheckpoint = true;
  
  		SetCurrentTransactionStopTimestamp();
  		xlrec.xact_time = xactStopTimestamp;
***************
*** 1100,1106 **** RecordTransactionCommit(void)
  	 */
  	if (markXidCommitted)
  	{
! 		MyProc->inCommit = false;
  		END_CRIT_SECTION();
  	}
  
--- 1101,1107 ----
  	 */
  	if (markXidCommitted)
  	{
! 		MyProc->holdCheckpoint = false;
  		END_CRIT_SECTION();
  	}
  
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 7111,7118 **** CreateCheckPoint(int flags)
  	uint32		freespace;
  	uint32		_logId;
  	uint32		_logSeg;
- 	TransactionId *inCommitXids;
- 	int			nInCommit;
  
  	/*
  	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
--- 7111,7116 ----
***************
*** 7299,7313 **** CreateCheckPoint(int flags)
  	 * and we will correctly flush the update below.  So we cannot miss any
  	 * xacts we need to wait for.
  	 */
! 	nInCommit = GetTransactionsInCommit(&inCommitXids);
! 	if (nInCommit > 0)
! 	{
! 		do
! 		{
! 			pg_usleep(10000L);	/* wait for 10 msec */
! 		} while (HaveTransactionsInCommit(inCommitXids, nInCommit));
! 	}
! 	pfree(inCommitXids);
  
  	/*
  	 * Get the other info we need for the checkpoint record.
--- 7297,7303 ----
  	 * and we will correctly flush the update below.  So we cannot miss any
  	 * xacts we need to wait for.
  	 */
! 	ProcArrayCheckpointBarrier();
  
  	/*
  	 * Get the other info we need for the checkpoint record.
***************
*** 7554,7584 **** CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
  static void
  RecoveryRestartPoint(const CheckPoint *checkPoint)
  {
- 	int			rmid;
- 
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
  
  	/*
- 	 * Is it safe to checkpoint?  We must ask each of the resource managers
- 	 * whether they have any partial state information that might prevent a
- 	 * correct restart from this point.  If so, we skip this opportunity, but
- 	 * return at the next checkpoint record for another try.
- 	 */
- 	for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
- 	{
- 		if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
- 			if (!(RmgrTable[rmid].rm_safe_restartpoint()))
- 			{
- 				elog(trace_recovery(DEBUG2), "RM %d not safe to record restart point at %X/%X",
- 					 rmid,
- 					 checkPoint->redo.xlogid,
- 					 checkPoint->redo.xrecoff);
- 				return;
- 			}
- 	}
- 
- 	/*
  	 * Copy the checkpoint record to shared memory, so that bgwriter can use
  	 * it the next time it wants to perform a restartpoint.
  	 */
--- 7544,7553 ----
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 367,373 **** ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
  		proc->xmin = InvalidTransactionId;
  		/* must be cleared with xid/xmin: */
  		proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 		proc->inCommit = false; /* be sure this is cleared in abort */
  		proc->recoveryConflictPending = false;
  
  		/* Clear the subtransaction-XID cache too while holding the lock */
--- 367,373 ----
  		proc->xmin = InvalidTransactionId;
  		/* must be cleared with xid/xmin: */
  		proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 		proc->holdCheckpoint = false; /* be sure this is cleared in abort */
  		proc->recoveryConflictPending = false;
  
  		/* Clear the subtransaction-XID cache too while holding the lock */
***************
*** 394,400 **** ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
  		proc->xmin = InvalidTransactionId;
  		/* must be cleared with xid/xmin: */
  		proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 		proc->inCommit = false; /* be sure this is cleared in abort */
  		proc->recoveryConflictPending = false;
  
  		Assert(proc->subxids.nxids == 0);
--- 394,400 ----
  		proc->xmin = InvalidTransactionId;
  		/* must be cleared with xid/xmin: */
  		proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 		proc->holdCheckpoint = false; /* be sure this is cleared in abort */
  		proc->recoveryConflictPending = false;
  
  		Assert(proc->subxids.nxids == 0);
***************
*** 427,433 **** ProcArrayClearTransaction(PGPROC *proc)
  
  	/* redundant, but just in case */
  	proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 	proc->inCommit = false;
  
  	/* Clear the subtransaction-XID cache too */
  	proc->subxids.nxids = 0;
--- 427,433 ----
  
  	/* redundant, but just in case */
  	proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
! 	proc->holdCheckpoint = false;
  
  	/* Clear the subtransaction-XID cache too */
  	proc->subxids.nxids = 0;
***************
*** 1537,1569 **** GetRunningTransactionData(void)
  }
  
  /*
!  * GetTransactionsInCommit -- Get the XIDs of transactions that are committing
!  *
!  * Constructs an array of XIDs of transactions that are currently in commit
!  * critical sections, as shown by having inCommit set in their PGPROC entries.
!  *
!  * *xids_p is set to a palloc'd array that should be freed by the caller.
!  * The return value is the number of valid entries.
!  *
!  * Note that because backends set or clear inCommit without holding any lock,
!  * the result is somewhat indeterminate, but we don't really care.  Even in
!  * a multiprocessor with delayed writes to shared memory, it should be certain
!  * that setting of inCommit will propagate to shared memory when the backend
!  * takes the WALInsertLock, so we cannot fail to see an xact as inCommit if
!  * it's already inserted its commit record.  Whether it takes a little while
!  * for clearing of inCommit to propagate is unimportant for correctness.
   */
! int
! GetTransactionsInCommit(TransactionId **xids_p)
  {
  	ProcArrayStruct *arrayP = procArray;
! 	TransactionId *xids;
  	int			nxids;
  	int			index;
  
! 	xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId));
  	nxids = 0;
  
  	LWLockAcquire(ProcArrayLock, LW_SHARED);
  
  	for (index = 0; index < arrayP->numProcs; index++)
--- 1537,1586 ----
  }
  
  /*
!  * ProcArrayCheckpointBarrier -- Wait until all multi-record actions finish
!  *
!  * Waits until all backends finish any multi-WAL-record actions, or other
!  * incomplete actions, so that it is safe to perform a checkpoint.
!  * This is called in the checkpoint process after deciding REDO pointer,
!  * but before writing the checkpoint record.
!  *
!  * We don't need to wait for any new multi-record actions that begin after
!  * we've started waiting, they will be seen on recovery from the REDO
!  * pointer.
!  *
!  * Note that because backends set or clear the holdCheckpoint flag and
!  * chkptHoldCounter without holding any lock, the result is somewhat
!  * indeterminate, but we don't really care.  Even in a multiprocessor with
!  * delayed writes to shared memory, it should be certain that setting of
!  * holdCheckpoint will propagate to shared memory when the backend takes the
!  * WALInsertLock to write the first WAL record in the action, so we cannot
!  * fail to see a multi-step action as holding checkpoints if it's already
!  * inserted its first WAL record.  Whether it takes a little while for
!  * clearing of holdCheckpoint to propagate is unimportant for correctness.
!  * Likewise we don't care much if the change of chkptHoldCounter is seen
!  * as atomic, we just care if it's the same or different as it was when we
!  * started. By the time the backend takes the WALInsertLock, it should be
!  * updated in shared memory, and until that we don't really need to wait for
!  * the backend anyway.
   */
! void
! ProcArrayCheckpointBarrier(void)
  {
  	ProcArrayStruct *arrayP = procArray;
! 	struct xid_barrier {
! 		TransactionId xid;
! 		int32 counter;
! 	} *xids;
  	int			nxids;
  	int			index;
  
! 	xids = (struct xid_barrier *) palloc(arrayP->maxProcs * sizeof(struct xid_barrier));
  	nxids = 0;
  
+ 	/*
+ 	 * Take a snapshot of the chkptHoldCounter values of all backends
+ 	 * currently holding off checkpoints.
+ 	 */
  	LWLockAcquire(ProcArrayLock, LW_SHARED);
  
  	for (index = 0; index < arrayP->numProcs; index++)
***************
*** 1573,1633 **** GetTransactionsInCommit(TransactionId **xids_p)
  		/* Fetch xid just once - see GetNewTransactionId */
  		TransactionId pxid = proc->xid;
  
! 		if (proc->inCommit && TransactionIdIsValid(pxid))
! 			xids[nxids++] = pxid;
  	}
  
  	LWLockRelease(ProcArrayLock);
  
! 	*xids_p = xids;
! 	return nxids;
! }
! 
! /*
!  * HaveTransactionsInCommit -- Are any of the specified XIDs in commit?
!  *
!  * This is used with the results of GetTransactionsInCommit to see if any
!  * of the specified XIDs are still in their commit critical sections.
!  *
!  * Note: this is O(N^2) in the number of xacts that are/were in commit, but
!  * those numbers should be small enough for it not to be a problem.
!  */
! bool
! HaveTransactionsInCommit(TransactionId *xids, int nxids)
! {
! 	bool		result = false;
! 	ProcArrayStruct *arrayP = procArray;
! 	int			index;
! 
! 	LWLockAcquire(ProcArrayLock, LW_SHARED);
! 
! 	for (index = 0; index < arrayP->numProcs; index++)
  	{
! 		volatile PGPROC *proc = arrayP->procs[index];
! 
! 		/* Fetch xid just once - see GetNewTransactionId */
! 		TransactionId pxid = proc->xid;
! 
! 		if (proc->inCommit && TransactionIdIsValid(pxid))
  		{
! 			int			i;
  
! 			for (i = 0; i < nxids; i++)
  			{
! 				if (xids[i] == pxid)
  				{
! 					result = true;
! 					break;
  				}
  			}
! 			if (result)
! 				break;
! 		}
  	}
! 
! 	LWLockRelease(ProcArrayLock);
! 
! 	return result;
  }
  
  /*
--- 1590,1651 ----
  		/* Fetch xid just once - see GetNewTransactionId */
  		TransactionId pxid = proc->xid;
  
! 		if (proc->holdCheckpoint && TransactionIdIsValid(pxid))
! 		{
! 			xids[nxids].xid = pxid;
! 			xids[nxids].counter = proc->chkptHoldCounter;
! 			nxids++;
! 		}
  	}
  
  	LWLockRelease(ProcArrayLock);
  
! 	/*
! 	 * Wait until all operations that were in-progress when we took the
! 	 * snapshot have finished.
! 	 *
! 	 * Note: this is O(N^2) in the number of operations that are/were in
! 	 * progress, but those numbers should be small enough for it not to be
! 	 * a problem.
! 	 */
! 	if (nxids > 0)
  	{
! 		bool stillHeld;
! 		do
  		{
! 			pg_usleep(10000L);	/* wait for 10 msec */
! 
! 			stillHeld = false;
  
! 			LWLockAcquire(ProcArrayLock, LW_SHARED);
! 			for (index = 0; index < arrayP->numProcs; index++)
  			{
! 				volatile PGPROC *proc = arrayP->procs[index];
! 
! 				/* Fetch xid just once - see GetNewTransactionId */
! 				TransactionId pxid = proc->xid;
! 
! 				if (proc->holdCheckpoint && TransactionIdIsValid(pxid))
  				{
! 					int			i;
! 
! 					for (i = 0; i < nxids; i++)
! 					{
! 						if (xids[i].xid == pxid)
! 						{
! 							if (xids[i].counter == proc->chkptHoldCounter)
! 								stillHeld = true;
! 							break;
! 						}
! 					}
! 					if (stillHeld)
! 						break;
  				}
  			}
! 			LWLockRelease(ProcArrayLock);
! 		} while (stillHeld);
  	}
! 	pfree(xids);
  }
  
  /*
*** a/src/backend/storage/lmgr/proc.c
--- b/src/backend/storage/lmgr/proc.c
***************
*** 312,318 **** InitProcess(void)
  	MyProc->backendId = InvalidBackendId;
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
! 	MyProc->inCommit = false;
  	MyProc->vacuumFlags = 0;
  	/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
  	if (IsAutoVacuumWorkerProcess())
--- 312,319 ----
  	MyProc->backendId = InvalidBackendId;
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
! 	MyProc->holdCheckpoint = false;
! 	MyProc->chkptHoldCounter = 0;
  	MyProc->vacuumFlags = 0;
  	/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
  	if (IsAutoVacuumWorkerProcess())
***************
*** 450,456 **** InitAuxiliaryProcess(void)
  	MyProc->backendId = InvalidBackendId;
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
! 	MyProc->inCommit = false;
  	MyProc->vacuumFlags = 0;
  	MyProc->lwWaiting = false;
  	MyProc->lwExclusive = false;
--- 451,458 ----
  	MyProc->backendId = InvalidBackendId;
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
! 	MyProc->holdCheckpoint = false;
! 	MyProc->chkptHoldCounter = 0;
  	MyProc->vacuumFlags = 0;
  	MyProc->lwWaiting = false;
  	MyProc->lwExclusive = false;
***************
*** 1784,1786 **** handle_standby_sig_alarm(SIGNAL_ARGS)
--- 1786,1829 ----
  
  	errno = save_errno;
  }
+ 
+ /*
+  * Hold off checkpoints. A checkpoint that begins after this call won't
+  * finish until ResumeCheckpoint() is called. A checkpoint that is already
+  * in progress is not affected.
+  *
+  * You must call HoldCheckpoint() before starting an operation that consists
+  * of multiple WAL records, relying on the rmgr cleanup to finish an
+  * incomplete operation at end of recovery.
+  */
+ void
+ HoldCheckpoint(void)
+ {
+ 	volatile PGPROC *p = MyProc;
+ 	Assert(!p->holdCheckpoint);
+ 
+ 	/*
+ 	 * Increment the counter, so that checkpoint can differentiate between
+ 	 * an operation that was in progress when the checkpoint began, and a
+ 	 * later one in the same transaction that began later.
+ 	 *
+ 	 * It's safe to change these for our own backend without holding the
+ 	 * ProcArrayLock, since we're the only one modifying them. This makes
+ 	 * checkpoint's determination of which xacts are holding off checkpoints
+ 	 * a bit fuzzy, but it doesn't matter. See also comments in
+ 	 * ProcArrayCheckpointBarrier().
+ 	 */
+ 	p->chkptHoldCounter++;
+ 	p->holdCheckpoint = true;
+ }
+ 
+ /*
+  * ResumeCheckpoint -- end the hold; a concurrent checkpoint may now finish.
+  */
+ void
+ ResumeCheckpoint(void)
+ {
+ 	volatile PGPROC *myproc = MyProc;
+ 	Assert(myproc->holdCheckpoint);
+ 	myproc->holdCheckpoint = false;
+ }
*** a/src/include/access/gin.h
--- b/src/include/access/gin.h
***************
*** 400,406 **** extern void gin_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void gin_desc(StringInfo buf, uint8 xl_info, char *rec);
  extern void gin_xlog_startup(void);
  extern void gin_xlog_cleanup(void);
- extern bool gin_safe_restartpoint(void);
  
  /* ginbtree.c */
  
--- 400,405 ----
*** a/src/include/access/gist_private.h
--- b/src/include/access/gist_private.h
***************
*** 252,258 **** extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec);
  extern void gist_xlog_startup(void);
  extern void gist_xlog_cleanup(void);
- extern bool gist_safe_restartpoint(void);
  extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
  
  extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
--- 252,257 ----
*** a/src/include/access/nbtree.h
--- b/src/include/access/nbtree.h
***************
*** 647,652 **** extern void btree_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void btree_desc(StringInfo buf, uint8 xl_info, char *rec);
  extern void btree_xlog_startup(void);
  extern void btree_xlog_cleanup(void);
- extern bool btree_safe_restartpoint(void);
  
  #endif   /* NBTREE_H */
--- 647,651 ----
*** a/src/include/access/xlog_internal.h
--- b/src/include/access/xlog_internal.h
***************
*** 250,256 **** typedef struct RmgrData
  	void		(*rm_desc) (StringInfo buf, uint8 xl_info, char *rec);
  	void		(*rm_startup) (void);
  	void		(*rm_cleanup) (void);
- 	bool		(*rm_safe_restartpoint) (void);
  } RmgrData;
  
  extern const RmgrData RmgrTable[];
--- 250,255 ----
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 91,97 **** struct PGPROC
  	Oid			databaseId;		/* OID of database this backend is using */
  	Oid			roleId;			/* OID of role using this backend */
  
! 	bool		inCommit;		/* true if within commit critical section */
  
  	uint8		vacuumFlags;	/* vacuum-related flags, see above */
  
--- 91,99 ----
  	Oid			databaseId;		/* OID of database this backend is using */
  	Oid			roleId;			/* OID of role using this backend */
  
! 	/* Fields for HoldCheckpoint()/ResumeCheckpoint() */
! 	int32		chkptHoldCounter;
! 	bool		holdCheckpoint;
  
  	uint8		vacuumFlags;	/* vacuum-related flags, see above */
  
***************
*** 204,207 **** extern bool enable_standby_sig_alarm(TimestampTz now,
--- 206,212 ----
  extern bool disable_standby_sig_alarm(void);
  extern void handle_standby_sig_alarm(SIGNAL_ARGS);
  
+ extern void HoldCheckpoint(void);
+ extern void ResumeCheckpoint(void);
+ 
  #endif   /* PROC_H */
*** a/src/include/storage/procarray.h
--- b/src/include/storage/procarray.h
***************
*** 48,55 **** extern bool TransactionIdIsInProgress(TransactionId xid);
  extern bool TransactionIdIsActive(TransactionId xid);
  extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
  
! extern int	GetTransactionsInCommit(TransactionId **xids_p);
! extern bool HaveTransactionsInCommit(TransactionId *xids, int nxids);
  
  extern PGPROC *BackendPidGetProc(int pid);
  extern int	BackendXidGetPid(TransactionId xid);
--- 48,54 ----
  extern bool TransactionIdIsActive(TransactionId xid);
  extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
  
! extern void ProcArrayCheckpointBarrier(void);
  
  extern PGPROC *BackendPidGetProc(int pid);
  extern int	BackendXidGetPid(TransactionId xid);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to