From 8e1c614a645e2b688cf0166d119e903050bd5fe2 Mon Sep 17 00:00:00 2001
From: Kirk Jamison <k.jamison@jp.fujitsu.com>
Date: Wed, 25 Sep 2019 02:24:13 +0000
Subject: [PATCH] Optimize dropping of relation buffers using dlist

---
 src/backend/storage/buffer/Makefile     |   2 +-
 src/backend/storage/buffer/buf_init.c   |   6 +
 src/backend/storage/buffer/bufmgr.c     |  84 +++++----
 src/backend/storage/buffer/cached_buf.c | 324 ++++++++++++++++++++++++++++++++
 src/include/storage/buf_internals.h     | 163 ++++++++++++++++
 5 files changed, 539 insertions(+), 40 deletions(-)
 create mode 100644 src/backend/storage/buffer/cached_buf.c

diff --git a/src/backend/storage/buffer/Makefile b/src/backend/storage/buffer/Makefile
index 2c10fba..fa23c0c 100644
--- a/src/backend/storage/buffer/Makefile
+++ b/src/backend/storage/buffer/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/storage/buffer
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = buf_table.o buf_init.o bufmgr.o freelist.o localbuf.o
+OBJS = buf_table.o buf_init.o bufmgr.o freelist.o localbuf.o cached_buf.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index ccd2c31..3e4eb40 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -146,6 +146,9 @@ InitBufferPool(void)
 	/* Init other shared buffer-management stuff */
 	StrategyInitialize(!foundDescs);
 
+	/* Init cached buffer hash table and related shmem data structures */
+	InitCachedBufTable(NBuffers);
+
 	/* Initialize per-backend file flush context */
 	WritebackContextInit(&BackendWritebackContext,
 						 &backend_flush_after);
@@ -189,5 +192,8 @@ BufferShmemSize(void)
 	/* size of checkpoint sort array in bufmgr.c */
 	size = add_size(size, mul_size(NBuffers, sizeof(CkptSortItem)));
 
+	/* size of cached buffer shmem data structures */
+	size = add_size(size, CachedBufShmemSize());
+
 	return size;
 }
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 483f705..0ec6a78 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1268,6 +1268,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 
 			return buf;
 		}
+		/* Insert an entry into the cached block list */
+		CachedBufTableInsert(&newTag.rnode, buf->buf_id);
 
 		/*
 		 * Need to lock the buffer header too in order to change its tag.
@@ -1285,6 +1287,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 			break;
 
 		UnlockBufHdr(buf, buf_state);
+		CachedBufTableDelete(&newTag.rnode, buf->buf_id);
 		BufTableDelete(&newTag, newHash);
 		if (oldPartitionLock != NULL &&
 			oldPartitionLock != newPartitionLock)
@@ -1319,6 +1322,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 
 	if (oldPartitionLock != NULL)
 	{
+		CachedBufTableDelete(&oldTag.rnode, buf->buf_id);
 		BufTableDelete(&oldTag, oldHash);
 		if (oldPartitionLock != newPartitionLock)
 			LWLockRelease(oldPartitionLock);
@@ -1432,7 +1436,10 @@ retry:
 	 * Remove the buffer from the lookup hashtable, if it was in there.
 	 */
 	if (oldFlags & BM_TAG_VALID)
+	{
+		CachedBufTableDelete(&oldTag.rnode, buf->buf_id);
 		BufTableDelete(&oldTag, oldHash);
+	}
 
 	/*
 	 * Done with mapping lock.
@@ -2916,19 +2923,15 @@ BufferGetLSNAtomic(Buffer buffer)
  *		later.  It is also the responsibility of higher-level code to ensure
  *		that no other process could be trying to load more pages of the
  *		relation into buffers.
- *
- *		XXX currently it sequentially searches the buffer pool, should be
- *		changed to more clever ways of searching.  However, this routine
- *		is used only in code paths that aren't very performance-critical,
- *		and we shouldn't slow down the hot paths to make it faster ...
  * --------------------------------------------------------------------
  */
 void
 DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
 					   int nforks, BlockNumber *firstDelBlock)
 {
-	int			i;
-	int			j;
+	int			i, j, nbufs;
+	int			buf_id_array[BUF_ID_ARRAY_SIZE];
+	int			forknum_indexes[BUF_ID_ARRAY_SIZE];
 
 	/* If it's a local relation, it's localbuf.c's problem. */
 	if (RelFileNodeBackendIsTemp(rnode))
@@ -2942,45 +2945,48 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
 		return;
 	}
 
