-- 
Álvaro Herrera                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/access/brin/brin_pageops.c b/src/backend/access/brin/brin_pageops.c
index 0b257d9..909e4c6 100644
--- a/src/backend/access/brin/brin_pageops.c
+++ b/src/backend/access/brin/brin_pageops.c
@@ -24,8 +24,9 @@
 
 
 static Buffer brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
-					 bool *was_extended);
+					 bool *extended);
 static Size br_page_get_freespace(Page page);
+static void initialize_empty_new_buffer(Relation idxrel, Buffer buffer);
 
 
 /*
@@ -53,7 +54,7 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 	BrinTuple  *oldtup;
 	Size		oldsz;
 	Buffer		newbuf;
-	bool		extended = false;
+	bool		extended;
 
 	Assert(newsz == MAXALIGN(newsz));
 
@@ -64,11 +65,11 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 	{
 		/* need a page on which to put the item */
 		newbuf = brin_getinsertbuffer(idxrel, oldbuf, newsz, &extended);
-		/* XXX delay vacuuming FSM until locks are released? */
-		if (extended)
-			FreeSpaceMapVacuum(idxrel);
 		if (!BufferIsValid(newbuf))
+		{
+			Assert(!extended);
 			return false;
+		}
 
 		/*
 		 * Note: it's possible (though unlikely) that the returned newbuf is
@@ -76,7 +77,10 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 		 * buffer does in fact have enough space.
 		 */
 		if (newbuf == oldbuf)
+		{
+			Assert(!extended);
 			newbuf = InvalidBuffer;
+		}
 	}
 	else
 	{
@@ -94,7 +98,13 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 	{
 		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
 		if (BufferIsValid(newbuf))
+		{
+			if (extended)
+				initialize_empty_new_buffer(idxrel, newbuf);
 			UnlockReleaseBuffer(newbuf);
+			if (extended)
+				FreeSpaceMapVacuum(idxrel);
+		}
 		return false;
 	}
 
@@ -106,9 +116,14 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 	 */
 	if (!brin_tuples_equal(oldtup, oldsz, origtup, origsz))
 	{
-		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
 		if (BufferIsValid(newbuf))
+		{
+			if (extended)
+				initialize_empty_new_buffer(idxrel, newbuf);
 			UnlockReleaseBuffer(newbuf);
+			if (extended)
+				FreeSpaceMapVacuum(idxrel);
+		}
 		return false;
 	}
 
@@ -125,7 +140,12 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 		brin_can_do_samepage_update(oldbuf, origsz, newsz))
 	{
 		if (BufferIsValid(newbuf))
+		{
+			/* as above */
+			if (extended)
+				initialize_empty_new_buffer(idxrel, newbuf);
 			UnlockReleaseBuffer(newbuf);
+		}
 
 		START_CRIT_SECTION();
 		PageIndexDeleteNoCompact(oldpage, &oldoff, 1);
@@ -157,6 +177,10 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 		END_CRIT_SECTION();
 
 		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
+
+		if (extended)
+			FreeSpaceMapVacuum(idxrel);
+
 		return true;
 	}
 	else if (newbuf == InvalidBuffer)
@@ -178,11 +202,21 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 		Buffer		revmapbuf;
 		ItemPointerData newtid;
 		OffsetNumber newoff;
+		BlockNumber	newblk = InvalidBlockNumber;
+		Size		freespace = 0;
 
 		revmapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);
 
 		START_CRIT_SECTION();
 
+		/*
+		 * We need to initialize the page if it's newly obtained.  Note we will
+		 * WAL-log the initialization as part of the update, so we don't need
+		 * to do that here.
+		 */
+		if (extended)
+			brin_page_init(BufferGetPage(newbuf), BRIN_PAGETYPE_REGULAR);
+
 		PageIndexDeleteNoCompact(oldpage, &oldoff, 1);
 		newoff = PageAddItem(newpage, (Item) newtup, newsz,
 							 InvalidOffsetNumber, false, false);
@@ -191,6 +225,13 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 		MarkBufferDirty(oldbuf);
 		MarkBufferDirty(newbuf);
 
