diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index 8140418..832a80d 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -167,6 +167,32 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
 			break;
 	}
 }
+static Buffer
+RelationAddOneBlock(Relation relation, Buffer otherBuffer, BulkInsertState bistate)
+{
+	Buffer buffer;
+	/*
+	 * XXX This does an lseek - rather expensive - but at the moment it is the
+	 * only way to accurately determine how many blocks are in a relation.  Is
+	 * it worth keeping an accurate file length in shared memory someplace,
+	 * rather than relying on the kernel to do it for us?
+	 */
+	buffer = ReadBufferBI(relation, P_NEW, bistate);
+
+	/*
+	 * We can be certain that locking the otherBuffer first is OK, since
+	 * it must have a lower page number.
+	 */
+	if (otherBuffer != InvalidBuffer)
+		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+
+	/*
+	 * Now acquire lock on the new page.
+	 */
+	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+	return buffer;
+}
 
 /*
  * RelationGetBufferForTuple
@@ -236,7 +262,8 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	Size		pageFreeSpace,
 				saveFreeSpace;
 	BlockNumber targetBlock,
-				otherBlock;
+				otherBlock,
+				lastValidBlock;
 	bool		needLock;
 
 	len = MAXALIGN(len);		/* be conservative */
@@ -308,6 +335,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
 		}
 	}
 
+loop:
 	while (targetBlock != InvalidBlockNumber)
 	{
 		/*
@@ -388,6 +416,8 @@ RelationGetBufferForTuple(Relation relation, Size len,
 								 otherBlock, targetBlock, vmbuffer_other,
 								 vmbuffer);
 
+		lastValidBlock = targetBlock;
+
 		/*
 		 * Now we can check to see if there's enough free space here. If so,
 		 * we're done.
@@ -441,36 +471,76 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	needLock = !RELATION_IS_LOCAL(relation);
 
 	if (needLock)
-		LockRelationForExtension(relation, ExclusiveLock);
-
-	/*
-	 * XXX This does an lseek - rather expensive - but at the moment it is the
-	 * only way to accurately determine how many blocks are in a relation.  Is
-	 * it worth keeping an accurate file length in shared memory someplace,
-	 * rather than relying on the kernel to do it for us?
-	 */
-	buffer = ReadBufferBI(relation, P_NEW, bistate);
-
-	/*
-	 * We can be certain that locking the otherBuffer first is OK, since it
-	 * must have a lower page number.
-	 */
-	if (otherBuffer != InvalidBuffer)
-		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
-
-	/*
-	 * Now acquire lock on the new page.
-	 */
-	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
-	/*
-	 * Release the file-extension lock; it's now OK for someone else to extend
-	 * the relation some more.  Note that we cannot release this lock before
-	 * we have buffer lock on the new page, or we risk a race condition
-	 * against vacuumlazy.c --- see comments therein.
-	 */
-	if (needLock)
-		UnlockRelationForExtension(relation, ExclusiveLock);
+	{
+		if (TryLockRelationForExtension(relation, ExclusiveLock))
+		{
+			buffer = RelationAddOneBlock(relation, otherBuffer, bistate);
+			UnlockRelationForExtension(relation, ExclusiveLock);
+		}
+		else
+		{
+			LockRelationForExtension(relation, ExclusiveLock);
+
+			if (use_fsm)
+			{
+				Page		page;
+				Size		freespace;
+				BlockNumber blockNum;
+				int			extraBlocks = 0;
+				int 		lockWaiters = 0;
+				Buffer		buf;
+
+				/*
+				 * Update FSM as to condition of this page, and ask for another page
+				 * to try.
+				 */
+				targetBlock = RecordAndGetPageWithFreeSpace(relation,
+															lastValidBlock,
+															pageFreeSpace,
+															len + saveFreeSpace);
+
+				/* Other waiter has extended the block for us*/
+				if (targetBlock != InvalidBlockNumber)
+				{
+					UnlockRelationForExtension(relation, ExclusiveLock);
+					goto loop;
+				}
+
+				buffer = RelationAddOneBlock(relation, otherBuffer, bistate);
+
+				lockWaiters = RelationExtensionLockWaiter(relation);
+
+				extraBlocks = lockWaiters * 20;
+
+				while (extraBlocks--)
+				{
+					/*
+					 * XXX This does an lseek - rather expensive - but at the moment it is the
+					 * only way to accurately determine how many blocks are in a relation.  Is
+					 * it worth keeping an accurate file length in shared memory someplace,
+					 * rather than relying on the kernel to do it for us?
+					 */
+					buf = ReadBufferBI(relation, P_NEW, bistate);
+					LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+					page = BufferGetPage(buf);
+					PageInit(page, BufferGetPageSize(buf), 0);
+					freespace = PageGetHeapFreeSpace(page);
+					MarkBufferDirty(buf);
+					blockNum = BufferGetBlockNumber(buf);
+					UnlockReleaseBuffer(buf);
+
+					RecordPageWithFreeSpace(relation, blockNum, freespace);
+				}
+			}
+			else
+				buffer = RelationAddOneBlock(relation, otherBuffer, bistate);
+
+			UnlockRelationForExtension(relation, ExclusiveLock);
+		}
+	}
+	else
+		buffer = RelationAddOneBlock(relation, otherBuffer, bistate);
 
 	/*
 	 * We need to initialize the empty new page.  Double-check that it really
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 9d16afb..a56b203 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -340,6 +340,30 @@ LockRelationForExtension(Relation relation, LOCKMODE lockmode)
 	(void) LockAcquire(&tag, lockmode, false, false);
 }
 
+LockAcquireResult
+TryLockRelationForExtension(Relation relation, LOCKMODE lockmode)
+{
+	LOCKTAG		tag;
+
+	SET_LOCKTAG_RELATION_EXTEND(tag,
+								relation->rd_lockInfo.lockRelId.dbId,
+								relation->rd_lockInfo.lockRelId.relId);
+
+	return LockAcquire(&tag, lockmode, false, true);
+}
+
+int
+RelationExtensionLockWaiter(Relation relation)
+{
+	LOCKTAG		tag;
+
+	SET_LOCKTAG_RELATION_EXTEND(tag,
+								relation->rd_lockInfo.lockRelId.dbId,
+								relation->rd_lockInfo.lockRelId.relId);
+
+	return LockWaiterCount(&tag);
+}
+
 /*
  *		UnlockRelationForExtension
  */
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index a458c68..8f49192 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -4380,3 +4380,35 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
 	LockRelease(&tag, ShareLock, false);
 	return true;
 }
