Hi,

I don't know too much about gist, but did a quick read through. Mostly
spotting some stylistic issues. Please fix those making it easier for
the next reviewer.
Thank you for the review! All the mentioned issues are fixed.

--
Anastasia Lubennikova
Postgres Professional:http://www.postgrespro.com
The Russian Postgres Company

diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 0e49959..f658831 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -36,6 +36,7 @@ static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
 				 bool unlockbuf, bool unlockleftchild);
 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
 				GISTSTATE *giststate, List *splitinfo, bool releasebuf);
+static void gistvacuumpage(Relation rel, Page page, Buffer buffer);
 
 
 #define ROTATEDIST(d) do { \
@@ -209,6 +210,17 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 	 * because the tuple vector passed to gistSplit won't include this tuple.
 	 */
 	is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
+
+	/*
+	 * If the leaf page is full, first try to delete dead tuples, and then
+	 * check again.
+	 */
+	if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
+	{
+		gistvacuumpage(rel, page, buffer);
+		is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
+	}
+
 	if (is_split)
 	{
 		/* no space for insertion */
@@ -1440,3 +1452,72 @@ freeGISTstate(GISTSTATE *giststate)
 	/* It's sufficient to delete the scanCxt */
 	MemoryContextDelete(giststate->scanCxt);
 }
