On 02.08.2011 20:06, Alvaro Herrera wrote:
Excerpts from Heikki Linnakangas's message of mar ago 02 11:59:24 -0400 2011:
On 02.08.2011 15:18, Simon Riggs wrote:
On Tue, Aug 2, 2011 at 12:43 PM, Heikki Linnakangas
<heikki.linnakan...@enterprisedb.com>   wrote:
On 02.08.2011 14:36, Simon Riggs wrote:
Actually I think we can append the new information to the end of the page
split record, so that an old version server can read WAL generated by new
version, too.

Not sure how that would work. Lengths, CRCs?

Or do you mean we will support 2 versions, have them both called the
same thing, just resolve which is which by the record length? Don't
like that.

Here's a patch to do what I meant. The new fields are stored at the very
end of the WAL record, and you check the length to see if they're there
or not. The nice thing about this is that it's compatible in both
directions.

Err, did you attach the wrong patch?

Yes, sorry about that. Here's the right patch.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 82ba726..71c145d 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -377,9 +377,18 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
 			state->ituplen++;
 		}
 
-		/* saves old rightlink */
+		/* save old rightlink and NSN */
 		if (state->stack->blkno != GIST_ROOT_BLKNO)
+		{
 			rrlink = GistPageGetOpaque(dist->page)->rightlink;
+			oldnsn = GistPageGetOpaque(dist->page)->nsn;
+		}
+		else
+		{
+			/* if root split we should put initial value */
+			rrlink = InvalidBlockNumber;
+			oldnsn = PageGetLSN(dist->page);
+		}
 
 		START_CRIT_SECTION();
 
@@ -407,7 +416,8 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
 			XLogRecData *rdata;
 
 			rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
-								   is_leaf, &(state->key), dist);
+								   is_leaf, &(state->key), dist,
+								   rrlink, &oldnsn);
 
 			recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
 
@@ -425,12 +435,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
 			}
 		}
 
-		/* set up NSN */
-		oldnsn = GistPageGetOpaque(dist->page)->nsn;
-		if (state->stack->blkno == GIST_ROOT_BLKNO)
-			/* if root split we should put initial value */
-			oldnsn = PageGetLSN(dist->page);
-
 		for (ptr = dist; ptr; ptr = ptr->next)
 		{
 			/* only for last set oldnsn */
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 7f5dd99..cdd8aaf 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -39,6 +39,8 @@ typedef struct
 {
 	gistxlogPageSplit *data;
 	NewPage    *page;
+	BlockNumber origrlink;
+	XLogRecPtr	orignsn;
 } PageSplitRecord;
 
 /* track for incomplete inserts, idea was taken from nbtxlog.c */
@@ -250,7 +252,6 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
 		 */
 		GistPageSetLeaf(page);
 
-	GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
 	PageSetLSN(page, lsn);
 	PageSetTLI(page, ThisTimeLineID);
 	MarkBufferDirty(buffer);
@@ -310,6 +311,26 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
 			j++;
 		}
 	}
+
+	/*
+	 * Starting with 9.0.5, the original NSN and rightlink on the split page
+	 * are stored here. It would've been more logical to add them to the
+	 * gistxlogPageSplit struct, but that would've broken compatibility with
+	 * the pre-9.0.5 WAL format.
+	 */
+	if (ptr - begin < record->xl_len)
+	{
+		memcpy(&decoded->origrlink, ptr, sizeof(BlockNumber));
+		ptr += sizeof(BlockNumber);
+		memcpy(&decoded->orignsn, ptr, sizeof(XLogRecPtr));
+	}
+	else
+	{
+		/* pre-9.0.5 format, no rightlink/NSN information */
+		decoded->origrlink = InvalidBlockNumber;
+		decoded->orignsn.xlogid = 0;
+		decoded->orignsn.xrecoff = 0;
+	}
 }
 
 static void
@@ -320,17 +341,32 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
 	Page		page;
 	int			i;
 	int			flags;
+	Buffer	   *buffers;
 
 	decodePageSplitRecord(&xlrec, record);
 	flags = xlrec.data->origleaf ? F_LEAF : 0;
 
