diff --git a/contrib/pgstattuple/pgstattuple.c b/contrib/pgstattuple/pgstattuple.c
index c1122b4..d02539a 100644
--- a/contrib/pgstattuple/pgstattuple.c
+++ b/contrib/pgstattuple/pgstattuple.c
@@ -400,7 +400,6 @@ pgstat_hash_page(pgstattuple_type *stat, Relation rel, BlockNumber blkno,
 	Buffer		buf;
 	Page		page;
 
-	_hash_getlock(rel, blkno, HASH_SHARE);
 	buf = _hash_getbuf_with_strategy(rel, blkno, HASH_READ, 0, bstrategy);
 	page = BufferGetPage(buf);
 
@@ -431,7 +430,6 @@ pgstat_hash_page(pgstattuple_type *stat, Relation rel, BlockNumber blkno,
 	}
 
 	_hash_relbuf(rel, buf);
-	_hash_droplock(rel, blkno, HASH_SHARE);
 }
 
 /*
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 49a6c81..861dbc8 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -407,12 +407,15 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
 
 	so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
 	so->hashso_bucket_valid = false;
-	so->hashso_bucket_blkno = 0;
 	so->hashso_curbuf = InvalidBuffer;
+	so->hashso_bucket_buf = InvalidBuffer;
+	so->hashso_old_bucket_buf = InvalidBuffer;
 	/* set position invalid (this will cause _hash_first call) */
 	ItemPointerSetInvalid(&(so->hashso_curpos));
 	ItemPointerSetInvalid(&(so->hashso_heappos));
 
+	so->hashso_skip_moved_tuples = false;
+
 	scan->opaque = so;
 
 	/* register scan in case we change pages it's using */
@@ -436,10 +439,15 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 		_hash_dropbuf(rel, so->hashso_curbuf);
 	so->hashso_curbuf = InvalidBuffer;
 
-	/* release lock on bucket, too */
-	if (so->hashso_bucket_blkno)
-		_hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
-	so->hashso_bucket_blkno = 0;
+	/* release pin we hold on old primary bucket */
+	if (BufferIsValid(so->hashso_old_bucket_buf))
+		_hash_dropbuf(rel, so->hashso_old_bucket_buf);
+	so->hashso_old_bucket_buf = InvalidBuffer;
+
+	/* release pin we hold on primary bucket */
+	if (BufferIsValid(so->hashso_bucket_buf))
+		_hash_dropbuf(rel, so->hashso_bucket_buf);
+	so->hashso_bucket_buf = InvalidBuffer;
 
 	/* set position invalid (this will cause _hash_first call) */
 	ItemPointerSetInvalid(&(so->hashso_curpos));
@@ -453,6 +461,8 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 				scan->numberOfKeys * sizeof(ScanKeyData));
 		so->hashso_bucket_valid = false;
 	}
+
+	so->hashso_skip_moved_tuples = false;
 }
 
 /*
@@ -472,10 +482,15 @@ hashendscan(IndexScanDesc scan)
 		_hash_dropbuf(rel, so->hashso_curbuf);
 	so->hashso_curbuf = InvalidBuffer;
 
-	/* release lock on bucket, too */
-	if (so->hashso_bucket_blkno)
-		_hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
-	so->hashso_bucket_blkno = 0;
+	/* release pin we hold on old primary bucket */
+	if (BufferIsValid(so->hashso_old_bucket_buf))
+		_hash_dropbuf(rel, so->hashso_old_bucket_buf);
+	so->hashso_old_bucket_buf = InvalidBuffer;
+
+	/* release pin we hold on primary bucket */
+	if (BufferIsValid(so->hashso_bucket_buf))
+		_hash_dropbuf(rel, so->hashso_bucket_buf);
+	so->hashso_bucket_buf = InvalidBuffer;
 
 	pfree(so);
 	scan->opaque = NULL;
@@ -486,6 +501,9 @@ hashendscan(IndexScanDesc scan)
  * The set of target tuples is specified via a callback routine that tells
  * whether any given heap tuple (identified by ItemPointer) is being deleted.
  *
+ * This function also deletes the tuples that were moved by a split to the
+ * other bucket.
+ *
  * Result: a palloc'd struct containing statistical info for VACUUM displays.
  */
 IndexBulkDeleteResult *
@@ -530,83 +548,60 @@ loop_top:
 	{
 		BlockNumber bucket_blkno;
 		BlockNumber blkno;
-		bool		bucket_dirty = false;
+		Buffer		bucket_buf;
+		Buffer		buf;
+		HashPageOpaque bucket_opaque;
+		Page		page;
+		bool		bucket_has_garbage = false;
 
 		/* Get address of bucket's start page */
 		bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);
 
-		/* Exclusive-lock the bucket so we can shrink it */
-		_hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);
-
 		/* Shouldn't have any active scans locally, either */
 		if (_hash_has_active_scan(rel, cur_bucket))
 			elog(ERROR, "hash index has active scan during VACUUM");
 
-		/* Scan each page in bucket */
 		blkno = bucket_blkno;
-		while (BlockNumberIsValid(blkno))
-		{
-			Buffer		buf;
-			Page		page;
-			HashPageOpaque opaque;
-			OffsetNumber offno;
-			OffsetNumber maxoffno;
-			OffsetNumber deletable[MaxOffsetNumber];
-			int			ndeletable = 0;
-
-			vacuum_delay_point();
 
-			buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
-										   LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
-											 info->strategy);
-			page = BufferGetPage(buf);
-			opaque = (HashPageOpaque) PageGetSpecialPointer(page);
-			Assert(opaque->hasho_bucket == cur_bucket);
-
-			/* Scan each tuple in page */
-			maxoffno = PageGetMaxOffsetNumber(page);
-			for (offno = FirstOffsetNumber;
-				 offno <= maxoffno;
-				 offno = OffsetNumberNext(offno))
-			{
-				IndexTuple	itup;
-				ItemPointer htup;
+		/*
+		 * Maintain a cleanup lock on the primary bucket page till we have
+		 * scanned all the pages in the bucket.  This is required to ensure
+		 * that we don't delete tuples which are needed by concurrent scans on
+		 * buckets where a split is in progress.  Retaining the lock till the
+		 * end of the bucket scan also ensures that a concurrent split can't
+		 * be started on this bucket.  In future, we might want to relax this
+		 * so that vacuum takes a cleanup lock only for buckets where a split
+		 * is in progress; however, the squeeze phase still needs a cleanup
+		 * lock, otherwise squeezing would move tuples and reorder scan results.
+		 */
+		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
+		LockBufferForCleanup(buf);
+		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
 
-				itup = (IndexTuple) PageGetItem(page,
-												PageGetItemId(page, offno));
-				htup = &(itup->t_tid);
-				if (callback(htup, callback_state))
-				{
-					/* mark the item for deletion */
-					deletable[ndeletable++] = offno;
-					tuples_removed += 1;
-				}
-				else
-					num_index_tuples += 1;
-			}
+		page = BufferGetPage(buf);
+		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
 
-			/*
-			 * Apply deletions and write page if needed, advance to next page.
-			 */
-			blkno = opaque->hasho_nextblkno;
+		/*
+		 * If the bucket contains tuples that were moved by a split, then we
+		 * need to delete such tuples once the split is complete.  The cleanup
+		 * lock on the bucket is not sufficient to detect whether a split is
+		 * complete, as the previous split could have been interrupted by a
+		 * cancel request or an error.
+		 */
+		if (H_HAS_GARBAGE(bucket_opaque) &&
+			!H_INCOMPLETE_SPLIT(bucket_opaque))
+			bucket_has_garbage = true;
 
-			if (ndeletable > 0)
-			{
-				PageIndexMultiDelete(page, deletable, ndeletable);
-				_hash_wrtbuf(rel, buf);
-				bucket_dirty = true;
-			}
-			else
-				_hash_relbuf(rel, buf);
-		}
+		bucket_buf = buf;
 
-		/* If we deleted anything, try to compact free space */
-		if (bucket_dirty)
-			_hash_squeezebucket(rel, cur_bucket, bucket_blkno,
-								info->strategy);
+		hashbucketcleanup(rel, bucket_buf, blkno, info->strategy,
+						  local_metapage.hashm_maxbucket,
+						  local_metapage.hashm_highmask,
+						  local_metapage.hashm_lowmask, &tuples_removed,
+						  &num_index_tuples, bucket_has_garbage, true,
+						  callback, callback_state);
 
-		/* Release bucket lock */
-		_hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);
+		_hash_relbuf(rel, bucket_buf);
 
 		/* Advance to next bucket */
 		cur_bucket++;
@@ -687,6 +682,156 @@ hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	return stats;
 }
 
+/*
+ * Helper function to perform deletion of index entries from a bucket.
+ *
+ * This expects that the caller has acquired a cleanup lock on the target
+ * bucket (primary page of a bucket); it is the caller's responsibility to
+ * release that lock.
+ */
+void
+hashbucketcleanup(Relation rel, Buffer bucket_buf,
+				  BlockNumber bucket_blkno,
+				  BufferAccessStrategy bstrategy,
+				  uint32 maxbucket,
+				  uint32 highmask, uint32 lowmask,
+				  double *tuples_removed,
+				  double *num_index_tuples,
+				  bool bucket_has_garbage,
+				  bool delay,
+				  IndexBulkDeleteCallback callback,
+				  void *callback_state)
+{
+	BlockNumber blkno;
+	Buffer		buf;
+	Bucket		cur_bucket;
+	Bucket		new_bucket PG_USED_FOR_ASSERTS_ONLY;
+	Page		page;
+	bool		bucket_dirty = false;
+
+	blkno = bucket_blkno;
+	buf = bucket_buf;
+	page = BufferGetPage(buf);
+	cur_bucket = ((HashPageOpaque) PageGetSpecialPointer(page))->hasho_bucket;
+
+	if (bucket_has_garbage)
+		new_bucket = _hash_get_newbucket(rel, cur_bucket,
+										 lowmask, maxbucket);
+
+	/* Scan each page in bucket */
+	for (;;)
+	{
+		HashPageOpaque opaque;
+		OffsetNumber offno;
+		OffsetNumber maxoffno;
+		OffsetNumber deletable[MaxOffsetNumber];
+		int			ndeletable = 0;
+		bool		release_buf = false;
+
+		if (delay)
+			vacuum_delay_point();
+
+		page = BufferGetPage(buf);
+		opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+		/* Scan each tuple in page */
+		maxoffno = PageGetMaxOffsetNumber(page);
+		for (offno = FirstOffsetNumber;
+			 offno <= maxoffno;
+			 offno = OffsetNumberNext(offno))
+		{
+			IndexTuple	itup;
+			ItemPointer htup;
+			Bucket		bucket;
+
+			itup = (IndexTuple) PageGetItem(page,
+											PageGetItemId(page, offno));
+			htup = &(itup->t_tid);
+			if (callback && callback(htup, callback_state))
+			{
+				/* mark the item for deletion */
+				deletable[ndeletable++] = offno;
+				if (tuples_removed)
+					*tuples_removed += 1;
+			}
+			else if (bucket_has_garbage)
+			{
+				/* delete the tuples that were moved by a split */
+				bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
+											  maxbucket,
+											  highmask,
+											  lowmask);
+				/* mark the item for deletion */
+				if (bucket != cur_bucket)
+				{
+					/*
+					 * We expect tuples to either belong to curent bucket or
+					 * new_bucket.  This is ensured because we don't allow
+					 * further splits from bucket that contains garbage. See
+					 * comments in _hash_expandtable.
+					 */
+					Assert(bucket == new_bucket);
+					deletable[ndeletable++] = offno;
+				}
+			}
+			else if (num_index_tuples)
+				*num_index_tuples += 1;
+		}
+
+		/*
+		 * We don't release the lock on the primary bucket page till the end
+		 * of the bucket scan.
+		 */
+		if (blkno != bucket_blkno)
+			release_buf = true;
+
+		blkno = opaque->hasho_nextblkno;
+
+		/*
+		 * Apply deletions and write page if needed, advance to next page.
+		 */
+		if (ndeletable > 0)
+		{
+			PageIndexMultiDelete(page, deletable, ndeletable);
+			if (release_buf)
+				_hash_wrtbuf(rel, buf);
+			else
+				MarkBufferDirty(buf);
+			bucket_dirty = true;
+		}
+		else if (release_buf)
+			_hash_relbuf(rel, buf);
+
+		/* bail out if there are no more pages to scan. */
+		if (!BlockNumberIsValid(blkno))
+			break;
+
+		buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
+										 LH_OVERFLOW_PAGE,
+										 bstrategy);
+	}
+
+	/*
+	 * Clear the garbage flag from the bucket after deleting the tuples that
+	 * were moved by a split.  We purposely clear the flag before squeezing
+	 * the bucket, so that after a restart vacuum won't again try to delete
+	 * the moved-by-split tuples.
+	 */
+	if (bucket_has_garbage)
+	{
+		HashPageOpaque bucket_opaque;
+
+		page = BufferGetPage(bucket_buf);
+		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+		bucket_opaque->hasho_flag &= ~LH_BUCKET_PAGE_HAS_GARBAGE;
+	}
+
+	/* If we deleted anything, try to compact free space */
+	if (bucket_dirty)
+		_hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
+							bstrategy);
+}
 
 void
 hash_redo(XLogReaderState *record)
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index acd2e64..e7a7b51 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -18,6 +18,8 @@
 #include "access/hash.h"
 #include "utils/rel.h"
 
+static void _hash_finish_split(Relation rel, Buffer metabuf, Buffer obuf,
+					Buffer nbuf);
 
 /*
  *	_hash_doinsert() -- Handle insertion of a single index tuple.
@@ -28,7 +30,8 @@
 void
 _hash_doinsert(Relation rel, IndexTuple itup)
 {
-	Buffer		buf;
+	Buffer		buf = InvalidBuffer;
+	Buffer		bucket_buf;
 	Buffer		metabuf;
 	HashMetaPage metap;
 	BlockNumber blkno;
@@ -70,51 +73,136 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 			errhint("Values larger than a buffer page cannot be indexed.")));
 
 	/*
-	 * Loop until we get a lock on the correct target bucket.
+	 * Conditionally get the lock on the primary bucket page for insertion
+	 * while holding the lock on the meta page.  If we have to wait, release
+	 * the meta page lock and retry the hard way.
 	 */
-	for (;;)
-	{
-		/*
-		 * Compute the target bucket number, and convert to block number.
-		 */
-		bucket = _hash_hashkey2bucket(hashkey,
-									  metap->hashm_maxbucket,
-									  metap->hashm_highmask,
-									  metap->hashm_lowmask);
+	bucket = _hash_hashkey2bucket(hashkey,
+								  metap->hashm_maxbucket,
+								  metap->hashm_highmask,
+								  metap->hashm_lowmask);
 
-		blkno = BUCKET_TO_BLKNO(metap, bucket);
+	blkno = BUCKET_TO_BLKNO(metap, bucket);
 
-		/* Release metapage lock, but keep pin. */
+	/* Fetch the primary bucket page for the bucket */
+	buf = ReadBuffer(rel, blkno);
+	if (!ConditionalLockBuffer(buf))
+	{
+		_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+		LockBuffer(buf, HASH_WRITE);
+		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
+		_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
+		oldblkno = blkno;
+		retry = true;
+	}
+	else
+	{
+		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
 		_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+	}
 
+	if (retry)
+	{
 		/*
-		 * If the previous iteration of this loop locked what is still the
-		 * correct target bucket, we are done.  Otherwise, drop any old lock
-		 * and lock what now appears to be the correct bucket.
+		 * Loop until we get a lock on the correct target bucket.  We take the
+		 * lock on the primary bucket page and retain the pin on it during the
+		 * insert operation to prevent concurrent splits.  Retaining a pin on
+		 * the primary bucket page ensures that a split can't happen, as a
+		 * split must acquire the cleanup lock on the primary bucket page.
+		 * Acquiring the lock on the primary bucket and rechecking that it is
+		 * still the target bucket is mandatory, since otherwise a concurrent
+		 * split might cause this insertion to fall into the wrong bucket.
 		 */
-		if (retry)
+		for (;;)
 		{
-			if (oldblkno == blkno)
-				break;
-			_hash_droplock(rel, oldblkno, HASH_SHARE);
-		}
-		_hash_getlock(rel, blkno, HASH_SHARE);
+			/*
+			 * Compute the target bucket number, and convert to block number.
+			 */
+			bucket = _hash_hashkey2bucket(hashkey,
+										  metap->hashm_maxbucket,
+										  metap->hashm_highmask,
+										  metap->hashm_lowmask);
 
-		/*
-		 * Reacquire metapage lock and check that no bucket split has taken
-		 * place while we were awaiting the bucket lock.
-		 */
-		_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
-		oldblkno = blkno;
-		retry = true;
+			blkno = BUCKET_TO_BLKNO(metap, bucket);
+
+			/* Release metapage lock, but keep pin. */
+			_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+
+			/*
+			 * If the previous iteration of this loop locked what is still the
+			 * correct target bucket, we are done.  Otherwise, drop any old
+			 * lock and lock what now appears to be the correct bucket.
+			 */
+			if (retry)
+			{
+				if (oldblkno == blkno)
+					break;
+				_hash_relbuf(rel, buf);
+			}
+
+			/* Fetch the primary bucket page for the bucket */
+			buf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BUCKET_PAGE);
+
+			/*
+			 * Reacquire metapage lock and check that no bucket split has
+			 * taken place while we were awaiting the bucket lock.
+			 */
+			_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
+			oldblkno = blkno;
+			retry = true;
+		}
 	}
 
-	/* Fetch the primary bucket page for the bucket */
-	buf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BUCKET_PAGE);
+	/* remember the primary bucket buffer so we can release its pin at the end */
+	bucket_buf = buf;
+
 	page = BufferGetPage(buf);
 	pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
 	Assert(pageopaque->hasho_bucket == bucket);
 
+	/*
+	 * If there is any pending split, finish it before proceeding with the
+	 * insertion, as the insertion itself can cause a new split.  We don't
+	 * want to allow a split from a bucket that already has a pending split:
+	 * there is no apparent benefit in doing so, and it would complicate the
+	 * code to finish a split that involves multiple buckets, considering
+	 * that the new split can also fail.
+	 */
+	if (H_NEW_INCOMPLETE_SPLIT(pageopaque))
+	{
+		BlockNumber oblkno;
+		Buffer		obuf;
+
+		oblkno = _hash_get_oldblk(rel, pageopaque);
+
+		/* Fetch the primary bucket page for the bucket */
+		obuf = _hash_getbuf(rel, oblkno, HASH_READ, LH_BUCKET_PAGE);
+
+		_hash_finish_split(rel, metabuf, obuf, buf);
+
+		/*
+		 * release the buffer here; the insertion will go into the new bucket.
+		 */
+		_hash_relbuf(rel, obuf);
+	}
+	else if (H_OLD_INCOMPLETE_SPLIT(pageopaque))
+	{
+		BlockNumber nblkno;
+		Buffer		nbuf;
+
+		nblkno = _hash_get_newblk(rel, pageopaque);
+
+		/* Fetch the primary bucket page for the bucket */
+		nbuf = _hash_getbuf(rel, nblkno, HASH_WRITE, LH_BUCKET_PAGE);
+
+		_hash_finish_split(rel, metabuf, buf, nbuf);
+
+		/*
+		 * release the buffer here; the insertion will go into the old bucket.
+		 */
+		_hash_relbuf(rel, nbuf);
+	}
+
 	/* Do the insertion */
 	while (PageGetFreeSpace(page) < itemsz)
 	{
@@ -127,14 +215,23 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 		{
 			/*
 			 * ovfl page exists; go get it.  if it doesn't have room, we'll
-			 * find out next pass through the loop test above.
+			 * find out next pass through the loop test above.  Retain the pin
+			 * if this is the primary bucket page.
 			 */
-			_hash_relbuf(rel, buf);
+			if (pageopaque->hasho_flag & LH_BUCKET_PAGE)
+				_hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
+			else
+				_hash_relbuf(rel, buf);
 			buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
 			page = BufferGetPage(buf);
 		}
 		else
 		{
+			bool		retain_pin = false;
+
+			/* page flags must be read before releasing the lock on the page */
+			retain_pin = pageopaque->hasho_flag & LH_BUCKET_PAGE;
+
 			/*
 			 * we're at the end of the bucket chain and we haven't found a
 			 * page with enough room.  allocate a new overflow page.
@@ -144,7 +241,7 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 			_hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
 
 			/* chain to a new overflow page */
-			buf = _hash_addovflpage(rel, metabuf, buf);
+			buf = _hash_addovflpage(rel, metabuf, buf, retain_pin);
 			page = BufferGetPage(buf);
 
 			/* should fit now, given test above */
@@ -158,11 +255,13 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 	/* found page with enough space, so add the item here */
 	(void) _hash_pgaddtup(rel, buf, itemsz, itup);
 
-	/* write and release the modified page */
+	/*
+	 * write and release the modified page, and make sure to release the pin
+	 * on the primary bucket page.
+	 */
 	_hash_wrtbuf(rel, buf);
-
-	/* We can drop the bucket lock now */
-	_hash_droplock(rel, blkno, HASH_SHARE);
+	if (buf != bucket_buf)
+		_hash_dropbuf(rel, bucket_buf);
 
 	/*
 	 * Write-lock the metapage so we can increment the tuple count. After
@@ -188,6 +287,127 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 }
 
 /*
+ *	_hash_finish_split() -- Finish the previously interrupted split operation
+ *
+ * To complete the split operation, we build a hash table of the TIDs already
+ * present in the new bucket, which the split operation then uses to skip
+ * tuples that were already moved before it was interrupted.
+ *
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * The buffer is returned in the same state.  (The metapage is only
+ * touched if it becomes necessary to add or remove overflow pages.)
+ *
+ * 'obuf' and 'nbuf' must be locked by the caller, which is also responsible
+ * for unlocking them.
+ */
+static void
+_hash_finish_split(Relation rel, Buffer metabuf, Buffer obuf, Buffer nbuf)
+{
+	HASHCTL		hash_ctl;
+	HTAB	   *tidhtab;
+	Buffer		bucket_nbuf;
+	Page		opage;
+	Page		npage;
+	HashPageOpaque opageopaque;
+	HashPageOpaque npageopaque;
+	HashMetaPage metap;
+	Bucket		obucket;
+	Bucket		nbucket;
+	uint32		maxbucket;
+	uint32		highmask;
+	uint32		lowmask;
+	bool		found;
+
+	/* Initialize hash tables used to track TIDs */
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+	hash_ctl.keysize = sizeof(ItemPointerData);
+	hash_ctl.entrysize = sizeof(ItemPointerData);
+	hash_ctl.hcxt = CurrentMemoryContext;
+
+	tidhtab =
+		hash_create("bucket ctids",
+					256,		/* arbitrary initial size */
+					&hash_ctl,
+					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+	/*
+	 * Scan the new bucket and build a hash table of its TIDs
+	 */
+	bucket_nbuf = nbuf;
+	npage = BufferGetPage(nbuf);
+	npageopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+	for (;;)
+	{
+		BlockNumber nblkno;
+		OffsetNumber noffnum;
+		OffsetNumber nmaxoffnum;
+
+		/* Scan each tuple in new page */
+		nmaxoffnum = PageGetMaxOffsetNumber(npage);
+		for (noffnum = FirstOffsetNumber;
+			 noffnum <= nmaxoffnum;
+			 noffnum = OffsetNumberNext(noffnum))
+		{
+			IndexTuple	itup;
+
+			/* Fetch the item's TID and insert it in hash table. */
+			itup = (IndexTuple) PageGetItem(npage,
+											PageGetItemId(npage, noffnum));
+
+			(void) hash_search(tidhtab, &itup->t_tid, HASH_ENTER, &found);
+
+			Assert(!found);
+		}
+
+		nblkno = npageopaque->hasho_nextblkno;
+
+		/*
+		 * release our write lock without modifying the buffer, making sure
+		 * to retain the pin on the primary bucket page.
+		 */
+		if (nbuf == bucket_nbuf)
+			_hash_chgbufaccess(rel, nbuf, HASH_READ, HASH_NOLOCK);
+		else
+			_hash_relbuf(rel, nbuf);
+
+		/* Exit loop if no more overflow pages in new bucket */
+		if (!BlockNumberIsValid(nblkno))
+			break;
+
+		/* Else, advance to next page */
+		nbuf = _hash_getbuf(rel, nblkno, HASH_READ, LH_OVERFLOW_PAGE);
+		npage = BufferGetPage(nbuf);
+		npageopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+	}
+
+	/* Get the metapage info */
+	_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
+	metap = HashPageGetMeta(BufferGetPage(metabuf));
+
+	maxbucket = metap->hashm_maxbucket;
+	highmask = metap->hashm_highmask;
+	lowmask = metap->hashm_lowmask;
+
+	_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+
+	_hash_chgbufaccess(rel, bucket_nbuf, HASH_NOLOCK, HASH_WRITE);
+
+	npage = BufferGetPage(bucket_nbuf);
+	npageopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+	nbucket = npageopaque->hasho_bucket;
+
+	opage = BufferGetPage(obuf);
+	opageopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+	obucket = opageopaque->hasho_bucket;
+
+	_hash_splitbucket_guts(rel, metabuf, obucket,
+						   nbucket, obuf, bucket_nbuf, tidhtab,
+						   maxbucket, highmask, lowmask);
+
+	hash_destroy(tidhtab);
+}
+
+/*
  *	_hash_pgaddtup() -- add a tuple to a particular page in the index.
  *
  * This routine adds the tuple to the page as requested; it does not write out
diff --git a/src/backend/access/hash/hashovfl.c b/src/backend/access/hash/hashovfl.c
index db3e268..760563a 100644
--- a/src/backend/access/hash/hashovfl.c
+++ b/src/backend/access/hash/hashovfl.c
@@ -82,23 +82,20 @@ blkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno)
  *
  *	On entry, the caller must hold a pin but no lock on 'buf'.  The pin is
  *	dropped before exiting (we assume the caller is not interested in 'buf'
- *	anymore).  The returned overflow page will be pinned and write-locked;
- *	it is guaranteed to be empty.
+ *	anymore), unless asked to retain it.  The pin is retained only for the
+ *	primary bucket page.  The returned overflow page will be pinned and
+ *	write-locked; it is guaranteed to be empty.
  *
  *	The caller must hold a pin, but no lock, on the metapage buffer.
  *	That buffer is returned in the same state.
  *
- *	The caller must hold at least share lock on the bucket, to ensure that
- *	no one else tries to compact the bucket meanwhile.  This guarantees that
- *	'buf' won't stop being part of the bucket while it's unlocked.
- *
  * NB: since this could be executed concurrently by multiple processes,
  * one should not assume that the returned overflow page will be the
  * immediate successor of the originally passed 'buf'.  Additional overflow
  * pages might have been added to the bucket chain in between.
  */
 Buffer
-_hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
+_hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin)
 {
 	Buffer		ovflbuf;
 	Page		page;
@@ -131,7 +128,10 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
 			break;
 
 		/* we assume we do not need to write the unmodified page */
-		_hash_relbuf(rel, buf);
+		if ((pageopaque->hasho_flag & LH_BUCKET_PAGE) && retain_pin)
+			_hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
+		else
+			_hash_relbuf(rel, buf);
 
 		buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
 	}
@@ -149,7 +149,10 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
 
 	/* logically chain overflow page to previous page */
 	pageopaque->hasho_nextblkno = BufferGetBlockNumber(ovflbuf);
-	_hash_wrtbuf(rel, buf);
+	if ((pageopaque->hasho_flag & LH_BUCKET_PAGE) && retain_pin)
+		_hash_chgbufaccess(rel, buf, HASH_WRITE, HASH_NOLOCK);
+	else
+		_hash_wrtbuf(rel, buf);
 
 	return ovflbuf;
 }
@@ -370,11 +373,11 @@ _hash_firstfreebit(uint32 map)
  *	in the bucket, or InvalidBlockNumber if no following page.
  *
  *	NB: caller must not hold lock on metapage, nor on either page that's
- *	adjacent in the bucket chain.  The caller had better hold exclusive lock
- *	on the bucket, too.
+ *	adjacent in the bucket chain, except for the primary bucket page.  The
+ *	caller had better hold a cleanup lock on the primary bucket page.
  */
 BlockNumber
-_hash_freeovflpage(Relation rel, Buffer ovflbuf,
+_hash_freeovflpage(Relation rel, Buffer ovflbuf, BlockNumber bucket_blkno,
 				   BufferAccessStrategy bstrategy)
 {
 	HashMetaPage metap;
@@ -413,22 +416,41 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf,
 	/*
 	 * Fix up the bucket chain.  this is a doubly-linked list, so we must fix
 	 * up the bucket chain members behind and ahead of the overflow page being
-	 * deleted.  No concurrency issues since we hold exclusive lock on the
-	 * entire bucket.
+	 * deleted.  No concurrency issues since we hold the cleanup lock on the
+	 * primary bucket.  We don't need to acquire a buffer lock to fix the
+	 * primary bucket page, as we already hold that lock.
 	 */
 	if (BlockNumberIsValid(prevblkno))
 	{
-		Buffer		prevbuf = _hash_getbuf_with_strategy(rel,
-														 prevblkno,
-														 HASH_WRITE,
+		if (prevblkno == bucket_blkno)
+		{
+			Buffer		prevbuf = ReadBufferExtended(rel, MAIN_FORKNUM,
+													 prevblkno,
+													 RBM_NORMAL,
+													 bstrategy);
+
+			Page		prevpage = BufferGetPage(prevbuf);
+			HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
+
+			Assert(prevopaque->hasho_bucket == bucket);
+			prevopaque->hasho_nextblkno = nextblkno;
+			MarkBufferDirty(prevbuf);
+			ReleaseBuffer(prevbuf);
+		}
+		else
+		{
+			Buffer		prevbuf = _hash_getbuf_with_strategy(rel,
+															 prevblkno,
+															 HASH_WRITE,
 										   LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
-														 bstrategy);
-		Page		prevpage = BufferGetPage(prevbuf);
-		HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
+															 bstrategy);
+			Page		prevpage = BufferGetPage(prevbuf);
+			HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
 
-		Assert(prevopaque->hasho_bucket == bucket);
-		prevopaque->hasho_nextblkno = nextblkno;
-		_hash_wrtbuf(rel, prevbuf);
+			Assert(prevopaque->hasho_bucket == bucket);
+			prevopaque->hasho_nextblkno = nextblkno;
+			_hash_wrtbuf(rel, prevbuf);
+		}
 	}
 	if (BlockNumberIsValid(nextblkno))
 	{
@@ -570,7 +592,7 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno,
  *	required that to be true on entry as well, but it's a lot easier for
  *	callers to leave empty overflow pages and let this guy clean it up.
  *
- *	Caller must hold exclusive lock on the target bucket.  This allows
+ *	Caller must hold cleanup lock on the target bucket.  This allows
  *	us to safely lock multiple pages in the bucket.
  *
  *	Since this function is invoked in VACUUM, we provide an access strategy
@@ -580,6 +602,7 @@ void
 _hash_squeezebucket(Relation rel,
 					Bucket bucket,
 					BlockNumber bucket_blkno,
+					Buffer bucket_buf,
 					BufferAccessStrategy bstrategy)
 {
 	BlockNumber wblkno;
@@ -591,27 +614,22 @@ _hash_squeezebucket(Relation rel,
 	HashPageOpaque wopaque;
 	HashPageOpaque ropaque;
 	bool		wbuf_dirty;
+	bool		release_buf = false;
 
 	/*
 	 * start squeezing into the base bucket page.
 	 */
 	wblkno = bucket_blkno;
-	wbuf = _hash_getbuf_with_strategy(rel,
-									  wblkno,
-									  HASH_WRITE,
-									  LH_BUCKET_PAGE,
-									  bstrategy);
+	wbuf = bucket_buf;
 	wpage = BufferGetPage(wbuf);
 	wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
 
 	/*
-	 * if there aren't any overflow pages, there's nothing to squeeze.
+	 * if there aren't any overflow pages, there's nothing to squeeze.  The
+	 * caller is responsible for releasing the lock on the primary bucket.
 	 */
 	if (!BlockNumberIsValid(wopaque->hasho_nextblkno))
-	{
-		_hash_relbuf(rel, wbuf);
 		return;
-	}
 
 	/*
 	 * Find the last page in the bucket chain by starting at the base bucket
@@ -669,12 +687,17 @@ _hash_squeezebucket(Relation rel,
 			{
 				Assert(!PageIsEmpty(wpage));
 
+				if (wblkno != bucket_blkno)
+					release_buf = true;
+
 				wblkno = wopaque->hasho_nextblkno;
 				Assert(BlockNumberIsValid(wblkno));
 
-				if (wbuf_dirty)
+				if (wbuf_dirty && release_buf)
 					_hash_wrtbuf(rel, wbuf);
-				else
+				else if (wbuf_dirty)
+					MarkBufferDirty(wbuf);
+				else if (release_buf)
 					_hash_relbuf(rel, wbuf);
 
 				/* nothing more to do if we reached the read page */
@@ -700,6 +723,7 @@ _hash_squeezebucket(Relation rel,
 				wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
 				Assert(wopaque->hasho_bucket == bucket);
 				wbuf_dirty = false;
+				release_buf = false;
 			}
 
 			/*
@@ -733,19 +757,25 @@ _hash_squeezebucket(Relation rel,
 		/* are we freeing the page adjacent to wbuf? */
 		if (rblkno == wblkno)
 		{
-			/* yes, so release wbuf lock first */
-			if (wbuf_dirty)
+			if (wblkno != bucket_blkno)
+				release_buf = true;
+
+			/* yes, so release wbuf lock first if needed */
+			if (wbuf_dirty && release_buf)
 				_hash_wrtbuf(rel, wbuf);
-			else
+			else if (wbuf_dirty)
+				MarkBufferDirty(wbuf);
+			else if (release_buf)
 				_hash_relbuf(rel, wbuf);
+
 			/* free this overflow page (releases rbuf) */
-			_hash_freeovflpage(rel, rbuf, bstrategy);
+			_hash_freeovflpage(rel, rbuf, bucket_blkno, bstrategy);
 			/* done */
 			return;
 		}
 
 		/* free this overflow page, then get the previous one */
-		_hash_freeovflpage(rel, rbuf, bstrategy);
+		_hash_freeovflpage(rel, rbuf, bucket_blkno, bstrategy);
 
 		rbuf = _hash_getbuf_with_strategy(rel,
 										  rblkno,
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index 178463f..83007ac 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -38,7 +38,7 @@ static bool _hash_alloc_buckets(Relation rel, BlockNumber firstblock,
 					uint32 nblocks);
 static void _hash_splitbucket(Relation rel, Buffer metabuf,
 				  Bucket obucket, Bucket nbucket,
-				  BlockNumber start_oblkno,
+				  Buffer obuf,
 				  Buffer nbuf,
 				  uint32 maxbucket,
 				  uint32 highmask, uint32 lowmask);
@@ -55,46 +55,6 @@ static void _hash_splitbucket(Relation rel, Buffer metabuf,
 
 
 /*
- * _hash_getlock() -- Acquire an lmgr lock.
- *
- * 'whichlock' should the block number of a bucket's primary bucket page to
- * acquire the per-bucket lock.  (See README for details of the use of these
- * locks.)
- *
- * 'access' must be HASH_SHARE or HASH_EXCLUSIVE.
- */
-void
-_hash_getlock(Relation rel, BlockNumber whichlock, int access)
-{
-	if (USELOCKING(rel))
-		LockPage(rel, whichlock, access);
-}
-
-/*
- * _hash_try_getlock() -- Acquire an lmgr lock, but only if it's free.
- *
- * Same as above except we return FALSE without blocking if lock isn't free.
- */
-bool
-_hash_try_getlock(Relation rel, BlockNumber whichlock, int access)
-{
-	if (USELOCKING(rel))
-		return ConditionalLockPage(rel, whichlock, access);
-	else
-		return true;
-}
-
-/*
- * _hash_droplock() -- Release an lmgr lock.
- */
-void
-_hash_droplock(Relation rel, BlockNumber whichlock, int access)
-{
-	if (USELOCKING(rel))
-		UnlockPage(rel, whichlock, access);
-}
-
-/*
  *	_hash_getbuf() -- Get a buffer by block number for read or write.
  *
  *		'access' must be HASH_READ, HASH_WRITE, or HASH_NOLOCK.
@@ -489,9 +449,11 @@ _hash_pageinit(Page page, Size size)
 /*
  * Attempt to expand the hash table by creating one new bucket.
  *
- * This will silently do nothing if it cannot get the needed locks.
+ * This will silently do nothing if our own backend has active scans, or if
+ * we can't get a cleanup lock on the old bucket.
  *
- * The caller should hold no locks on the hash index.
+ * We do remove tuples from the old bucket, if any are left over from a
+ * previous split.
  *
  * The caller must hold a pin, but no lock, on the metapage buffer.
  * The buffer is returned in the same state.
@@ -506,10 +468,15 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 	BlockNumber start_oblkno;
 	BlockNumber start_nblkno;
 	Buffer		buf_nblkno;
+	Buffer		buf_oblkno;
+	Page		opage;
+	HashPageOpaque oopaque;
 	uint32		maxbucket;
 	uint32		highmask;
 	uint32		lowmask;
 
+restart_expand:
+
 	/*
 	 * Write-lock the meta page.  It used to be necessary to acquire a
 	 * heavyweight lock to begin a split, but that is no longer required.
@@ -548,11 +515,15 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 		goto fail;
 
 	/*
-	 * Determine which bucket is to be split, and attempt to lock the old
-	 * bucket.  If we can't get the lock, give up.
+	 * Determine which bucket is to be split, and attempt to take cleanup lock
+	 * on the old bucket.  If we can't get the lock, give up.
 	 *
-	 * The lock protects us against other backends, but not against our own
-	 * backend.  Must check for active scans separately.
+	 * The cleanup lock protects us against other backends, but not against
+	 * our own backend.  Must check for active scans separately.
+	 *
+	 * The cleanup lock is mainly to protect the split from concurrent
+	 * inserts; however, if there is any pending scan, the split will give
+	 * up, which is not ideal but harmless.
 	 */
 	new_bucket = metap->hashm_maxbucket + 1;
 
@@ -563,11 +534,50 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 	if (_hash_has_active_scan(rel, old_bucket))
 		goto fail;
 
-	if (!_hash_try_getlock(rel, start_oblkno, HASH_EXCLUSIVE))
+	buf_oblkno = ReadBuffer(rel, start_oblkno);
+	if (!ConditionalLockBufferForCleanup(buf_oblkno))
+	{
+		ReleaseBuffer(buf_oblkno);
 		goto fail;
+	}
+	_hash_checkpage(rel, buf_oblkno, LH_BUCKET_PAGE);
+
+	opage = BufferGetPage(buf_oblkno);
+	oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+
+	/* we don't expect any pending split at this stage. */
+	Assert(!H_INCOMPLETE_SPLIT(oopaque));
+
+	/*
+	 * Clean up any tuples left over from the previous split.  This operation
+	 * requires a cleanup lock, and we already have one on the old bucket, so
+	 * let's do it.  We also don't want to allow further splits from this
+	 * bucket till the garbage of the previous split is cleaned.  This has two
+	 * advantages: first, it helps avoid bloat due to garbage; second, during
+	 * cleanup of the bucket we can always be sure that the garbage tuples
+	 * belong to the most recently split bucket.  On the contrary, if we
+	 * allowed cleanup of the bucket after the meta page is updated to
+	 * indicate the new split but before the actual split, the cleanup
+	 * operation would not be able to decide whether a tuple had been moved to
+	 * the newly created bucket, and could end up deleting such tuples.
+	 */
+	if (H_HAS_GARBAGE(oopaque))
+	{
+		/* Release the metapage lock. */
+		_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+
+		hashbucketcleanup(rel, buf_oblkno, start_oblkno, NULL,
+						  metap->hashm_maxbucket, metap->hashm_highmask,
+						  metap->hashm_lowmask, NULL,
+						  NULL, true, false, NULL, NULL);
+
+		_hash_relbuf(rel, buf_oblkno);
+
+		goto restart_expand;
+	}
 
 	/*
-	 * Likewise lock the new bucket (should never fail).
+	 * There shouldn't be any active scan on the new bucket.
 	 *
 	 * Note: it is safe to compute the new bucket's blkno here, even though we
 	 * may still need to update the BUCKET_TO_BLKNO mapping.  This is because
@@ -579,9 +589,6 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 	if (_hash_has_active_scan(rel, new_bucket))
 		elog(ERROR, "scan in progress on supposedly new bucket");
 
-	if (!_hash_try_getlock(rel, start_nblkno, HASH_EXCLUSIVE))
-		elog(ERROR, "could not get lock on supposedly new bucket");
-
 	/*
 	 * If the split point is increasing (hashm_maxbucket's log base 2
 	 * increases), we need to allocate a new batch of bucket pages.
@@ -600,8 +607,7 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 		if (!_hash_alloc_buckets(rel, start_nblkno, new_bucket))
 		{
 			/* can't split due to BlockNumber overflow */
-			_hash_droplock(rel, start_oblkno, HASH_EXCLUSIVE);
-			_hash_droplock(rel, start_nblkno, HASH_EXCLUSIVE);
+			_hash_relbuf(rel, buf_oblkno);
 			goto fail;
 		}
 	}
@@ -609,7 +615,8 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 	/*
 	 * Physically allocate the new bucket's primary page.  We want to do this
 	 * before changing the metapage's mapping info, in case we can't get the
-	 * disk space.
+	 * disk space.  We don't need to take a cleanup lock on the new bucket, as
+	 * no other backend can find this bucket until the meta page is updated.
 	 */
 	buf_nblkno = _hash_getnewbuf(rel, start_nblkno, MAIN_FORKNUM);
 
@@ -665,13 +672,9 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 	/* Relocate records to the new bucket */
 	_hash_splitbucket(rel, metabuf,
 					  old_bucket, new_bucket,
-					  start_oblkno, buf_nblkno,
+					  buf_oblkno, buf_nblkno,
 					  maxbucket, highmask, lowmask);
 
-	/* Release bucket locks, allowing others to access them */
-	_hash_droplock(rel, start_oblkno, HASH_EXCLUSIVE);
-	_hash_droplock(rel, start_nblkno, HASH_EXCLUSIVE);
-
 	return;
 
 	/* Here if decide not to split or fail to acquire old bucket lock */
@@ -745,6 +748,10 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
  * The buffer is returned in the same state.  (The metapage is only
  * touched if it becomes necessary to add or remove overflow pages.)
  *
+ * The split needs to hold pins on the primary bucket pages of both the old
+ * and new buckets till the end of the operation.  This is to prevent vacuum
+ * from starting while a split is in progress.
+ *
  * In addition, the caller must have created the new bucket's base page,
  * which is passed in buffer nbuf, pinned and write-locked.  That lock and
  * pin are released here.  (The API is set up this way because we must do
@@ -756,37 +763,87 @@ _hash_splitbucket(Relation rel,
 				  Buffer metabuf,
 				  Bucket obucket,
 				  Bucket nbucket,
-				  BlockNumber start_oblkno,
+				  Buffer obuf,
 				  Buffer nbuf,
 				  uint32 maxbucket,
 				  uint32 highmask,
 				  uint32 lowmask)
 {
-	Buffer		obuf;
 	Page		opage;
 	Page		npage;
 	HashPageOpaque oopaque;
 	HashPageOpaque nopaque;
 
-	/*
-	 * It should be okay to simultaneously write-lock pages from each bucket,
-	 * since no one else can be trying to acquire buffer lock on pages of
-	 * either bucket.
-	 */
-	obuf = _hash_getbuf(rel, start_oblkno, HASH_WRITE, LH_BUCKET_PAGE);
 	opage = BufferGetPage(obuf);
 	oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
 
+	/*
+	 * Mark the old bucket to indicate that a split is in progress and that it
+	 * has deletable tuples.  At operation end we clear the split-in-progress
+	 * flag; vacuum clears the has-garbage flag after deleting such tuples.
+	 */
+	oopaque->hasho_flag |= LH_BUCKET_PAGE_HAS_GARBAGE | LH_BUCKET_OLD_PAGE_SPLIT;
+
 	npage = BufferGetPage(nbuf);
 
-	/* initialize the new bucket's primary page */
+	/*
+	 * initialize the new bucket's primary page and mark it to indicate that
+	 * a split is in progress.
+	 */
 	nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
 	nopaque->hasho_prevblkno = InvalidBlockNumber;
 	nopaque->hasho_nextblkno = InvalidBlockNumber;
 	nopaque->hasho_bucket = nbucket;
-	nopaque->hasho_flag = LH_BUCKET_PAGE;
+	nopaque->hasho_flag = LH_BUCKET_PAGE | LH_BUCKET_NEW_PAGE_SPLIT;
 	nopaque->hasho_page_id = HASHO_PAGE_ID;
 
+	_hash_splitbucket_guts(rel, metabuf, obucket,
+						   nbucket, obuf, nbuf, NULL,
+						   maxbucket, highmask, lowmask);
+
+	/* all done, now release the locks and pins on primary buckets. */
+	_hash_relbuf(rel, obuf);
+	_hash_relbuf(rel, nbuf);
+}
+
+/*
+ * _hash_splitbucket_guts -- Helper function to perform the split operation
+ *
+ * This routine is used to partition the tuples between the old and new
+ * buckets, and also to finish incomplete split operations.  To finish a
+ * previously interrupted split, the caller needs to fill htab.  If htab is
+ * set, we skip moving tuples whose TIDs exist in htab; a NULL htab means
+ * that all tuples belonging to the new bucket are moved.
+ *
+ * Caller needs to lock and unlock the old and new primary buckets.
+ */
+void
+_hash_splitbucket_guts(Relation rel,
+					   Buffer metabuf,
+					   Bucket obucket,
+					   Bucket nbucket,
+					   Buffer obuf,
+					   Buffer nbuf,
+					   HTAB *htab,
+					   uint32 maxbucket,
+					   uint32 highmask,
+					   uint32 lowmask)
+{
+	Buffer		bucket_obuf;
+	Buffer		bucket_nbuf;
+	Page		opage;
+	Page		npage;
+	HashPageOpaque oopaque;
+	HashPageOpaque nopaque;
+
+	bucket_obuf = obuf;
+	opage = BufferGetPage(obuf);
+	oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+
+	bucket_nbuf = nbuf;
+	npage = BufferGetPage(nbuf);
+	nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+
 	/*
 	 * Partition the tuples in the old bucket between the old bucket and the
 	 * new bucket, advancing along the old bucket's overflow bucket chain and
@@ -798,8 +855,6 @@ _hash_splitbucket(Relation rel,
 		BlockNumber oblkno;
 		OffsetNumber ooffnum;
 		OffsetNumber omaxoffnum;
-		OffsetNumber deletable[MaxOffsetNumber];
-		int			ndeletable = 0;
 
 		/* Scan each tuple in old page */
 		omaxoffnum = PageGetMaxOffsetNumber(opage);
@@ -810,18 +865,45 @@ _hash_splitbucket(Relation rel,
 			IndexTuple	itup;
 			Size		itemsz;
 			Bucket		bucket;
+			bool		found = false;
 
 			/*
-			 * Fetch the item's hash key (conveniently stored in the item) and
-			 * determine which bucket it now belongs in.
+			 * Before inserting the tuple, probe the hash table of TIDs of
+			 * tuples belonging to the new bucket; if we find a match, skip
+			 * that tuple, else fetch the item's hash key (conveniently stored
+			 * in the item) and determine which bucket it now belongs in.
 			 */
 			itup = (IndexTuple) PageGetItem(opage,
 											PageGetItemId(opage, ooffnum));
+
+			if (htab)
+				(void) hash_search(htab, &itup->t_tid, HASH_FIND, &found);
+
+			if (found)
+				continue;
+
 			bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
 										  maxbucket, highmask, lowmask);
 
 			if (bucket == nbucket)
 			{
+				Size		itupsize = 0;
+				IndexTuple	new_itup;
+
+				/*
+				 * make a copy of the index tuple; we have to scribble on it.
+				 */
+				new_itup = CopyIndexTuple(itup);
+
+				/*
+				 * mark the index tuple as moved by split; such tuples are
+				 * skipped by scans while a split is in progress for a bucket.
+				 */
+				itupsize = new_itup->t_info & INDEX_SIZE_MASK;
+				new_itup->t_info &= ~INDEX_SIZE_MASK;
+				new_itup->t_info |= INDEX_MOVED_BY_SPLIT_MASK;
+				new_itup->t_info |= itupsize;
+
 				/*
 				 * insert the tuple into the new bucket.  if it doesn't fit on
 				 * the current page in the new bucket, we must allocate a new
@@ -832,17 +914,25 @@ _hash_splitbucket(Relation rel,
 				 * only partially complete, meaning the index is corrupt,
 				 * since searches may fail to find entries they should find.
 				 */
-				itemsz = IndexTupleDSize(*itup);
+				itemsz = IndexTupleDSize(*new_itup);
 				itemsz = MAXALIGN(itemsz);
 
 				if (PageGetFreeSpace(npage) < itemsz)
 				{
+					bool		retain_pin = false;
+
+					/*
+					 * page flags must be read before releasing the lock on
+					 * the page.
+					 */
+					retain_pin = nopaque->hasho_flag & LH_BUCKET_PAGE;
+
 					/* write out nbuf and drop lock, but keep pin */
 					_hash_chgbufaccess(rel, nbuf, HASH_WRITE, HASH_NOLOCK);
 					/* chain to a new overflow page */
-					nbuf = _hash_addovflpage(rel, metabuf, nbuf);
+					nbuf = _hash_addovflpage(rel, metabuf, nbuf, retain_pin);
 					npage = BufferGetPage(nbuf);
-					/* we don't need nopaque within the loop */
+					nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
 				}
 
 				/*
@@ -852,12 +942,10 @@ _hash_splitbucket(Relation rel,
 				 * Possible future improvement: accumulate all the items for
 				 * the new page and qsort them before insertion.
 				 */
-				(void) _hash_pgaddtup(rel, nbuf, itemsz, itup);
+				(void) _hash_pgaddtup(rel, nbuf, itemsz, new_itup);
 
-				/*
-				 * Mark tuple for deletion from old page.
-				 */
-				deletable[ndeletable++] = ooffnum;
+				/* be tidy */
+				pfree(new_itup);
 			}
 			else
 			{
@@ -870,15 +958,9 @@ _hash_splitbucket(Relation rel,
 
 		oblkno = oopaque->hasho_nextblkno;
 
-		/*
-		 * Done scanning this old page.  If we moved any tuples, delete them
-		 * from the old page.
-		 */
-		if (ndeletable > 0)
-		{
-			PageIndexMultiDelete(opage, deletable, ndeletable);
-			_hash_wrtbuf(rel, obuf);
-		}
+		/* retain the pin on the old primary bucket */
+		if (obuf == bucket_obuf)
+			_hash_chgbufaccess(rel, obuf, HASH_READ, HASH_NOLOCK);
 		else
 			_hash_relbuf(rel, obuf);
 
@@ -887,18 +969,42 @@ _hash_splitbucket(Relation rel,
 			break;
 
 		/* Else, advance to next old page */
-		obuf = _hash_getbuf(rel, oblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
+		obuf = _hash_getbuf(rel, oblkno, HASH_READ, LH_OVERFLOW_PAGE);
 		opage = BufferGetPage(obuf);
 		oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
 	}
 
 	/*
 	 * We're at the end of the old bucket chain, so we're done partitioning
-	 * the tuples.  Before quitting, call _hash_squeezebucket to ensure the
-	 * tuples remaining in the old bucket (including the overflow pages) are
-	 * packed as tightly as possible.  The new bucket is already tight.
+	 * the tuples.  Mark the old and new buckets to indicate that the split
+	 * is finished.
+	 */
+	if (!(nopaque->hasho_flag & LH_BUCKET_PAGE))
+		_hash_wrtbuf(rel, nbuf);
+
+	_hash_chgbufaccess(rel, bucket_obuf, HASH_NOLOCK, HASH_WRITE);
+	opage = BufferGetPage(bucket_obuf);
+	oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+
+	/*
+	 * Take the write lock on the new bucket's primary page only if the
+	 * current page is not that page; otherwise we already hold the lock.
 	 */
-	_hash_wrtbuf(rel, nbuf);
+	if (!(nopaque->hasho_flag & LH_BUCKET_PAGE))
+	{
+		_hash_chgbufaccess(rel, bucket_nbuf, HASH_NOLOCK, HASH_WRITE);
+		npage = BufferGetPage(bucket_nbuf);
+		nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+	}
 
-	_hash_squeezebucket(rel, obucket, start_oblkno, NULL);
+	/* indicate that split is finished */
+	oopaque->hasho_flag &= ~LH_BUCKET_OLD_PAGE_SPLIT;
+	nopaque->hasho_flag &= ~LH_BUCKET_NEW_PAGE_SPLIT;
+
+	/*
+	 * now dirty the buffers; we don't release the locks here, as the caller
+	 * is responsible for releasing them.
+	 */
+	MarkBufferDirty(bucket_obuf);
+	MarkBufferDirty(bucket_nbuf);
 }
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index 4825558..d87cf8b 100644
--- a/src/backend/access/hash/hashsearch.c
+++ b/src/backend/access/hash/hashsearch.c
@@ -72,7 +72,23 @@ _hash_readnext(Relation rel,
 	BlockNumber blkno;
 
 	blkno = (*opaquep)->hasho_nextblkno;
-	_hash_relbuf(rel, *bufp);
+
+	/*
+	 * Retain the pin on the primary bucket page till the end of the scan, to
+	 * ensure that vacuum can't delete the tuples that were moved by a split
+	 * to the new bucket.  Such tuples are required by scans that started on
+	 * a bucket being split, before the new bucket's split-in-progress flag
+	 * (LH_BUCKET_NEW_PAGE_SPLIT) is cleared.  The requirement to retain a pin
+	 * on the primary bucket could be relaxed for buckets that are not being
+	 * split, by checking the bucket's has_garbage flag, but we still need to
+	 * retain the pin for the squeeze phase, since moving tuples there could
+	 * change the ordering of scan results; so keep it for all buckets.
+	 */
+	if ((*opaquep)->hasho_flag & LH_BUCKET_PAGE)
+		_hash_chgbufaccess(rel, *bufp, HASH_READ, HASH_NOLOCK);
+	else
+		_hash_relbuf(rel, *bufp);
+
 	*bufp = InvalidBuffer;
 	/* check for interrupts while we're not holding any buffer lock */
 	CHECK_FOR_INTERRUPTS();
@@ -94,7 +110,16 @@ _hash_readprev(Relation rel,
 	BlockNumber blkno;
 
 	blkno = (*opaquep)->hasho_prevblkno;
-	_hash_relbuf(rel, *bufp);
+
+	/*
+	 * Retain the pin on the primary bucket page till the end of the scan.
+	 * See the comments in _hash_readnext for why the pin is retained.
+	 */
+	if ((*opaquep)->hasho_flag & LH_BUCKET_PAGE)
+		_hash_chgbufaccess(rel, *bufp, HASH_READ, HASH_NOLOCK);
+	else
+		_hash_relbuf(rel, *bufp);
+
 	*bufp = InvalidBuffer;
 	/* check for interrupts while we're not holding any buffer lock */
 	CHECK_FOR_INTERRUPTS();
@@ -192,43 +217,85 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 	metap = HashPageGetMeta(page);
 
 	/*
-	 * Loop until we get a lock on the correct target bucket.
+	 * Conditionally get the lock on the primary bucket page for the search
+	 * while holding the lock on the meta page.  If we have to wait, release
+	 * the meta page lock and retry the hard way.
 	 */
-	for (;;)
-	{
-		/*
-		 * Compute the target bucket number, and convert to block number.
-		 */
-		bucket = _hash_hashkey2bucket(hashkey,
-									  metap->hashm_maxbucket,
-									  metap->hashm_highmask,
-									  metap->hashm_lowmask);
+	bucket = _hash_hashkey2bucket(hashkey,
+								  metap->hashm_maxbucket,
+								  metap->hashm_highmask,
+								  metap->hashm_lowmask);
 
-		blkno = BUCKET_TO_BLKNO(metap, bucket);
+	blkno = BUCKET_TO_BLKNO(metap, bucket);
 
-		/* Release metapage lock, but keep pin. */
+	/* Fetch the primary bucket page for the bucket */
+	buf = ReadBuffer(rel, blkno);
+	if (!ConditionalLockBufferShared(buf))
+	{
+		_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+		LockBuffer(buf, HASH_READ);
+		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
+		_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
+		oldblkno = blkno;
+		retry = true;
+	}
+	else
+	{
+		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
 		_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+	}
 
+	if (retry)
+	{
 		/*
-		 * If the previous iteration of this loop locked what is still the
-		 * correct target bucket, we are done.  Otherwise, drop any old lock
-		 * and lock what now appears to be the correct bucket.
+		 * Loop until we get a lock on the correct target bucket.  We take the
+		 * lock on the primary bucket page and retain the pin on it during the
+		 * read operation to prevent concurrent splits.  Retaining a pin on
+		 * the primary bucket page ensures that a split can't happen, as a
+		 * split must acquire the cleanup lock on the primary bucket page.
+		 * Acquiring the lock on the primary bucket and rechecking that it is
+		 * still the target bucket is mandatory, since otherwise a concurrent
+		 * split followed by vacuum could remove tuples from the selected
+		 * bucket which otherwise would have been visible.
 		 */
-		if (retry)
+		for (;;)
 		{
-			if (oldblkno == blkno)
-				break;
-			_hash_droplock(rel, oldblkno, HASH_SHARE);
+			/*
+			 * Compute the target bucket number, and convert to block number.
+			 */
+			bucket = _hash_hashkey2bucket(hashkey,
+										  metap->hashm_maxbucket,
+										  metap->hashm_highmask,
+										  metap->hashm_lowmask);
+
+			blkno = BUCKET_TO_BLKNO(metap, bucket);
+
+			/* Release metapage lock, but keep pin. */
+			_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+
+			/*
+			 * If the previous iteration of this loop locked what is still the
+			 * correct target bucket, we are done.  Otherwise, drop any old
+			 * lock and lock what now appears to be the correct bucket.
+			 */
+			if (retry)
+			{
+				if (oldblkno == blkno)
+					break;
+				_hash_relbuf(rel, buf);
+			}
+
+			/* Fetch the primary bucket page for the bucket */
+			buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
+
+			/*
+			 * Reacquire metapage lock and check that no bucket split has
+			 * taken place while we were awaiting the bucket lock.
+			 */
+			_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
+			oldblkno = blkno;
+			retry = true;
 		}
-		_hash_getlock(rel, blkno, HASH_SHARE);
-
-		/*
-		 * Reacquire metapage lock and check that no bucket split has taken
-		 * place while we were awaiting the bucket lock.
-		 */
-		_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
-		oldblkno = blkno;
-		retry = true;
 	}
 
 	/* done with the metapage */
@@ -237,14 +304,60 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 	/* Update scan opaque state to show we have lock on the bucket */
 	so->hashso_bucket = bucket;
 	so->hashso_bucket_valid = true;
-	so->hashso_bucket_blkno = blkno;
 
-	/* Fetch the primary bucket page for the bucket */
-	buf = _hash_getbuf(rel, blkno, HASH_READ, LH_BUCKET_PAGE);
+
 	page = BufferGetPage(buf);
 	opaque = (HashPageOpaque) PageGetSpecialPointer(page);
 	Assert(opaque->hasho_bucket == bucket);
 
+	so->hashso_bucket_buf = buf;
+
+	/*
+	 * If a bucket split is in progress, then we need to skip tuples that
+	 * were moved from the old bucket.  To ensure that vacuum doesn't clean
+	 * any tuples from the old or new bucket while this scan is in progress,
+	 * maintain a pin on both buckets.  Here we have to be cautious about
+	 * lock ordering: acquire the lock on the old bucket, release that lock
+	 * (but not the pin), then acquire the lock on the new bucket and
+	 * re-verify whether the bucket split is still in progress.  Acquiring
+	 * the lock on the old bucket first ensures that vacuum waits for this
+	 * scan to finish.
+	 */
+	if (opaque->hasho_flag & LH_BUCKET_NEW_PAGE_SPLIT)
+	{
+		BlockNumber old_blkno;
+		Buffer		old_buf;
+
+		old_blkno = _hash_get_oldblk(rel, opaque);
+
+		/*
+		 * release the lock on the new bucket and re-acquire it after
+		 * acquiring the lock on the old bucket.
+		 */
+		_hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
+
+		old_buf = _hash_getbuf(rel, old_blkno, HASH_READ, LH_BUCKET_PAGE);
+
+		/*
+		 * remember the old bucket buffer so as to use it later for scanning.
+		 */
+		so->hashso_old_bucket_buf = old_buf;
+		_hash_chgbufaccess(rel, old_buf, HASH_READ, HASH_NOLOCK);
+
+		_hash_chgbufaccess(rel, buf, HASH_NOLOCK, HASH_READ);
+		page = BufferGetPage(buf);
+		opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+		Assert(opaque->hasho_bucket == bucket);
+
+		if (opaque->hasho_flag & LH_BUCKET_NEW_PAGE_SPLIT)
+			so->hashso_skip_moved_tuples = true;
+		else
+		{
+			_hash_dropbuf(rel, so->hashso_old_bucket_buf);
+			so->hashso_old_bucket_buf = InvalidBuffer;
+		}
+	}
+
 	/* If a backwards scan is requested, move to the end of the chain */
 	if (ScanDirectionIsBackward(dir))
 	{
@@ -273,6 +386,13 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
  *		false.  Else, return true and set the hashso_curpos for the
  *		scan to the right thing.
  *
+ *		Here we also scan the old bucket if a split of the current bucket
+ *		was in progress at the start of the scan.  The basic idea is to
+ *		skip the tuples that were moved by the split while scanning the
+ *		current bucket, and then scan the old bucket to cover all such
+ *		tuples.  This ensures that we don't miss any tuples in the current
+ *		scan when a split was in progress.
+ *
  *		'bufP' points to the current buffer, which is pinned and read-locked.
  *		On success exit, we have pin and read-lock on whichever page
  *		contains the right item; on failure, we have released all buffers.
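As a reading aid only (not part of the patch), the control flow that the _hash_step changes below implement for a scan that began while a split was in progress can be sketched as follows; next_match_on_page(), advance_to_next_page() and switch_to_old_bucket() are hypothetical helpers standing in for the existing page-stepping code:

	/* Sketch of the modified _hash_step loop, assuming the helpers above. */
	for (;;)
	{
		itup = next_match_on_page();	/* next tuple with matching hash key */
		if (itup != NULL)
		{
			if (so->hashso_skip_moved_tuples &&
				(itup->t_info & INDEX_MOVED_BY_SPLIT_MASK))
				continue;		/* moved here by split; old bucket covers it */
			return true;		/* report this tuple */
		}
		if (advance_to_next_page())
			continue;			/* keep walking the overflow chain */
		if (so->hashso_skip_moved_tuples)
		{
			switch_to_old_bucket(so->hashso_old_bucket_buf);
			so->hashso_skip_moved_tuples = false;	/* scan old bucket only once */
			continue;
		}
		return false;			/* end of both buckets */
	}
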
@@ -338,6 +458,19 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					{
 						Assert(offnum >= FirstOffsetNumber);
 						itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
+
+						/*
+						 * Skip tuples that were moved by the split, since
+						 * this scan started while the split was still in
+						 * progress.
+						 */
+						if (so->hashso_skip_moved_tuples &&
+							(itup->t_info & INDEX_MOVED_BY_SPLIT_MASK))
+						{
+							offnum = OffsetNumberNext(offnum);	/* move forward */
+							continue;
+						}
+
 						if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup))
 							break;		/* yes, so exit for-loop */
 					}
@@ -353,9 +486,41 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					}
 					else
 					{
-						/* end of bucket */
-						itup = NULL;
-						break;	/* exit for-loop */
+						/*
+						 * end of bucket, scan old bucket if there was a split
+						 * in progress at the start of scan.
+						 */
+						if (so->hashso_skip_moved_tuples)
+						{
+							buf = so->hashso_old_bucket_buf;
+
+							/*
+							 * old bucket buffer must be valid as we acquire
+							 * the pin on it before the start of scan and
+							 * retain it till end of scan.
+							 */
+							Assert(BufferIsValid(buf));
+
+							_hash_chgbufaccess(rel, buf, HASH_NOLOCK, HASH_READ);
+
+							page = BufferGetPage(buf);
+							maxoff = PageGetMaxOffsetNumber(page);
+							offnum = _hash_binsearch(page, so->hashso_sk_hash);
+
+							/*
+							 * Setting hashso_skip_moved_tuples to false
+							 * ensures that we don't check for moved tuples
+							 * in the old bucket, and also that we won't try
+							 * to scan the old bucket again once its scan is
+							 * finished.
+							 */
+							so->hashso_skip_moved_tuples = false;
+						}
+						else
+						{
+							itup = NULL;
+							break;		/* exit for-loop */
+						}
 					}
 				}
 				break;
@@ -379,6 +544,19 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					{
 						Assert(offnum <= maxoff);
 						itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
+
+						/*
+						 * Skip tuples that were moved by the split, since
+						 * this scan started while the split was still in
+						 * progress.
+						 */
+						if (so->hashso_skip_moved_tuples &&
+							(itup->t_info & INDEX_MOVED_BY_SPLIT_MASK))
+						{
+							offnum = OffsetNumberPrev(offnum);	/* move back */
+							continue;
+						}
+
 						if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup))
 							break;		/* yes, so exit for-loop */
 					}
@@ -394,9 +572,41 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					}
 					else
 					{
-						/* end of bucket */
-						itup = NULL;
-						break;	/* exit for-loop */
+						/*
+						 * end of bucket, scan old bucket if there was a split
+						 * in progress at the start of scan.
+						 */
+						if (so->hashso_skip_moved_tuples)
+						{
+							buf = so->hashso_old_bucket_buf;
+
+							/*
+							 * old bucket buffer must be valid as we acquire
+							 * the pin on it before the start of scan and
+							 * retain it till end of scan.
+							 */
+							Assert(BufferIsValid(buf));
+
+							_hash_chgbufaccess(rel, buf, HASH_NOLOCK, HASH_READ);
+
+							page = BufferGetPage(buf);
+							maxoff = PageGetMaxOffsetNumber(page);
+							offnum = _hash_binsearch(page, so->hashso_sk_hash);
+
+							/*
+							 * Setting hashso_skip_moved_tuples to false
+							 * ensures that we don't check for moved tuples
+							 * in the old bucket, and also that we won't try
+							 * to scan the old bucket again once its scan is
+							 * finished.
+							 */
+							so->hashso_skip_moved_tuples = false;
+						}
+						else
+						{
+							itup = NULL;
+							break;		/* exit for-loop */
+						}
 					}
 				}
 				break;
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index 456954b..bdbeb84 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -147,6 +147,23 @@ _hash_log2(uint32 num)
 }
 
 /*
+ * _hash_msb -- returns the position of the most significant set bit.
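+ * For example, _hash_msb(5) is 2, since 5 is binary 101.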
+ */
+static uint32
+_hash_msb(uint32 num)
+{
+	uint32		i = 0;
+
+	while (num)
+	{
+		num = num >> 1;
+		++i;
+	}
+
+	return i - 1;
+}
+
+/*
  * _hash_checkpage -- sanity checks on the format of all hash pages
  *
  * If flags is not zero, it is a bitwise OR of the acceptable values of
@@ -342,3 +359,123 @@ _hash_binsearch_last(Page page, uint32 hash_value)
 
 	return lower;
 }
+
+/*
+ *	_hash_get_oldblk() -- get the block number of the bucket from which
+ *			the current bucket is being split.
+ */
+BlockNumber
+_hash_get_oldblk(Relation rel, HashPageOpaque opaque)
+{
+	Bucket		curr_bucket;
+	Bucket		old_bucket;
+	uint32		mask;
+	Buffer		metabuf;
+	HashMetaPage metap;
+	BlockNumber blkno;
+
+	/*
+	 * To get the old bucket from the current (new) bucket, we need a mask to
+	 * modulo it into the lower half of the table.  Such a mask is stored in
+	 * the meta page as hashm_lowmask, but we can't rely on it here, because
+	 * we need the value of lowmask that was in effect when the bucket split
+	 * started.  Masking off the most significant bit of the new bucket gives
+	 * us the old bucket.
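+	 * For example, if the current bucket is 5 (binary 101), its most
+	 * significant bit is bit 2, so the mask is 3 (binary 011) and the old
+	 * bucket is 5 & 3 = 1.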
+	 */
+	curr_bucket = opaque->hasho_bucket;
+	mask = (((uint32) 1) << _hash_msb(curr_bucket)) - 1;
+	old_bucket = curr_bucket & mask;
+
+	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
+	metap = HashPageGetMeta(BufferGetPage(metabuf));
+
+	blkno = BUCKET_TO_BLKNO(metap, old_bucket);
+
+	_hash_relbuf(rel, metabuf);
+
+	return blkno;
+}
+
+/*
+ *	_hash_get_newblk() -- get the block number of the new bucket that will
+ *			be generated after a split of the current bucket.
+ *
+ * This is used to find the new bucket from the old bucket based on the
+ * current table half.  It is mainly required to finish incomplete splits,
+ * where we are sure that no more than one split can be in progress from the
+ * old bucket.
+ */
+BlockNumber
+_hash_get_newblk(Relation rel, HashPageOpaque opaque)
+{
+	Bucket		curr_bucket;
+	Bucket		new_bucket;
+	uint32		lowmask;
+	uint32		mask;
+	Buffer		metabuf;
+	HashMetaPage metap;
+	BlockNumber blkno;
+
+	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
+	metap = HashPageGetMeta(BufferGetPage(metabuf));
+
+	curr_bucket = opaque->hasho_bucket;
+
+	/*
+	 * The new bucket can be obtained by OR'ing the old bucket with the most
+	 * significant bit of the current table half.  There could be multiple
+	 * buckets that have split from the current bucket; we need the first
+	 * such bucket that exists based on the current table half.
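+	 * For example, if the old bucket is 1 and lowmask is 3 (a table half of
+	 * size 4), the candidate new bucket is 1 | 4 = 5; if bucket 5 doesn't
+	 * exist yet, we halve lowmask and try 1 | 2 = 3 instead.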
+	 */
+	lowmask = metap->hashm_lowmask;
+
+	for (;;)
+	{
+		mask = lowmask + 1;
+		new_bucket = curr_bucket | mask;
+		if (new_bucket > metap->hashm_maxbucket)
+		{
+			lowmask = lowmask >> 1;
+			continue;
+		}
+		blkno = BUCKET_TO_BLKNO(metap, new_bucket);
+		break;
+	}
+
+	_hash_relbuf(rel, metabuf);
+
+	return blkno;
+}
+
+/*
+ *	_hash_get_newbucket() -- get the new bucket that will be generated after
+ *			a split of the current bucket.
+ *
+ * This is used to find the new bucket from the old bucket.  The new bucket
+ * can be obtained by OR'ing the old bucket with the most significant bit of
+ * the table half for the lowmask passed to this function.  There could be
+ * multiple buckets that have split from the current bucket; we need the
+ * first such bucket that exists.  The caller must ensure that no more than
+ * one split has happened from the old bucket.
+ */
+Bucket
+_hash_get_newbucket(Relation rel, Bucket curr_bucket,
+					uint32 lowmask, uint32 maxbucket)
+{
+	Bucket		new_bucket;
+	uint32		mask;
+
+	for (;;)
+	{
+		mask = lowmask + 1;
+		new_bucket = curr_bucket | mask;
+		if (new_bucket > maxbucket)
+		{
+			lowmask = lowmask >> 1;
+			continue;
+		}
+		break;
+	}
+
+	return new_bucket;
+}
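The bucket arithmetic used by _hash_get_oldblk() and _hash_get_newbucket() can be exercised outside the server.  Below is a minimal standalone sketch (illustrative only; msb_pos() mirrors _hash_msb(), and the bucket numbers are arbitrary example values, not taken from the patch):

	#include <stdio.h>
	#include <stdint.h>

	/* position of the most significant set bit, as in _hash_msb() */
	static uint32_t
	msb_pos(uint32_t num)
	{
		uint32_t	i = 0;

		while (num)
		{
			num >>= 1;
			++i;
		}
		return i - 1;
	}

	int
	main(void)
	{
		uint32_t	new_bucket = 5;		/* binary 101 */
		uint32_t	mask = (((uint32_t) 1) << msb_pos(new_bucket)) - 1;
		uint32_t	old_bucket = new_bucket & mask;		/* 5 & 011 = 1 */
		uint32_t	lowmask = 3;		/* current table half has 4 buckets */
		uint32_t	maxbucket = 5;		/* highest bucket that exists */
		uint32_t	candidate;

		printf("old bucket of %u is %u\n", new_bucket, old_bucket);

		/* forward direction, as in _hash_get_newbucket() */
		for (;;)
		{
			candidate = old_bucket | (lowmask + 1);
			if (candidate > maxbucket)
			{
				lowmask >>= 1;
				continue;
			}
			break;
		}
		printf("new bucket of %u is %u\n", old_bucket, candidate);
		return 0;
	}
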
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index b7ca9bf..00129ed 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3567,6 +3567,26 @@ ConditionalLockBuffer(Buffer buffer)
 }
 
 /*
+ * Acquire the content_lock for the buffer, but only if we don't have to wait.
+ *
+ * This assumes the caller wants BUFFER_LOCK_SHARED mode.
+ */
+bool
+ConditionalLockBufferShared(Buffer buffer)
+{
+	BufferDesc *buf;
+
+	Assert(BufferIsValid(buffer));
+	if (BufferIsLocal(buffer))
+		return true;			/* act as though we got it */
+
+	buf = GetBufferDescriptor(buffer - 1);
+
+	return LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
+									LW_SHARED);
+}
+
+/*
  * LockBufferForCleanup - lock a buffer in preparation for deleting items
  *
  * Items may be deleted from a disk page only when the caller (a) holds an
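A hypothetical caller of the new ConditionalLockBufferShared() (not taken from this patch) would follow the same pattern as ConditionalLockBuffer(): try for the shared content lock, and fall back to other work if it is not immediately available.

	/* Sketch only; 'buf' is assumed to be a pinned buffer. */
	if (ConditionalLockBufferShared(buf))
	{
		/* ... examine the page under the shared content lock ... */
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	}
	else
	{
		/* lock not available without waiting; postpone work on this page */
	}
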
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index fa3f9b6..3a64c9d 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -25,6 +25,7 @@
 #include "lib/stringinfo.h"
 #include "storage/bufmgr.h"
 #include "storage/lockdefs.h"
+#include "utils/hsearch.h"
 #include "utils/relcache.h"
 
 /*
@@ -52,6 +53,9 @@ typedef uint32 Bucket;
 #define LH_BUCKET_PAGE			(1 << 1)
 #define LH_BITMAP_PAGE			(1 << 2)
 #define LH_META_PAGE			(1 << 3)
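+/* additional flag bits used for handling bucket splits (see H_* macros below) */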
+#define LH_BUCKET_NEW_PAGE_SPLIT	(1 << 4)
+#define LH_BUCKET_OLD_PAGE_SPLIT	(1 << 5)
+#define LH_BUCKET_PAGE_HAS_GARBAGE	(1 << 6)
 
 typedef struct HashPageOpaqueData
 {
@@ -64,6 +68,12 @@ typedef struct HashPageOpaqueData
 
 typedef HashPageOpaqueData *HashPageOpaque;
 
+#define H_HAS_GARBAGE(opaque)			((opaque)->hasho_flag & LH_BUCKET_PAGE_HAS_GARBAGE)
+#define H_OLD_INCOMPLETE_SPLIT(opaque)	((opaque)->hasho_flag & LH_BUCKET_OLD_PAGE_SPLIT)
+#define H_NEW_INCOMPLETE_SPLIT(opaque)	((opaque)->hasho_flag & LH_BUCKET_NEW_PAGE_SPLIT)
+#define H_INCOMPLETE_SPLIT(opaque)		(((opaque)->hasho_flag & LH_BUCKET_NEW_PAGE_SPLIT) || \
+										 ((opaque)->hasho_flag & LH_BUCKET_OLD_PAGE_SPLIT))
+
 /*
  * The page ID is for the convenience of pg_filedump and similar utilities,
  * which otherwise would have a hard time telling pages of different index
@@ -88,12 +98,6 @@ typedef struct HashScanOpaqueData
 	bool		hashso_bucket_valid;
 
 	/*
-	 * If we have a share lock on the bucket, we record it here.  When
-	 * hashso_bucket_blkno is zero, we have no such lock.
-	 */
-	BlockNumber hashso_bucket_blkno;
-
-	/*
 	 * We also want to remember which buffer we're currently examining in the
 	 * scan. We keep the buffer pinned (but not locked) across hashgettuple
 	 * calls, in order to avoid doing a ReadBuffer() for every tuple in the
@@ -101,11 +105,23 @@ typedef struct HashScanOpaqueData
 	 */
 	Buffer		hashso_curbuf;
 
+	/* remember the buffer associated with primary bucket */
+	Buffer		hashso_bucket_buf;
+
+	/*
+	 * remember the buffer associated with the old primary bucket, which is
+	 * needed when scanning a bucket whose split is in progress.
+	 */
+	Buffer		hashso_old_bucket_buf;
+
 	/* Current position of the scan, as an index TID */
 	ItemPointerData hashso_curpos;
 
 	/* Current position of the scan, as a heap TID */
 	ItemPointerData hashso_heappos;
+
+	/* Whether scan needs to skip tuples that are moved by split */
+	bool		hashso_skip_moved_tuples;
 } HashScanOpaqueData;
 
 typedef HashScanOpaqueData *HashScanOpaque;
@@ -176,6 +192,8 @@ typedef HashMetaPageData *HashMetaPage;
 				  sizeof(ItemIdData) - \
 				  MAXALIGN(sizeof(HashPageOpaqueData)))
 
+#define INDEX_MOVED_BY_SPLIT_MASK	0x2000
+
 #define HASH_MIN_FILLFACTOR			10
 #define HASH_DEFAULT_FILLFACTOR		75
 
@@ -224,9 +242,6 @@ typedef HashMetaPageData *HashMetaPage;
 #define HASH_WRITE		BUFFER_LOCK_EXCLUSIVE
 #define HASH_NOLOCK		(-1)
 
-#define HASH_SHARE		ShareLock
-#define HASH_EXCLUSIVE	ExclusiveLock
-
 /*
  *	Strategy number. There's only one valid strategy for hashing: equality.
  */
@@ -299,19 +314,17 @@ extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
 			   Size itemsize, IndexTuple itup);
 
 /* hashovfl.c */
-extern Buffer _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf);
+extern Buffer _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin);
 extern BlockNumber _hash_freeovflpage(Relation rel, Buffer ovflbuf,
-				   BufferAccessStrategy bstrategy);
+				   BlockNumber bucket_blkno, BufferAccessStrategy bstrategy);
 extern void _hash_initbitmap(Relation rel, HashMetaPage metap,
 				 BlockNumber blkno, ForkNumber forkNum);
 extern void _hash_squeezebucket(Relation rel,
 					Bucket bucket, BlockNumber bucket_blkno,
+					Buffer bucket_buf,
 					BufferAccessStrategy bstrategy);
 
 /* hashpage.c */
-extern void _hash_getlock(Relation rel, BlockNumber whichlock, int access);
-extern bool _hash_try_getlock(Relation rel, BlockNumber whichlock, int access);
-extern void _hash_droplock(Relation rel, BlockNumber whichlock, int access);
 extern Buffer _hash_getbuf(Relation rel, BlockNumber blkno,
 			 int access, int flags);
 extern Buffer _hash_getinitbuf(Relation rel, BlockNumber blkno);
@@ -329,6 +342,10 @@ extern uint32 _hash_metapinit(Relation rel, double num_tuples,
 				ForkNumber forkNum);
 extern void _hash_pageinit(Page page, Size size);
 extern void _hash_expandtable(Relation rel, Buffer metabuf);
+extern void _hash_splitbucket_guts(Relation rel, Buffer metabuf,
+					   Bucket obucket, Bucket nbucket, Buffer obuf,
+					   Buffer nbuf, HTAB *htab, uint32 maxbucket,
+					   uint32 highmask, uint32 lowmask);
 
 /* hashscan.c */
 extern void _hash_regscan(IndexScanDesc scan);
@@ -363,10 +380,20 @@ extern IndexTuple _hash_form_tuple(Relation index,
 				 Datum *values, bool *isnull);
 extern OffsetNumber _hash_binsearch(Page page, uint32 hash_value);
 extern OffsetNumber _hash_binsearch_last(Page page, uint32 hash_value);
+extern BlockNumber _hash_get_oldblk(Relation rel, HashPageOpaque opaque);
+extern BlockNumber _hash_get_newblk(Relation rel, HashPageOpaque opaque);
+extern Bucket _hash_get_newbucket(Relation rel, Bucket curr_bucket,
+					uint32 lowmask, uint32 maxbucket);
 
 /* hash.c */
 extern void hash_redo(XLogReaderState *record);
 extern void hash_desc(StringInfo buf, XLogReaderState *record);
 extern const char *hash_identify(uint8 info);
+extern void hashbucketcleanup(Relation rel, Buffer bucket_buf,
+				  BlockNumber bucket_blkno, BufferAccessStrategy bstrategy,
+				  uint32 maxbucket, uint32 highmask, uint32 lowmask,
+				  double *tuples_removed, double *num_index_tuples,
+				  bool bucket_has_garbage, bool delay,
+				  IndexBulkDeleteCallback callback, void *callback_state);
 
 #endif   /* HASH_H */
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 3d5dea7..4b318a8 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -226,6 +226,7 @@ extern void MarkBufferDirtyHint(Buffer buffer, bool buffer_std);
 extern void UnlockBuffers(void);
 extern void LockBuffer(Buffer buffer, int mode);
 extern bool ConditionalLockBuffer(Buffer buffer);
+extern bool ConditionalLockBufferShared(Buffer buffer);
 extern void LockBufferForCleanup(Buffer buffer);
 extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