+		/* needed to update FSM below */
+		if (extended)
+		{
+			newblk = BufferGetBlockNumber(newbuf);
+			freespace = br_page_get_freespace(newpage);
+		}
+
 		ItemPointerSet(&newtid, BufferGetBlockNumber(newbuf), newoff);
 		brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, newtid);
 		MarkBufferDirty(revmapbuf);
@@ -235,6 +276,14 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 		LockBuffer(revmapbuf, BUFFER_LOCK_UNLOCK);
 		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
 		UnlockReleaseBuffer(newbuf);
+
+		if (extended)
+		{
+			Assert(BlockNumberIsValid(newblk));
+			RecordPageWithFreeSpace(idxrel, newblk, freespace);
+			FreeSpaceMapVacuum(idxrel);
+		}
+
 		return true;
 	}
 }
@@ -271,7 +320,7 @@ brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
 	OffsetNumber off;
 	Buffer		revmapbuf;
 	ItemPointerData tid;
-	bool		extended = false;
+	bool		extended;
 
 	Assert(itemsz == MAXALIGN(itemsz));
 
@@ -302,7 +351,7 @@ brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
 	{
 		*buffer = brin_getinsertbuffer(idxrel, InvalidBuffer, itemsz, &extended);
 		Assert(BufferIsValid(*buffer));
-		Assert(br_page_get_freespace(BufferGetPage(*buffer)) >= itemsz);
+		Assert(extended || br_page_get_freespace(BufferGetPage(*buffer)) >= itemsz);
 	}
 
 	/* Now obtain lock on revmap buffer */
@@ -312,6 +361,8 @@ brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
 	blk = BufferGetBlockNumber(*buffer);
 
 	START_CRIT_SECTION();
+	if (extended)
+		brin_page_init(BufferGetPage(*buffer), BRIN_PAGETYPE_REGULAR);
 	off = PageAddItem(page, (Item) tup, itemsz, InvalidOffsetNumber,
 					  false, false);
 	if (off == InvalidOffsetNumber)
@@ -494,22 +545,46 @@ brin_evacuate_page(Relation idxRel, BlockNumber pagesPerRange,
  * index item of size itemsz.  If oldbuf is a valid buffer, it is also locked
  * (in an order determined to avoid deadlocks.)
  *
- * If there's no existing page with enough free space to accommodate the new
- * item, the relation is extended.  If this happens, *extended is set to true.
- *
  * If we find that the old page is no longer a regular index page (because
  * of a revmap extension), the old buffer is unlocked and we return
  * InvalidBuffer.
+ *
+ * If there's no existing page with enough free space to accommodate the new
+ * item, the relation is extended.  If this happens, *extended is set to true,
+ * and it is the caller's responsibility to initialize the page (and WAL-log
+ * that fact) prior to use.
+ *
+ * Note that in some corner cases it is possible for this routine to extend the
+ * relation and then not return the buffer.  It is this routine's
+ * responsibility to WAL-log the page initialization and to record the page in
+ * FSM if that happens.  Such a buffer may later be reused by this routine.
  */
 static Buffer
 brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
-					 bool *was_extended)
+					 bool *extended)
 {
 	BlockNumber oldblk;
 	BlockNumber newblk;
 	Page		page;
 	int			freespace;
 
+	/*
+	 * If the item is oversized, don't bother.  We have another, more precise
+	 * check below.
+	 */
+	if (itemsz > BLCKSZ - sizeof(BrinSpecialSpace))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
+						(unsigned long) itemsz,
+						(unsigned long) BLCKSZ - sizeof(BrinSpecialSpace),
+						RelationGetRelationName(irel))));
+		return InvalidBuffer;		/* keep compiler quiet */
+	}
+
+	*extended = false;
+
 	if (BufferIsValid(oldbuf))
 		oldblk = BufferGetBlockNumber(oldbuf);
 	else
@@ -528,7 +603,6 @@ brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
 	{
 		Buffer		buf;
 		bool		extensionLockHeld = false;
-		bool		extended = false;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -546,7 +620,7 @@ brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
 			}
 			buf = ReadBuffer(irel, P_NEW);
 			newblk = BufferGetBlockNumber(buf);
