I have a newer version of my Synchronized Scanning patch, which is
hopefully closer to a real patch; the first one was more of a proof of
concept.

DONE:
* I added a proper bounds check for the result it gets from shared
memory.
* I expanded the shared memory into a static hash table (currently 1024
entries, or 8KB; the size is a one-line change if that's too big), so
it can now keep track of many scans on many tables at once. See the
sizing sketch after this list.
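
For reference, the shared area is a fixed array of small slots, so the
8KB figure is just the slot count times the slot size. Here is a minimal
sketch matching the definitions in the patch (Oid and BlockNumber are
both 32-bit unsigned integers in PostgreSQL):

    #define SS_HASH_TABLE_SIZE 1024   /* number of hash slots */

    /* One slot per bucket; hash collisions simply overwrite. */
    typedef struct {
        Oid         relid;   /* relation this hint belongs to      */
        BlockNumber loc;     /* last page reported by a scan of it */
    } ss_scan_loc_t;

    /* 1024 slots * 8 bytes per slot = 8KB, which is the size passed
     * to shmget() in the patch. */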

TODO:
* More testing. I plan to have more tests up on Monday, and once I see
where this patch helps most, I will also try to benchmark against MySQL
or something else. Preliminary results for cache hit rate look good (it
is much higher in some cases), but I still need to demonstrate an actual
decrease in runtime.
* Right now the shared memory segment isn't destroyed when the
postmaster shuts down.
* This patch still uses no locks at all. If someone thinks locks are
required, please let me know. As it stands, inconsistent data in the
shared memory area can do no worse than fall back to the current
behavior; a sketch of that fallback follows this list.
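
To illustrate the last point: the shared data is only ever treated as a
hint, so a torn or stale entry simply falls back to today's behavior.
A minimal sketch of the caller-side check (this mirrors what initscan()
does in the patch; "nblocks" stands in for scan->rs_nblocks):

    BlockNumber start = ss_get_startloc(relid);

    if (start >= nblocks)   /* stale, torn, or missing entry */
        start = 0;          /* default start page: the current behavior */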

Regards,
        Jeff Davis
--- postgresql-8.0.0/src/backend/access/heap/heapam.c	2004-12-31 13:59:16.000000000 -0800
+++ postgresql-8.0.0-ss/src/backend/access/heap/heapam.c	2005-02-25 23:49:15.049187562 -0800
@@ -65,6 +65,89 @@
  * ----------------------------------------------------------------
  */
 
+
+
+/* ss_get_startloc
+ *   returns the current location of an
+ *   in-progress scan. 
+ *   WARNING: this function is subject to a shared
+ *            memory race which could result in
+ *            returning a page location which doesn't
+ *            exist. In addition, another backend may
+ *            see a different number of relation pages and
+ *            may report a page which does not exist
+ *            in this backend. Therefore, the caller
+ *            should ALWAYS bounds-check this result
+ *            and have a default start page if this
+ *            function's return value is out of bounds.
+ */
+static BlockNumber ss_get_startloc(Oid relid) {
+  int shmid;
+  char *shm;
+  ss_scan_loc_t scanloc;
+  int offset;
+
+  offset = ss_hash_relid(relid)*sizeof(ss_scan_loc_t);
+  if((shmid = shmget(SS_SHM_KEY,
+		     SS_HASH_TABLE_SIZE*(sizeof(ss_scan_loc_t)),
+		     0666)) < 0)
+    return -1;
+  if((shm = shmat(shmid,NULL,SHM_RDONLY)) == (char *) -1)
+    return -1;
+
+  scanloc = *((ss_scan_loc_t*)(shm+offset));
+
+  if(shmdt(shm) < 0)
+    return -1;
+
+  return (scanloc.relid == relid) ? scanloc.loc : 0;
+}
+
+/* ss_report_loc
+ *   stores the relid and location in a static hash
+ *   table. Collisions simply overwrite.
+ *   WARNING: This function is subject to a shared
+ *            memory race. The data stored in this
+ *            table should not be relied upon! Rather,
+ *            it should be used as a suggested start
+ *            location for future scans and those scans
+ *            should start at a default location if this
+ *            data is out of bounds.
+ */
+static int ss_report_loc(Oid relid, BlockNumber loc) 
+{
+  int shmid;
+  char *shm;
+  ss_scan_loc_t scanloc;
+  int offset;
+
+  scanloc.relid = relid;
+  scanloc.loc = loc;
+  offset = ss_hash_relid(relid)*sizeof(ss_scan_loc_t);
+  if((shmid = shmget(SS_SHM_KEY,
+		     SS_HASH_TABLE_SIZE*(sizeof(ss_scan_loc_t)),
+		     0666|IPC_CREAT)) < 0)
+    return -1;
+  if((shm = shmat(shmid,NULL,0)) == (char *) -1)
+    return -1;
+
+  *((ss_scan_loc_t*)(shm+offset)) = scanloc;
+
+  if(shmdt(shm) < 0)
+    return -1;
+
+  return 0;
+}
+
+/* ss_hash_relid
+ *   Simple hash function for use in ss_get_startloc()
+ *   and ss_report_loc()
+ */
+static int ss_hash_relid(Oid relid)
+{
+  return relid % SS_HASH_TABLE_SIZE;
+}
+
 /* ----------------
  *		initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -85,6 +168,22 @@
 	scan->rs_ctup.t_data = NULL;
 	scan->rs_cbuf = InvalidBuffer;
 
+	/* Initialize start page. 
+	 *  This sets the scan to start near where an
+	 *  already-in-progress scan is taking place to 
+	 *  make better use of shared buffers. If it
+	 *  can't find a valid place to start, the scan starts
+	 *  at 0.
+	 *  This is called "Synchronized Scanning"
+	 */
+	scan->rs_start_page = ss_get_startloc(RelationGetRelid(scan->rs_rd));
+	/* Bounds checking for this return value is required
+	 * because the data in shared mem cannot be relied upon.
+	 */
+	if(scan->rs_start_page >= scan->rs_nblocks ||
+	   scan->rs_start_page <  0)
+	  scan->rs_start_page = 0;
+
 	/* we don't have a marked position... */
 	ItemPointerSetInvalid(&(scan->rs_mctid));
 
@@ -115,261 +214,283 @@
 		   Snapshot snapshot,
 		   int nkeys,
 		   ScanKey key,
-		   BlockNumber pages)
+		   BlockNumber pages,
+		   HeapScanDesc scan)
 {
-	ItemId		lpp;
-	Page		dp;
-	BlockNumber page;
-	int			lines;
-	OffsetNumber lineoff;
-	int			linesleft;
-	ItemPointer tid;
-
-	tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self);
-
-	/*
-	 * debugging stuff
-	 *
-	 * check validity of arguments, here and for other functions too Note: no
-	 * locking manipulations needed--this is a local function
-	 */
+  ItemId		lpp;
+  Page		dp;
+  BlockNumber page;
+  int			lines;
+  OffsetNumber lineoff;
+  int			linesleft;
+  ItemPointer tid;
+
+  tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self);
+
+  /*
+   * debugging stuff
+   *
+   * check validity of arguments, here and for other functions too Note: no
+   * locking manipulations needed--this is a local function
+   */
 #ifdef	HEAPDEBUGALL
-	if (ItemPointerIsValid(tid))
-		elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
-			 RelationGetRelationName(relation), tid, tid->ip_blkid,
-			 tid->ip_posid, dir);
-	else
-		elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
-			 RelationGetRelationName(relation), tid, dir);
-
-	elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);
-
-	elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p",
-		 relation->rd_rel->relkind, RelationGetRelationName(relation),
-		 snapshot);
+  if (ItemPointerIsValid(tid))
+    elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
+	 RelationGetRelationName(relation), tid, tid->ip_blkid,
+	 tid->ip_posid, dir);
+  else
+    elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
+	 RelationGetRelationName(relation), tid, dir);
+
+  elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);
+
+  elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p",
+       relation->rd_rel->relkind, RelationGetRelationName(relation),
+       snapshot);
 #endif   /* HEAPDEBUGALL */
 
-	if (!ItemPointerIsValid(tid))
-	{
-		Assert(!PointerIsValid(tid));
-		tid = NULL;
-	}
-
-	tuple->t_tableOid = relation->rd_id;
-
-	/*
-	 * return null immediately if relation is empty
-	 */
-	if (pages == 0)
-	{
-		if (BufferIsValid(*buffer))
-			ReleaseBuffer(*buffer);
-		*buffer = InvalidBuffer;
-		tuple->t_datamcxt = NULL;
-		tuple->t_data = NULL;
-		return;
-	}
-
-	/*
-	 * calculate next starting lineoff, given scan direction
-	 */
-	if (dir == 0)
-	{
-		/*
-		 * ``no movement'' scan direction: refetch same tuple
-		 */
-		if (tid == NULL)
-		{
-			if (BufferIsValid(*buffer))
-				ReleaseBuffer(*buffer);
-			*buffer = InvalidBuffer;
-			tuple->t_datamcxt = NULL;
-			tuple->t_data = NULL;
-			return;
-		}
-
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   ItemPointerGetBlockNumber(tid));
-
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-
-		dp = (Page) BufferGetPage(*buffer);
-		lineoff = ItemPointerGetOffsetNumber(tid);
-		lpp = PageGetItemId(dp, lineoff);
-
-		tuple->t_datamcxt = NULL;
-		tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
-		tuple->t_len = ItemIdGetLength(lpp);
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-
-		return;
-	}
-	else if (dir < 0)
-	{
-		/*
-		 * reverse scan direction
-		 */
-		if (tid == NULL)
-		{
-			page = pages - 1;	/* final page */
-		}
-		else
-		{
-			page = ItemPointerGetBlockNumber(tid);		/* current page */
-		}
-
-		Assert(page < pages);
-
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   page);
-
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-
-		dp = (Page) BufferGetPage(*buffer);
-		lines = PageGetMaxOffsetNumber(dp);
-		if (tid == NULL)
-		{
-			lineoff = lines;	/* final offnum */
-		}
-		else
-		{
-			lineoff =			/* previous offnum */
-				OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
-		}
-		/* page and lineoff now reference the physically previous tid */
-	}
+  if (!ItemPointerIsValid(tid))
+    {
+      Assert(!PointerIsValid(tid));
+      tid = NULL;
+    }
+
+  tuple->t_tableOid = relation->rd_id;
+
+  /*
+   * return null immediately if relation is empty
+   */
+  if (pages == 0)
+    {
+      if (BufferIsValid(*buffer))
+	ReleaseBuffer(*buffer);
+      *buffer = InvalidBuffer;
+      tuple->t_datamcxt = NULL;
+      tuple->t_data = NULL;
+      return;
+    }
+
+  /*
+   * calculate next starting lineoff, given scan direction
+   */
+  if (dir == 0)
+    {
+      /*
+       * ``no movement'' scan direction: refetch same tuple
+       */
+      if (tid == NULL)
+	{
+	  if (BufferIsValid(*buffer))
+	    ReleaseBuffer(*buffer);
+	  *buffer = InvalidBuffer;
+	  tuple->t_datamcxt = NULL;
+	  tuple->t_data = NULL;
+	  return;
+	}
+
+      *buffer = ReleaseAndReadBuffer(*buffer,
+				     relation,
+				     ItemPointerGetBlockNumber(tid));
+
+      LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+
+      dp = (Page) BufferGetPage(*buffer);
+      lineoff = ItemPointerGetOffsetNumber(tid);
+      lpp = PageGetItemId(dp, lineoff);
+
+      tuple->t_datamcxt = NULL;
+      tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+      tuple->t_len = ItemIdGetLength(lpp);
+      LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+
+      return;
+    }
+  else if (dir < 0)
+    {
+      /*
+       * reverse scan direction
+       */
+      if (tid == NULL)
+	{
+	  /*page = pages-1;*/	/* final page */
+	  page = scan->rs_start_page;
+	}
+      else
+	{
+	  page = ItemPointerGetBlockNumber(tid);  /* current page */
+	}
+
+      Assert(page < pages);
+
+      *buffer = ReleaseAndReadBuffer(*buffer,
+				     relation,
+				     page);
+
+      LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+
+      dp = (Page) BufferGetPage(*buffer);
+      lines = PageGetMaxOffsetNumber(dp);
+      if (tid == NULL)
+	{
+	  lineoff = lines;	/* final offnum */
+	}
+      else
+	{
+	  lineoff =			/* previous offnum */
+	    OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
+	}
+      /* page and lineoff now reference the physically previous tid */
+    }
+  else
+    {
+      /*
+       * forward scan direction
+       */
+      if (tid == NULL)
+	{
+	  page = scan->rs_start_page;     /* first page */
+	  lineoff = FirstOffsetNumber;		/* first offnum */
+	}
+      else
+	{
+	  page = ItemPointerGetBlockNumber(tid);	  /* current page */
+	  lineoff =			/* next offnum */
+	    OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
+	}
+
+      Assert(page < pages);
+
+      *buffer = ReleaseAndReadBuffer(*buffer,
+				     relation,
+				     page);
+
+      LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+
+      dp = (Page) BufferGetPage(*buffer);
+      lines = PageGetMaxOffsetNumber(dp);
+      /* page and lineoff now reference the physically next tid */
+    }
+
+  /* 'dir' is now non-zero */
+
+  /*
+   * calculate line pointer and number of remaining items to check on
+   * this page.
+   */
+  lpp = PageGetItemId(dp, lineoff);
+  if (dir < 0)
+    linesleft = lineoff - 1;
+  else
+    linesleft = lines - lineoff;
+
+  /*
+   * advance the scan until we find a qualifying tuple or run out of
+   * stuff to scan
+   */
+  for (;;)
+    {
+      while (linesleft >= 0)
+	{
+	  if (ItemIdIsUsed(lpp))
+	    {
+	      bool		valid;
+
+	      tuple->t_datamcxt = NULL;
+	      tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+	      tuple->t_len = ItemIdGetLength(lpp);
+	      ItemPointerSet(&(tuple->t_self), page, lineoff);
+
+	      /*
+	       * if current tuple qualifies, return it.
+	       */
+	      HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
+				 snapshot, nkeys, key, valid);
+	      if (valid)
+		{
+		  LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+		  return;
+		}
+	    }
+
+	  /*
+	   * otherwise move to the next item on the page
+	   */
+	  --linesleft;
+	  if (dir < 0)
+	    {
+	      --lpp;			/* move back in this page's ItemId array */
+	      --lineoff;
+	    }
+	  else
+	    {
+	      ++lpp;			/* move forward in this page's ItemId
+					 * array */
+	      ++lineoff;
+	    }
+	}
+
+      /*
+       * if we get here, it means we've exhausted the items on this page
+       * and it's time to move to the next.
+       */
+      LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+
+		
+      /* advance page, or wrap around */
+      if(dir < 0) {
+	if(page > 0)
+	  page = page - 1;
 	else
-	{
-		/*
-		 * forward scan direction
-		 */
-		if (tid == NULL)
-		{
-			page = 0;			/* first page */
-			lineoff = FirstOffsetNumber;		/* first offnum */
-		}
-		else
-		{
-			page = ItemPointerGetBlockNumber(tid);		/* current page */
-			lineoff =			/* next offnum */
-				OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
-		}
-
-		Assert(page < pages);
-
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   page);
-
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-
-		dp = (Page) BufferGetPage(*buffer);
-		lines = PageGetMaxOffsetNumber(dp);
-		/* page and lineoff now reference the physically next tid */
-	}
-
-	/* 'dir' is now non-zero */
-
-	/*
-	 * calculate line pointer and number of remaining items to check on
-	 * this page.
-	 */
-	lpp = PageGetItemId(dp, lineoff);
-	if (dir < 0)
-		linesleft = lineoff - 1;
+	  page = pages - 1;
+      }
+      else {
+	if(page < pages-1)
+	  page = page + 1;
 	else
-		linesleft = lines - lineoff;
+	  page = 0;
+      }
 
-	/*
-	 * advance the scan until we find a qualifying tuple or run out of
-	 * stuff to scan
-	 */
-	for (;;)
+      /*
+       * return NULL if we've exhausted all the pages
+       */
+      if(page == scan->rs_start_page)
+	{
+	  if (BufferIsValid(*buffer))
+	    ReleaseBuffer(*buffer);
+	  *buffer = InvalidBuffer;
+	  tuple->t_datamcxt = NULL;
+	  tuple->t_data = NULL;
+	  return;
+	}
+
+      Assert(page < pages);
+
+      /*
+       * This reports the current page being accessed,
+       *  so that other backends can start scanning where
+       *  shared buffers are more likely to be hit.
+       *  This is called "Synchronized Scanning".
+       */
+      ss_report_loc(RelationGetRelid(scan->rs_rd),page);
+      *buffer = ReleaseAndReadBuffer(*buffer,
+				     relation,
+				     page);
+
+      LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+      dp = (Page) BufferGetPage(*buffer);
+      lines = PageGetMaxOffsetNumber((Page) dp);
+      linesleft = lines - 1;
+      if (dir < 0)
+	{
+	  lineoff = lines;
+	  lpp = PageGetItemId(dp, lines);
+	}
+      else
 	{
-		while (linesleft >= 0)
-		{
-			if (ItemIdIsUsed(lpp))
-			{
-				bool		valid;
-
-				tuple->t_datamcxt = NULL;
-				tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
-				tuple->t_len = ItemIdGetLength(lpp);
-				ItemPointerSet(&(tuple->t_self), page, lineoff);
-
-				/*
-				 * if current tuple qualifies, return it.
-				 */
-				HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
-								   snapshot, nkeys, key, valid);
-				if (valid)
-				{
-					LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-					return;
-				}
-			}
-
-			/*
-			 * otherwise move to the next item on the page
-			 */
-			--linesleft;
-			if (dir < 0)
-			{
-				--lpp;			/* move back in this page's ItemId array */
-				--lineoff;
-			}
-			else
-			{
-				++lpp;			/* move forward in this page's ItemId
-								 * array */
-				++lineoff;
-			}
-		}
-
-		/*
-		 * if we get here, it means we've exhausted the items on this page
-		 * and it's time to move to the next.
-		 */
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-
-		/*
-		 * return NULL if we've exhausted all the pages
-		 */
-		if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
-		{
-			if (BufferIsValid(*buffer))
-				ReleaseBuffer(*buffer);
-			*buffer = InvalidBuffer;
-			tuple->t_datamcxt = NULL;
-			tuple->t_data = NULL;
-			return;
-		}
-
-		page = (dir < 0) ? (page - 1) : (page + 1);
-
-		Assert(page < pages);
-
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   page);
-
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-		dp = (Page) BufferGetPage(*buffer);
-		lines = PageGetMaxOffsetNumber((Page) dp);
-		linesleft = lines - 1;
-		if (dir < 0)
-		{
-			lineoff = lines;
-			lpp = PageGetItemId(dp, lines);
-		}
-		else
-		{
-			lineoff = FirstOffsetNumber;
-			lpp = PageGetItemId(dp, FirstOffsetNumber);
-		}
+	  lineoff = FirstOffsetNumber;
+	  lpp = PageGetItemId(dp, FirstOffsetNumber);
 	}
+    }
 }
 
 
@@ -829,6 +950,7 @@
 	/*
 	 * Note: we depend here on the -1/0/1 encoding of ScanDirection.
 	 */
+
 	heapgettup(scan->rs_rd,
 			   (int) direction,
 			   &(scan->rs_ctup),
@@ -836,7 +958,7 @@
 			   scan->rs_snapshot,
 			   scan->rs_nkeys,
 			   scan->rs_key,
-			   scan->rs_nblocks);
+			   scan->rs_nblocks,scan);
 
 	if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
 	{
@@ -1989,7 +2111,7 @@
 				   scan->rs_snapshot,
 				   0,
 				   NULL,
-				   scan->rs_nblocks);
+				   scan->rs_nblocks,scan);
 	}
 }
 
--- postgresql-8.0.0/src/include/access/heapam.h	2004-12-31 14:03:21.000000000 -0800
+++ postgresql-8.0.0-ss/src/include/access/heapam.h	2005-02-25 17:37:24.256359103 -0800
@@ -25,6 +25,28 @@
 #include "utils/rel.h"
 #include "utils/tqual.h"
 
+
+/* Synchronized Scanning:
+ *  These definitions and declarations are used
+ *  by the Synchronized Scanning support functions
+ *  in heapam.c
+ */
+
+#include <sys/shm.h>
+#define SS_HASH_TABLE_SIZE 1024
+#define SS_SHM_KEY 0x11aa55cc
+
+typedef struct {
+  Oid relid;
+  BlockNumber loc;
+} ss_scan_loc_t;
+
+static BlockNumber ss_get_startloc(Oid);
+static int         ss_report_loc(Oid,BlockNumber);
+static int         ss_hash_relid(Oid);
+
+/* end Synchronized Scanning section */
+
 /* ----------------
  *		fastgetattr
  *
--- postgresql-8.0.0/src/include/access/relscan.h	2004-12-31 14:03:21.000000000 -0800
+++ postgresql-8.0.0-ss/src/include/access/relscan.h	2005-02-25 17:37:54.398197789 -0800
@@ -34,6 +34,7 @@
 	ItemPointerData rs_mctid;	/* marked scan position, if any */
 
 	PgStat_Info rs_pgstat_info; /* statistics collector hook */
+	BlockNumber rs_start_page;	/* page where this scan started */
 } HeapScanDescData;
 
 typedef HeapScanDescData *HeapScanDesc;