-	for (i = 0; i < NBuffers; i++)
+	do
 	{
-		BufferDesc *bufHdr = GetBufferDescriptor(i);
-		uint32		buf_state;
+		nbufs = CachedBufLookup(rnode.node, forkNum, nforks,
+								forknum_indexes, firstDelBlock,
+								buf_id_array, lengthof(buf_id_array));
 
-		/*
-		 * We can make this a tad faster by prechecking the buffer tag before
-		 * we attempt to lock the buffer; this saves a lot of lock
-		 * acquisitions in typical cases.  It should be safe because the
-		 * caller must have AccessExclusiveLock on the relation, or some other
-		 * reason to be certain that no one is loading new pages of the rel
-		 * into the buffer pool.  (Otherwise we might well miss such pages
-		 * entirely.)  Therefore, while the tag might be changing while we
-		 * look at it, it can't be changing *to* a value we care about, only
-		 * *away* from such a value.  So false negatives are impossible, and
-		 * false positives are safe because we'll recheck after getting the
-		 * buffer lock.
-		 *
-		 * We could check forkNum and blockNum as well as the rnode, but the
-		 * incremental win from doing so seems small.
-		 */
-		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
-			continue;
+		for (i = 0; i < nbufs; i++)
+		{
+			BufferDesc *bufHdr = GetBufferDescriptor(buf_id_array[i]);
+			uint32	buf_state;
+			int		index = forknum_indexes[i];
 
-		buf_state = LockBufHdr(bufHdr);
+			/*
+			 * We can make this a tad faster by prechecking the buffer tag
+			 * before we attempt to lock the buffer; this saves a lot of
+			 * lock acquisitions in typical cases.  It should be safe
+			 * because the caller must have AccessExclusiveLock on the
+			 * relation, or some other reason to be certain that no one is
+			 * loading new pages of the rel into the buffer pool.
+			 * (Otherwise we might well miss such pages entirely.)
+			 * Therefore, while the tag might be changing while we look at
+			 * it, it can't be changing *to* a value we care about, only
+			 * *away* from such a value.  So false negatives are impossible,
+			 * and false positives are safe because we'll recheck after
+			 * getting the buffer lock.
+			 *
+			 * We could check forkNum and blockNum as well as the rnode, but
+			 * the incremental win from doing so seems small.
+			 */
+			if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+				continue;
+
+			buf_state = LockBufHdr(bufHdr);
 
-		for (j = 0; j < nforks; j++)
-		{
 			if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
-				bufHdr->tag.forkNum == forkNum[j] &&
-				bufHdr->tag.blockNum >= firstDelBlock[j])
-			{
+				bufHdr->tag.forkNum == forkNum[index] &&
+				bufHdr->tag.blockNum >= firstDelBlock[index])
 				InvalidateBuffer(bufHdr); /* releases spinlock */
-				break;
-			}
+			else
+				UnlockBufHdr(bufHdr, buf_state);
 		}