-			*was_extended = extended = true;
+			*extended = true;
 
 			BRIN_elog((DEBUG2, "brin_getinsertbuffer: extending to page %u",
 					   BufferGetBlockNumber(buf)));
@@ -576,6 +650,21 @@ brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
 			if (!BRIN_IS_REGULAR_PAGE(BufferGetPage(oldbuf)))
 			{
 				LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
+
+				/*
+				 * It is possible that the new page was obtained from extending
+				 * the relation.  In that case, we must be sure to record it in
+				 * the FSM before leaving, because otherwise the space would be
+				 * lost forever.  However, we cannot let an uninitialized page
+				 * get in the FSM, so we need to initialize it first.
+				 */
+				if (*extended)
+				{
+					initialize_empty_new_buffer(irel, buf);
+					/* shouldn't matter, but don't confuse caller */
+					*extended = false;
+				}
+
 				ReleaseBuffer(buf);
 				return InvalidBuffer;
 			}
@@ -588,9 +677,6 @@ brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
 
 		page = BufferGetPage(buf);
 
-		if (extended)
-			brin_page_init(page, BRIN_PAGETYPE_REGULAR);
-
 		/*
 		 * We have a new buffer to insert into.  Check that the new page has
 		 * enough free space, and return it if it does; otherwise start over.
@@ -600,7 +686,8 @@ brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
 		 * (br_page_get_freespace also checks that the FSM didn't hand us a
 		 * page that has since been repurposed for the revmap.)
 		 */
-		freespace = br_page_get_freespace(page);
+		freespace = *extended ?
+			BLCKSZ - sizeof(BrinSpecialSpace) : br_page_get_freespace(page);
 		if (freespace >= itemsz)
 		{
 			RelationSetTargetBlock(irel, BufferGetBlockNumber(buf));
@@ -610,7 +697,7 @@ brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
 			 * invalidations, make sure we update the more permanent FSM with
 			 * data about it before going away.
 			 */
-			if (extended)
+			if (*extended)
 				RecordPageWithFreeSpace(irel, BufferGetBlockNumber(buf),
 										freespace);
 
@@ -634,12 +721,13 @@ brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
 		/*
 		 * If an entirely new page does not contain enough free space for the
 		 * new item, then surely that item is oversized.  Complain loudly; but
-		 * first make sure we record the page as free, for next time.
+		 * first make sure we initialize the page and record it as free, for
+		 * next time.
 		 */
-		if (extended)
+		if (*extended)
 		{
-			RecordPageWithFreeSpace(irel, BufferGetBlockNumber(buf),
-									freespace);
+			initialize_empty_new_buffer(irel, buf);
+
 			ereport(ERROR,
 					(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 			errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
@@ -659,6 +747,35 @@ brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
 }
 
 /*
+ * Initialize a page as an empty regular BRIN page, WAL-log this, and record
+ * the page in FSM.
+ *
+ * There are several corner situations in which we extend the relation to
+ * obtain a new page and later find that we cannot use it immediately.  When
+ * that happens, we don't want to leave the page go unrecorded in FSM, because
+ * there is no mechanism to get the space back and the index would bloat.
+ * Also, because we would not WAL-log the action that would initialize the
+ * page, the page would go uninitialized in a standby (or after recovery).
+ */
+static void
+initialize_empty_new_buffer(Relation idxrel, Buffer buffer)
+{
+	Page	page;
+
+	START_CRIT_SECTION();
+	page = BufferGetPage(buffer);
+	brin_page_init(page, BRIN_PAGETYPE_REGULAR);
+	MarkBufferDirty(buffer);
+	/* FIXME use own WAL record, so that FSM update is replayed also? */
+	log_newpage_buffer(buffer, true);
+	END_CRIT_SECTION();
+
+	RecordPageWithFreeSpace(idxrel, BufferGetBlockNumber(buffer),
+							br_page_get_freespace(page));
+}
+
+
+/*
  * Return the amount of free space on a regular BRIN index page.
  *
  * If the page is not a regular page, or has been marked with the
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to