+
+/*
+ * gistvacuumpage() -- try to remove LP_DEAD items from the given page.
+ * A no-op if no line pointer on the page is currently marked LP_DEAD.
+ */
+static void
+gistvacuumpage(Relation rel, Page page, Buffer buffer)
+{
+	OffsetNumber deletable[MaxOffsetNumber];
+	int			ndeletable = 0;
+	OffsetNumber offnum,
+				maxoff;
+
+	/*
+	 * Scan over all items to see which ones need to be deleted according to
+	 * LP_DEAD flags.
+	 */
+	maxoff = PageGetMaxOffsetNumber(page);
+	for (offnum = FirstOffsetNumber;
+		 offnum <= maxoff;
+		 offnum = OffsetNumberNext(offnum))
+	{
+		ItemId		itemId = PageGetItemId(page, offnum);
+
+		if (ItemIdIsDead(itemId))
+			deletable[ndeletable++] = offnum;
+	}
+
+	if (ndeletable > 0)
+	{
+		START_CRIT_SECTION();
+
+		PageIndexMultiDelete(page, deletable, ndeletable);
+
+		/*
+		 * Mark the page as not containing any LP_DEAD items.  This is not
+		 * certainly true (there might be some that have recently been marked,
+		 * but weren't included in our target-item list), but it will almost
+		 * always be true and it doesn't seem worth an additional page scan to
+		 * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
+		 */
+		GistClearPageHasGarbage(page);
+
+		MarkBufferDirty(buffer);
+
+		/* XLOG stuff */
+		if (RelationNeedsWAL(rel))
+		{
+			XLogRecPtr	recptr;
+
+			recptr = gistXLogUpdate(rel->rd_node, buffer,
+									deletable, ndeletable,
+									NULL, 0, InvalidBuffer);
+
+			PageSetLSN(page, recptr);
+		}
+		else
+			PageSetLSN(page, gistGetFakeLSN(rel));
+
+		END_CRIT_SECTION();
+	}
+
+	/*
+	 * Note: if we didn't find any LP_DEAD items, then the page's
+	 * F_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
+	 * separate write to clear it, however.  We will clear it when we split
+	 * the page.
+	 */
+}
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 20f695c..e655a05 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -24,6 +24,97 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
+/*
+ * gistkillitems() -- set LP_DEAD state for items an indexscan caller has
+ * told us were killed.
+ *
+ * We match items by heap TID before marking them. If an item has moved off
+ * the current page due to a split, we'll fail to find it and do nothing
+ * (this is not an error case --- we assume the item will eventually get
+ * marked in a future indexscan).
+ *
+ * We re-read the page here, so it's important to check the page LSN. If the
+ * page has been modified since the last read (as determined by LSN), we
+ * cannot flag any entries because it is possible that the old entry was
+ * vacuumed away and the TID was re-used by a completely different heap
+ * tuple. In that case the collected items are simply discarded.
+ */
+static void
+gistkillitems(IndexScanDesc scan)
+{
+	GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
+	Buffer		buffer;
+	Page		page;
+	OffsetNumber maxoff;
+	int			i;
+	bool		killedsomething = false;
+
+	Assert(so->curBlkno != InvalidBlockNumber);
+
+	buffer = ReadBuffer(scan->indexRelation, so->curBlkno);
+	if (!BufferIsValid(buffer))
+		return;
+
+	LockBuffer(buffer, GIST_SHARE);
+	gistcheckpage(scan->indexRelation, buffer);
+	page = BufferGetPage(buffer);
+
+	/*
+	 * If the page LSN differs, the page was modified since the last read.
+	 * so->killedItems may no longer be valid, so applying LP_DEAD hints is
+	 * not safe.
+	 */
+	if (PageGetLSN(page) != so->curPageLSN)
+	{
+		UnlockReleaseBuffer(buffer);
+		so->numKilled = 0; /* reset counter */
+		return;
+	}
+
+	maxoff = PageGetMaxOffsetNumber(page);
+
+	for (i = 0; i < so->numKilled; i++)
+	{
+		if (so->killedItems != NULL)
+		{
+			OffsetNumber offnum = FirstOffsetNumber;
+
+			while (offnum <= maxoff)
+			{
+				ItemId		iid = PageGetItemId(page, offnum);
+				IndexTuple	ituple = (IndexTuple) PageGetItem(page, iid);
+
+				/*
+				 * We match items by heap TID before marking them. We cope with
+				 * cases where items have moved right due to insertions. If the
+				 * found item is not equal to the killed item, just skip it.
+				 */
+				if (ItemPointerEquals(&ituple->t_tid, &(so->killedItems[i])))
+				{
+					/* found the item */
+					ItemIdMarkDead(iid);
+					killedsomething = true;
+					break;		/* out of inner search loop */
+				}
+				offnum = OffsetNumberNext(offnum);
+			}
+		}
+	}
+
+	if (killedsomething)
+	{
+		GistMarkPageHasGarbage(page);
+		MarkBufferDirtyHint(buffer, true);
+	}
+
+	UnlockReleaseBuffer(buffer);
+
+	/*
+	 * Always reset the scan state, so we don't look for same items on other
+	 * pages.
+	 */
+	so->numKilled = 0;
+}
 
 /*
  * gistindex_keytest() -- does this index tuple satisfy the scan key(s)?
@@ -306,17 +397,33 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
 		MemoryContextReset(so->pageDataCxt);
 
 	/*
+	 * We save the LSN of the page as we read it, so that we know whether it is
+	 * safe to apply LP_DEAD hints to the page later. This allows us to drop
+	 * the pin for MVCC scans, which allows vacuum to avoid blocking.
+	 */
+	so->curPageLSN = PageGetLSN(page);
+
+	/*
 	 * check all tuples on page
 	 */
 	maxoff = PageGetMaxOffsetNumber(page);
 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
 	{
-		IndexTuple	it = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+		ItemId      iid = PageGetItemId(page, i);
+		IndexTuple	it;
 		bool		match;
 		bool		recheck;
 		bool		recheck_distances;
 
 		/*
+		 * If the scan specifies not to return killed tuples, then we treat a
+		 * killed tuple as not passing the qual.
+		 */
+		if(scan->ignore_killed_tuples && ItemIdIsDead(iid))
+			continue;
+
+		it = (IndexTuple) PageGetItem(page, iid);
+		/*
 		 * Must call gistindex_keytest in tempCxt, and clean up any leftover
 		 * junk afterward.
 		 */
@@ -451,6 +557,27 @@ getNextNearest(IndexScanDesc scan)
 
 	if (scan->xs_itup)
 	{
+		/*
+		 * If the previously returned index tuple is not visible, save it into
+		 * so->killedItems. At the end of the page scan, mark all saved
+		 * tuples as dead.
+		 */
+		if (scan->kill_prior_tuple)
+		{
+			if (so->killedItems == NULL)
+			{
+				MemoryContext oldCxt =
+					MemoryContextSwitchTo(so->giststate->scanCxt);
+
+				so->killedItems =
+					(ItemPointerData *) palloc(MaxIndexTuplesPerPage
+						* sizeof(ItemPointerData));
+
+				MemoryContextSwitchTo(oldCxt);
+			}
+			if (so->numKilled < MaxIndexTuplesPerPage)
+				so->killedItems[so->numKilled++] = scan->xs_ctup.t_self;
+		}
 		/* free previously returned tuple */
 		pfree(scan->xs_itup);
 		scan->xs_itup = NULL;
@@ -515,6 +642,12 @@ getNextNearest(IndexScanDesc scan)
 		}
 		else
 		{
+			if (so->curBlkno != InvalidBlockNumber && so->numKilled > 0)
+				gistkillitems(scan);
+
+			/* save current item BlockNumber for next gistkillitems() call */
+			so->curBlkno = item->blkno;
+
 			/* visit an index page, extract its items into queue */
 			CHECK_FOR_INTERRUPTS();
 
@@ -572,7 +705,24 @@ gistgettuple(PG_FUNCTION_ARGS)
 		{
 			if (so->curPageData < so->nPageData)
 			{
+				if (scan->kill_prior_tuple && so->curPageData > 0)
+				{
 
+					if (so->killedItems == NULL)
+					{
+						MemoryContext oldCxt =
+							MemoryContextSwitchTo(so->giststate->scanCxt);
+
+						so->killedItems =
+							(ItemPointerData *) palloc(MaxIndexTuplesPerPage
+								* sizeof(ItemPointerData));
+
+						MemoryContextSwitchTo(oldCxt);
+					}
+					if (so->numKilled < MaxIndexTuplesPerPage)
+						so->killedItems[so->numKilled++] =
+							so->pageData[so->curPageData - 1].heapPtr;
+				}
 				/* continuing to return tuples from a leaf page */
 				scan->xs_ctup.t_self = so->pageData[so->curPageData].heapPtr;
 				scan->xs_recheck = so->pageData[so->curPageData].recheck;
@@ -586,9 +736,36 @@ gistgettuple(PG_FUNCTION_ARGS)
 				PG_RETURN_BOOL(true);
 			}
 
+			/*
+			 * Check the last returned tuple and add it to killedItems if
+			 * necessary
+			 */
+			if (scan->kill_prior_tuple
+				&& so->curPageData > 0
+				&& so->curPageData == so->nPageData)
+			{
+
+				if (so->killedItems == NULL)
+				{
+					MemoryContext oldCxt =
+						MemoryContextSwitchTo(so->giststate->scanCxt);
+
+					so->killedItems =
+						(ItemPointerData *) palloc(MaxIndexTuplesPerPage
+							* sizeof(ItemPointerData));
+
+					MemoryContextSwitchTo(oldCxt);
+				}
+				if ((so->numKilled < MaxIndexTuplesPerPage))
+					so->killedItems[so->numKilled++] =
+						so->pageData[so->curPageData - 1].heapPtr;
+			}
 			/* find and process the next index page */
 			do
 			{
+				if (so->curBlkno != InvalidBlockNumber && so->numKilled > 0)
+					gistkillitems(scan);
+
 				GISTSearchItem *item = getNextGISTSearchItem(so);
 
 				if (!item)
@@ -596,6 +773,9 @@ gistgettuple(PG_FUNCTION_ARGS)
 
 				CHECK_FOR_INTERRUPTS();
 
+				/* save current item BlockNumber for next gistkillitems() call */
+				so->curBlkno = item->blkno;
+
 				/*
 				 * While scanning a leaf page, ItemPointers of matching heap
 				 * tuples are stored in so->pageData.  If there are any on
diff --git a/src/backend/access/gist/gistscan.c b/src/backend/access/gist/gistscan.c
index ad39294..a17c5bc 100644
--- a/src/backend/access/gist/gistscan.c
+++ b/src/backend/access/gist/gistscan.c
@@ -93,6 +93,11 @@ gistbeginscan(PG_FUNCTION_ARGS)
 		memset(scan->xs_orderbynulls, true, sizeof(bool) * scan->numberOfOrderBys);
 	}
 
+	so->killedItems = NULL;		/* until needed */
+	so->numKilled = 0;
+	so->curBlkno = InvalidBlockNumber;
+	so->curPageLSN = InvalidXLogRecPtr;
+
 	scan->opaque = so;
 
 	/*
diff --git a/src/include/access/gist.h b/src/include/access/gist.h
index 81e559b..ea3a3b0 100644
--- a/src/include/access/gist.h
+++ b/src/include/access/gist.h
@@ -41,8 +41,11 @@
  */
 #define F_LEAF				(1 << 0)	/* leaf page */
 #define F_DELETED			(1 << 1)	/* the page has been deleted */
-#define F_TUPLES_DELETED	(1 << 2)	/* some tuples on the page are dead */
+#define F_TUPLES_DELETED	(1 << 2)	/* some tuples on the page were
+										 * deleted */
 #define F_FOLLOW_RIGHT		(1 << 3)	/* page to the right has no downlink */
+#define F_HAS_GARBAGE		(1 << 4)	/* some tuples on the page are dead,
+										 * but not deleted yet */
 
 typedef XLogRecPtr GistNSN;
 
@@ -137,6 +140,10 @@ typedef struct GISTENTRY
 #define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED)
 #define GistClearTuplesDeleted(page)	( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED)
 
+#define GistPageHasGarbage(page) ( GistPageGetOpaque(page)->flags & F_HAS_GARBAGE)
+#define GistMarkPageHasGarbage(page) ( GistPageGetOpaque(page)->flags |= F_HAS_GARBAGE)
+#define GistClearPageHasGarbage(page)	( GistPageGetOpaque(page)->flags &= ~F_HAS_GARBAGE)
+
 #define GistFollowRight(page) ( GistPageGetOpaque(page)->flags & F_FOLLOW_RIGHT)
 #define GistMarkFollowRight(page) ( GistPageGetOpaque(page)->flags |= F_FOLLOW_RIGHT)
 #define GistClearFollowRight(page)	( GistPageGetOpaque(page)->flags &= ~F_FOLLOW_RIGHT)
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 4f1a5c3..5f70b76 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -22,6 +22,7 @@
 #include "storage/bufmgr.h"
 #include "storage/buffile.h"
 #include "utils/hsearch.h"
+#include "access/genam.h"
 
 /*
  * Maximum number of "halves" a page can be split into in one operation.
@@ -161,6 +162,12 @@ typedef struct GISTScanOpaqueData
 	/* pre-allocated workspace arrays */
 	double	   *distances;		/* output area for gistindex_keytest */
 
+	/* info about killed items if any (killedItems is NULL if never used) */
+	ItemPointerData *killedItems;		/* heap pointers of killed items */
+	int			numKilled;		/* number of currently stored items */
+	BlockNumber curBlkno;		/* block number of the current page */
+	GistNSN		curPageLSN;	/* pos in the WAL stream when page was read */
+
 	/* In a non-ordered search, returnable heap items are stored here: */
 	GISTSearchHeapItem pageData[BLCKSZ / sizeof(IndexTupleData)];
 	OffsetNumber nPageData;		/* number of valid items in array */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to