-		if (j >= nforks)
-			UnlockBufHdr(bufHdr, buf_state);
-	}
+	} while (nbufs == lengthof(buf_id_array));
 }
 
 /* ---------------------------------------------------------------------
diff --git a/src/backend/storage/buffer/cached_buf.c b/src/backend/storage/buffer/cached_buf.c
new file mode 100644
index 0000000..3a3b1fb
--- /dev/null
+++ b/src/backend/storage/buffer/cached_buf.c
@@ -0,0 +1,324 @@
+/*-------------------------------------------------------------------------
+ *
+ * cached_buf.c
+ *	  routines for mapping relations to the indexes of auxillary cached
+ *	  buffers.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/buffer/cached_buf.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "storage/bufmgr.h"
+#include "storage/buf_internals.h"
+
+
+/*
+ * Each relation and its buffer information are cached in a hash table
+ * located in shared memory. These cached buffers are chained together
+ * in a doubly-linked list.
+ */
+static HTAB		*CachedBufHash = NULL;
+
+BufDlistEnt	*BufDlistEntArray = NULL;
+static CachedBufTableLock *CachedBufLockArray = NULL;
+static uint32 CachedBufTableHashCode(RelFileNode *rnode);
+static LWLock *GetCachedBufPartitionLock(RelFileNode *rnode,
+										 uint32 *hashcodePtr);
+
+/*
+ * CachedBufShmemSize
+ * 		Estimate space needed for mapping cached buffer hash table
+ *
+ * 		size of lookup table is the desired hash table size
+ *		(possibly more than NBuffers)
+ */
+Size
+CachedBufShmemSize(void)
+{
+	Size		size = 0;
+
+	/* size of cached buffer lookup table */
+	size = add_size(size, hash_estimate_size(NBuffers,
+											sizeof(CachedBufEnt)));
+
+	/* size of cached buffer dlist entry array */
+	size = add_size(size, mul_size(NBuffers, sizeof(BufDlistEnt)));
+
+	/* size of locks */
+	size = add_size(size, mul_size(NBuffers,
+								   sizeof(CachedBufTableLock)));
+
+	return size;
+}
+
+/*
+ * InitCachedBLockTable
+ *      Initialize the cached block hash table and related data
+ *      structures at shared memory initialization.
+ */
+void
+InitCachedBufTable(int size)
+{
+	HASHCTL		info;
+	bool		foundList, foundLock;
+	int			i;
+
+	info.keysize = sizeof(RelFileNode);
+	info.entrysize = sizeof(CachedBufEnt);
+	info.num_partitions = NUM_MAP_PARTITIONS;
+
+	CachedBufHash = ShmemInitHash("Cached Block Lookup Table",
+								  size, size,
+								  &info,
+								  HASH_ELEM | HASH_BLOBS |
+								  HASH_PARTITION);
+
+	BufDlistEntArray = ShmemInitStruct("dlist entry array",
+									   size * sizeof(BufDlistEnt),
+									   &foundList);
+
+	CachedBufLockArray = (CachedBufTableLock *)
+			ShmemInitStruct("partition lock",
+							size * sizeof(CachedBufTableLock),
+							&foundLock);
+
+	if (!foundList && !foundLock)
+	{
+		CachedBufLockArray->cacheTrancheId = LWLockNewTrancheId();
+
+		for (i = 0; i < NUM_MAP_PARTITIONS; i++)
+			LWLockInitialize(&CachedBufLockArray->cacheLock[i],
+							 CachedBufLockArray->cacheTrancheId);
+	}
+	LWLockRegisterTranche(CachedBufLockArray->cacheTrancheId,
+						  "cached_block_tranche_id");
+}
+
+ /*
+  * CachedBufTableHashCode
+  *      Compute the hash code associated with target relation
+  *
+  * This must be passed to the insert/lookup/delete routines along with the
+  * relation. We do it like this because the callers need to know the hash
+  * code in order to determine which partition to lock, and we don't want
+  * to do the hash computation twice (hash_any is a bit slow).
+  */
+static uint32
+CachedBufTableHashCode(RelFileNode *rnode)
+{
+	return get_hash_value(CachedBufHash, (void *) rnode);
+}
+
+/*
+ * GetCachedBufPartitionLock
+ * 		Get lock instance for partition of cached buffer lookup table
+ */
+static LWLock *
+GetCachedBufPartitionLock(RelFileNode *rnode, uint32 *hashcode)
+{
+	*hashcode = CachedBufTableHashCode(rnode);
+
+	return &CachedBufLockArray->cacheLock[*hashcode % NUM_MAP_PARTITIONS];
+}
+
+/*
+ * CachedBufTableInsert
+ *		Insert a hash table entry for given relation and buffer ID
+ *
+ * If a hash entry for the target relation already exists, only buffer ID
+ * is inserted. Chain the buffer ID to the doubly-linked list.
+ *
+ * This function must hold exclusive lock for relation's partition.
+ */
+void
+CachedBufTableInsert(RelFileNode *rnode, int buf_id)
+{
+	uint32			hashcode;
+	LWLock			*map_lock;
+	CachedBufEnt	*hash_entry;
+	bool			found;
+
+	map_lock = GetCachedBufPartitionLock(rnode, &hashcode);
+	LWLockAcquire(map_lock, LW_EXCLUSIVE);
+
+	/* look up or create a hash table entry */
+	hash_entry = (CachedBufEnt *)
+		hash_search_with_hash_value(CachedBufHash,
+									(void *) rnode,
+									hashcode,
+									HASH_ENTER,
+									&found);
+
+	/* If not found, initialize linked list */
+	if (!found)
+		hash_entry->head = CACHED_BUF_END_OF_LIST;
+
+	Assert(buf_id >= 0);
+	cb_dlist_push_head(hash_entry, buf_id);
+
+	LWLockRelease(map_lock);
+}
+
+/*
+ * CachedBufLookup
+ *		Lookup the buffers of target relation in the cached buffer hash
+ *      table, insert the buffer IDs to the given array of buffer ID, and
+ *      return the number of buffers inserted in the array.
+ *
+ * This function must hold shared LWLock for relation's partition.
+ */
+int
+CachedBufLookup(RelFileNode rnode, ForkNumber *forkNum, int nforks,
+				  int *forknum_indexes, BlockNumber *firstDelBlock,
+				  int *buf_id_array, int size)
+{
+	uint32		hashcode;
+	LWLock		*map_lock;
+	CachedBufEnt	*hash_entry;
+	CachedBufEnt	temp_head_entry;
+	BufDlistEnt	*curr_entry = NULL;
+	int		i;
+	int		curr_buf_id;
+	int		new_curr_buf_id;
+	int		count = 0;
+	bool		target_buf = false;
+
+	map_lock = GetCachedBufPartitionLock(&rnode, &hashcode);
+	LWLockAcquire(map_lock, LW_SHARED);
+
+	hash_entry = (CachedBufEnt *)
+		hash_search_with_hash_value(CachedBufHash,
+									(void *) &rnode,
+									hashcode,
+									HASH_FIND,
+									NULL);
+
+	if (!hash_entry)
+	{
+		LWLockRelease(map_lock);
+		return 0;	/* No existing hash entry */
+	}
+
+	/* Initial temporary dlist */
+	temp_head_entry.head = CACHED_BUF_END_OF_LIST;
+
+	/* When traversing the main dlist, start from head */
+	curr_buf_id = hash_entry->head;
+
+	while(curr_buf_id != CACHED_BUF_END_OF_LIST && count < size)
+	{
+		BufferDesc *bufHdr = GetBufferDescriptor(curr_buf_id);
+		curr_entry = cb_dlist_entry(curr_buf_id);
+		new_curr_buf_id = curr_entry->next;
+
+		/* Check if it's our target buffer */
+		for (i = 0; i < nforks; i++)
+		{
+			if (bufHdr->tag.forkNum != forkNum[i])
+				continue;
+			else
+			{
+				if (bufHdr->tag.blockNum >= firstDelBlock[i])
+					target_buf = true;
+				break;
+			}
+		}
+
+		if (target_buf)
+		{
+			forknum_indexes[count] = i;
+			buf_id_array[count] = curr_buf_id;
+			++count;
+		}
+		else
+		{
+			/*
+			 * It's not the target buffer. Remove the current buffer ID
+			 * from the current list of target buffer IDs and store it
+			 * to a temporary list.
+			 */
+			cb_dlist_delete(hash_entry, curr_buf_id);
+			cb_dlist_push_head(&temp_head_entry, curr_buf_id);
+		}
+		/* Move current pointer to next */
+		curr_buf_id = new_curr_buf_id;
+	}
+
+	/* Check if main dlist is now empty */
+	if (cb_dlist_is_empty(hash_entry))
+	{
+		hash_entry->head = temp_head_entry.head;
+		temp_head_entry.head = CACHED_BUF_END_OF_LIST;
+	}
+
+	/* If we have a temporary dlist, append it to the main dlist */
+	if (!cb_dlist_is_empty(hash_entry) &&
+		!cb_dlist_is_empty(&temp_head_entry))
+		cb_dlist_combine(hash_entry, &temp_head_entry);
+
+	LWLockRelease(map_lock);
+
+	return count;
+}
+
+/*
+ * CachedBufTableDelete
+ *		Unlink the buffer ID from the doubly-linked list, then remove
+ *		the relation hash entry if its list is empty.
+ *
+ * This function must hold exclusive lock for relation's partition.
+ */
+void
+CachedBufTableDelete(RelFileNode *rnode, int buf_id)
+{
+	LWLock		*map_lock;
+	uint32		hashcode;
+	CachedBufEnt	*hash_entry;
+	bool		found;
+
+	Assert(buf_id >= 0);
+
+	map_lock = GetCachedBufPartitionLock(rnode, &hashcode);
+	LWLockAcquire(map_lock, LW_EXCLUSIVE);
+
+	/* look up hash table entry */
+	hash_entry = (CachedBufEnt *)
+		hash_search_with_hash_value(CachedBufHash,
+									(void *) rnode,
+									hashcode,
+									HASH_FIND,
+									&found);
+
+	if (!found)		/* rnode not found, nothing to do */
+	{
+		LWLockRelease(map_lock);
+		return;
+	}
+
+	cb_dlist_delete(hash_entry, buf_id);
+
+	/*
+	 * If there's no more cached elements for the target relation,
+	 * remove the relation hash entry.
+	 */
+	if (cb_dlist_is_empty(hash_entry))
+	{
+		hash_entry = (CachedBufEnt *)
+			hash_search_with_hash_value(CachedBufHash,
+										(void *) rnode,
+										hashcode,
+										HASH_REMOVE,
+										NULL);
+
+		if (!hash_entry)
+			elog(ERROR, "cached buffer hash table corrupted");
+	}
+
+	LWLockRelease(map_lock);
+}
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 6ffe184..1efb41a 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -297,6 +297,7 @@ typedef struct CkptSortItem
 
 extern CkptSortItem *CkptBufferIds;
 