+
+int
+LockWaiterCount(const LOCKTAG *locktag)
+{
+	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
+	LOCALLOCKTAG localtag;
+	LOCALLOCK  *locallock;
+	LOCK	   *lock;
+	bool		found;
+	uint32		hashcode;
+	int			waiters = 0;
+
+	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
+		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
+
+
+	/*
+	 * Find a LOCALLOCK entry for this lock and lockmode
+	 */
+	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
+	localtag.lock = *locktag;
+	localtag.mode = ExclusiveLock;
+
+	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
+										  (void *) &localtag,
+										  HASH_FIND, &found);
+
+	if (found)
+		waiters = locallock->lock->nRequested;
+
+	return waiters;
+}
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index e9d41bf..a492bb1 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -101,4 +101,7 @@ extern void UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid
 /* Describe a locktag for error messages */
 extern void DescribeLockTag(StringInfo buf, const LOCKTAG *tag);
 
+extern LockAcquireResult TryLockRelationForExtension(Relation relation, LOCKMODE lockmode);
+extern int RelationExtensionLockWaiter(Relation relation);
+
 #endif   /* LMGR_H */
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 788d50a..3fd74fb 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -572,6 +572,8 @@ extern void RememberSimpleDeadLock(PGPROC *proc1,
 					   PGPROC *proc2);
 extern void InitDeadLockChecking(void);
 
+extern int LockWaiterCount(const LOCKTAG *locktag);
+
 #ifdef LOCK_DEBUG
 extern void DumpLocks(PGPROC *proc);
 extern void DumpAllLocks(void);