-	/* loop around all pages */
+	/*
+	 * Lock all the pages involved in the split first, so that any concurrent
+	 * scans in hot standby mode will see the split as an atomic operation.
+	 */
+	buffers = palloc(xlrec.data->npage * sizeof(Buffer));
 	for (i = 0; i < xlrec.data->npage; i++)
 	{
 		NewPage    *newpage = xlrec.page + i;
 
-		buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
-		Assert(BufferIsValid(buffer));
+		buffers[i] = XLogReadBuffer(xlrec.data->node,
+									newpage->header->blkno,
+									true);
+		page = (Page) BufferGetPage(buffers[i]);
+	}
+
+	/* Write out all the pages */
+	for (i = 0; i < xlrec.data->npage; i++)
+	{
+		NewPage    *newpage = xlrec.page + i;
+
+		buffer = buffers[i];
 		page = (Page) BufferGetPage(buffer);
 
 		/* ok, clear buffer */
@@ -339,6 +375,18 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
 		/* and fill it */
 		gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
 
+		/* Set NSN and rightlink, needed for concurrent scans in hot standby */
+		if (i == xlrec.data->npage - 1)
+		{
+			GistPageGetOpaque(page)->nsn = xlrec.orignsn;
+			GistPageGetOpaque(page)->rightlink = xlrec.origrlink;
+		}
+		else
+		{
+			GistPageGetOpaque(page)->nsn = lsn;
+			GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
+		}
+
 		PageSetLSN(page, lsn);
 		PageSetTLI(page, ThisTimeLineID);
 		MarkBufferDirty(buffer);
@@ -350,6 +398,8 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
 	pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
 						 NULL, 0,
 						 &xlrec);
+
+	pfree(buffers);
 }
 
 static void
@@ -655,6 +705,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
 			XLogRecPtr	recptr;
 			Buffer		tempbuffer = InvalidBuffer;
 			int			ntodelete = 0;
+			BlockNumber	rrlink;
+			XLogRecPtr	oldnsn;
 
 			numbuffer = 1;
 			buffers[0] = ReadBuffer(index, insert->path[i]);
@@ -691,6 +743,10 @@ gistContinueInsert(gistIncompleteInsert *insert)
 			if (ntodelete == 0)
 				elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");
 
+			/* Remember old rightlink and NSN */
+			rrlink = GistPageGetOpaque(pages[0])->rightlink;
+			oldnsn = GistPageGetOpaque(pages[0])->nsn;
+
 			/*
 			 * we check space with subtraction only first tuple to delete,
 			 * hope, that wiil be enough space....
@@ -742,7 +798,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
 				xlinfo = XLOG_GIST_PAGE_SPLIT;
 				rdata = formSplitRdata(index->rd_node, insert->path[i],
 									   false, &(insert->key),
-									 gistMakePageLayout(buffers, numbuffer));
+									   gistMakePageLayout(buffers, numbuffer),
+									   rrlink, &oldnsn);
 
 			}
 			else
@@ -849,7 +906,8 @@ gist_safe_restartpoint(void)
 
 XLogRecData *
 formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
-			   ItemPointer key, SplitedPageLayout *dist)
+			   ItemPointer key, SplitedPageLayout *dist,
+			   BlockNumber origrlink, XLogRecPtr *orignsn)
 {
 	XLogRecData *rdata;
 	gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
@@ -864,7 +922,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
 		ptr = ptr->next;
 	}
 
-	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
+	rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 4));
 
 	xlrec->node = node;
 	xlrec->origblkno = blkno;
@@ -893,11 +951,24 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
 		rdata[cur].data = (char *) (ptr->list);
 		rdata[cur].len = ptr->lenlist;
 		rdata[cur - 1].next = &(rdata[cur]);
-		rdata[cur].next = NULL;
 		cur++;
 		ptr = ptr->next;
 	}
 
+	/* Append origin rightlink and NSN */
+	rdata[cur].buffer = InvalidBuffer;
+	rdata[cur].data = (char *) &origrlink;
+	rdata[cur].len = sizeof(BlockNumber);
+	rdata[cur - 1].next = &(rdata[cur]);
+	cur++;
+
+	rdata[cur].buffer = InvalidBuffer;
+	rdata[cur].data = (char *) orignsn;
+	rdata[cur].len = sizeof(XLogRecPtr);
+	rdata[cur - 1].next = &(rdata[cur]);
+
+	rdata[cur].next = NULL;
+
 	return rdata;
 }
 
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 4df5fed..d4c8f04 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -260,7 +260,8 @@ extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
 
 extern XLogRecData *formSplitRdata(RelFileNode node,
 			   BlockNumber blkno, bool page_is_leaf,
-			   ItemPointer key, SplitedPageLayout *dist);
+			   ItemPointer key, SplitedPageLayout *dist,
+			   BlockNumber origrlink, XLogRecPtr *orignsn);
 
 extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to