+
 /*
  * Internal buffer management routines
  */
@@ -338,4 +339,166 @@ extern void DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
 extern void DropRelFileNodeAllLocalBuffers(RelFileNode rnode);
 extern void AtEOXact_LocalBuffers(bool isCommit);
 
+/* in cached_buf.c */
+
+/* entry to the cached buffer hash table */
+typedef struct CachedBufEnt
+{
+	RelFileNode		rnode;
+	int			head;	/* head index of list */
+} CachedBufEnt;
+
+/* entry to the doubly-linked list */
+typedef struct BufDlistEnt
+{
+	int		prev;
+	int		next;
+} BufDlistEnt;
+
+#define NUM_MAP_PARTITIONS	128
+#define BUF_ID_ARRAY_SIZE	100
+
+/* end of list entry for cached buffer list */
+#define CACHED_BUF_END_OF_LIST	(-1)
+
+extern BufDlistEnt	*BufDlistEntArray;
+
+/* lock for cached buffer hash table */
+typedef struct CachedBufTableLock
+{
+	LWLock	cacheLock[NUM_MAP_PARTITIONS];
+	int		cacheTrancheId;
+} CachedBufTableLock;
+
+extern Size CachedBufShmemSize(void);
+extern void InitCachedBufTable(int size);
+extern void CachedBufTableInsert(RelFileNode *rnode, int buf_id);
+extern int CachedBufLookup(RelFileNode rnode, ForkNumber *forkNum, int nforks,
+				  int *forknum_indexes, BlockNumber *firstDelBlock,
+				  int *buf_id_array, int size);
+extern void CachedBufTableDelete(RelFileNode *rnode, int buf_id);
+
+/*
+ * inline functions for the doubly-linked list of cached buffers
+ */
+/* Return the dlist entry */
+static inline BufDlistEnt *
+cb_dlist_entry(int buf_id)
+{
+	return BufDlistEntArray + buf_id;
+}
+
+/* Return the cached buffer id of the target entry */
+static inline int
+cb_dlist_buf_id(BufDlistEnt *entry)
+{
+	return entry - BufDlistEntArray;
+}
+
+/* Return the next entry */
+static inline BufDlistEnt *
+cb_dlist_next(BufDlistEnt *entry)
+{
+	return entry->next == CACHED_BUF_END_OF_LIST ?
+							NULL : cb_dlist_entry(entry->next);
+}
+
+/* Return the prev entry */
+static inline BufDlistEnt *
+cb_dlist_prev(BufDlistEnt *entry)
+{
+	return entry->prev == CACHED_BUF_END_OF_LIST ?
+							NULL : cb_dlist_entry(entry->prev);
+}
+
+/* Return if dlist is empty */
+static inline bool
+cb_dlist_is_empty(CachedBufEnt *hash_entry)
+{
+	return hash_entry->head == CACHED_BUF_END_OF_LIST;
+}
+
+/* Push to head of dlist */
+static inline void
+cb_dlist_push_head(CachedBufEnt *hash_entry, int buf_id)
+{
+	BufDlistEnt *new_entry = cb_dlist_entry(buf_id);
+
+	if (cb_dlist_is_empty(hash_entry))
+	{
+		new_entry->prev = buf_id; /* TAIL */
+		new_entry->next = hash_entry->head;
+	}
+	else
+	{
+		BufDlistEnt *head_entry = cb_dlist_entry(hash_entry->head);
+		int tail = head_entry->prev;
+		new_entry->prev = tail;
+		new_entry->next = hash_entry->head;
+		cb_dlist_next(new_entry)->prev = buf_id;
+	}
+	hash_entry->head = buf_id;
+}
+
+/* Remove the buffer ID from dlist */
+static inline void
+cb_dlist_delete(CachedBufEnt *hash_entry, int buf_id)
+{
+	BufDlistEnt	*curr_entry = cb_dlist_entry(buf_id);
+	BufDlistEnt	*head_entry = cb_dlist_entry(hash_entry->head);
+	BufDlistEnt	*next_entry = cb_dlist_next(curr_entry);
+	BufDlistEnt	*prev_entry = cb_dlist_prev(curr_entry);
+
+	int tail = head_entry->prev;
+	BufDlistEnt	*tail_entry = cb_dlist_entry(tail);
+
+	/* If entry to be deleted is the only entry */
+	if (head_entry == tail_entry)
+	{
+		head_entry->prev = CACHED_BUF_END_OF_LIST;
+		hash_entry->head = CACHED_BUF_END_OF_LIST;
+		return;
+	}
+
+	/*
+	 * If there is a next entry, update its prev field.
+	 * Otherwise, current entry is at tail (but not at head),
+	 * so update the new tail.
+	 */
+	if (next_entry != NULL)
+		next_entry->prev = curr_entry->prev;
+	else
+		head_entry->prev = tail;
+
+	/*
+	 * If the previous entry is not the tail entry, update its
+	 * next field. Otherwise, current entry is at head (but not
+	 * at tail). Update the new head entry and its tail pointer.
+	 */
+	if (prev_entry != tail_entry)
+		prev_entry->next = curr_entry->next;
+	else
+		hash_entry->head = curr_entry->next;
+}
+
+/* Append head of temporary dlist to main dlist */
+static inline void
+cb_dlist_combine(CachedBufEnt *main, CachedBufEnt *temp)
+{
+	BufDlistEnt	*main_head_entry = cb_dlist_entry(main->head);
+	BufDlistEnt  *temp_head_entry = cb_dlist_entry(temp->head);
+	int main_tail = main_head_entry->prev;
+	int temp_tail = temp_head_entry->prev;
+	BufDlistEnt	*main_tail_entry = cb_dlist_entry(main_tail);
+
+	/* Append the temporary dlist to main dlist */
+	main_tail_entry->next = temp->head;
+	temp_head_entry->prev = main_tail;
+	main_head_entry->prev = temp_tail;
+
+	/* Clear the head of temporary dlist */
+	temp_head_entry->prev = CACHED_BUF_END_OF_LIST;
+	temp->head = CACHED_BUF_END_OF_LIST;
+}
+
 #endif							/* BUFMGR_INTERNALS_H */
-- 
1.8.3.1

