On 10/22/2016 08:30 PM, Tomas Vondra wrote:
> On 10/20/2016 04:43 PM, Robert Haas wrote:
>>
>> ...
>>
>> The sb_alloc allocator I proposed a couple of years ago would work
>> well for this case, I think.
>
>
> Maybe, but it does not follow the Memory Context design at all, if I
> understand it correctly. I was willing to give it a spin anyway and see
> how it compares to the two other allocators, but this is a significant
> paradigm shift and certainly a much larger step than what I proposed.
>
> I'm not even sure it's possible to implement a MemoryContext based on
> the same ideas as sb_alloc(), because one of the important points of
> the sb_alloc design seems to be throwing away the chunk header. While that
> may be possible, it would certainly affect the whole tree (not just the
> reorderbuffer bit), and it'd require way more work.
>
> Moreover, the two allocators I proposed significantly benefit from the
> "same lifespan" assumption. I don't think sb_alloc can do that.


I've given the sb_alloc patch another try - essentially hacking it into reorderbuffer, ignoring the issues mentioned yesterday. And yes, it's faster than the allocators discussed in this thread. Based on a few very quick tests on my laptop, the difference is usually ~5-10%.

That might seem like a significant improvement, but it's negligible compared to the "master -> slab/gen" improvement, which improves performance by orders of magnitude (at least for the tested cases).

Moreover, the slab/gen allocators proposed here seem like a better fit for reorderbuffer, e.g. because they release memory. I haven't looked at sb_alloc too closely, but I think it behaves more like AllocSet in this regard (i.e. keeping the memory indefinitely).

FWIW I'm not making any conclusions about sb_alloc benefits outside reorderbuffer.c - it might easily be worth pursuing, no doubt about that. The amount of remaining work seems quite high, though.

Attached is the modified sb_alloc patch that I used - it's mostly v1 with the uses in nbtree etc. removed. FWIW the patch does not implement sb_destroy_private_allocator (it's only declared in the header), which seems like a bug.

regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/contrib/test_freepage/Makefile b/contrib/test_freepage/Makefile
new file mode 100644
index 0000000..b482fe9
--- /dev/null
+++ b/contrib/test_freepage/Makefile
@@ -0,0 +1,17 @@
+# contrib/test_freepage/Makefile
+
+MODULES = test_freepage
+
+EXTENSION = test_freepage
+DATA = test_freepage--1.0.sql
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/test_freepage
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/test_freepage/test_freepage--1.0.sql b/contrib/test_freepage/test_freepage--1.0.sql
new file mode 100644
index 0000000..5d3191e
--- /dev/null
+++ b/contrib/test_freepage/test_freepage--1.0.sql
@@ -0,0 +1,15 @@
+/* contrib/test_freepage/test_freepage--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_freepage" to load this file. \quit
+
+CREATE FUNCTION init(size pg_catalog.int8) RETURNS pg_catalog.void
+	AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
+CREATE FUNCTION get(pages pg_catalog.int8) RETURNS pg_catalog.int8
+	AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
+CREATE FUNCTION inquire_largest() RETURNS pg_catalog.int8
+	AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
+CREATE FUNCTION put(first_page pg_catalog.int8, npages pg_catalog.int8)
+	RETURNS pg_catalog.void AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
+CREATE FUNCTION dump() RETURNS pg_catalog.text
+    AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
diff --git a/contrib/test_freepage/test_freepage.c b/contrib/test_freepage/test_freepage.c
new file mode 100644
index 0000000..074cf56
--- /dev/null
+++ b/contrib/test_freepage/test_freepage.c
@@ -0,0 +1,113 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_freepage.c
+ *		Test harness code for free page manager.
+ *
+ * Copyright (C) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/test_freepage/test_freepage.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
+#include "utils/freepage.h"
+
+PG_MODULE_MAGIC;
+PG_FUNCTION_INFO_V1(init);
+PG_FUNCTION_INFO_V1(get);
+PG_FUNCTION_INFO_V1(inquire_largest);
+PG_FUNCTION_INFO_V1(put);
+PG_FUNCTION_INFO_V1(dump);
+
+Datum		init(PG_FUNCTION_ARGS);
+Datum		get(PG_FUNCTION_ARGS);
+Datum		inquire_largest(PG_FUNCTION_ARGS);
+Datum		put(PG_FUNCTION_ARGS);
+Datum		dump(PG_FUNCTION_ARGS);
+
+static char *space;				/* malloc'd region set up by init() */
+static FreePageManager *fpm;	/* manager stored at the start of "space" */
+
+Datum
+init(PG_FUNCTION_ARGS)
+{
+	int64	size = PG_GETARG_INT64(0);
+	Size	first_usable_page;
+	Size	total_pages;
+
+	/* (Re)create a malloc'd region of "size" bytes managed by its own FPM. */
+	if (size <= 0 || size % FPM_PAGE_SIZE != 0)
+		elog(ERROR, "bad size");
+
+	/* The leading pages hold the FreePageManager itself; the rest are free. */
+	first_usable_page = sizeof(FreePageManager) / FPM_PAGE_SIZE +
+		(sizeof(FreePageManager) % FPM_PAGE_SIZE == 0 ? 0 : 1);
+	total_pages = size / FPM_PAGE_SIZE;
+	if (total_pages <= first_usable_page)
+		elog(ERROR, "size too small");	/* would put an empty span below */
+
+	free(space);				/* drop the region from any earlier call */
+	space = NULL;
+	fpm = NULL;
+
+	space = malloc(size);
+	if (space == NULL)
+		elog(ERROR, "malloc failed: %m");
+
+	fpm = (FreePageManager *) space;
+	FreePageManagerInitialize(fpm, space, NULL, false);
+
+	FreePageManagerPut(fpm, first_usable_page,
+					   total_pages - first_usable_page);
+	PG_RETURN_VOID();
+}
+
+Datum
+get(PG_FUNCTION_ARGS)
+{
+	int64	wanted = PG_GETARG_INT64(0);
+	Size	start_page;
+
+	/*
+	 * Yield SQL NULL both when init() has not set up a manager yet and when
+	 * the manager cannot supply a run of the requested length.
+	 */
+	if (fpm == NULL || !FreePageManagerGet(fpm, wanted, &start_page))
+		PG_RETURN_NULL();
+	PG_RETURN_INT64(start_page);
+}
+
+Datum
+inquire_largest(PG_FUNCTION_ARGS)
+{
+	/* Without a manager (init() not yet called) the answer is SQL NULL. */
+	if (fpm != NULL)
+		PG_RETURN_INT64(FreePageManagerInquireLargest(fpm));
+	PG_RETURN_NULL();
+}
+
+Datum
+put(PG_FUNCTION_ARGS)
+{
+	int64	first_page = PG_GETARG_INT64(0);
+	int64	npages = PG_GETARG_INT64(1);
+
+	if (fpm != NULL)			/* as in get()/dump(): no-op before init() */
+		FreePageManagerPut(fpm, first_page, npages);
+	PG_RETURN_VOID();
+}
+
+Datum
+dump(PG_FUNCTION_ARGS)
+{
+	/* SQL NULL until init() has created a manager; otherwise its text dump. */
+	if (fpm != NULL)
+		PG_RETURN_TEXT_P(cstring_to_text(FreePageManagerDump(fpm)));
+	PG_RETURN_NULL();
+}
diff --git a/contrib/test_freepage/test_freepage.control b/contrib/test_freepage/test_freepage.control
new file mode 100644
index 0000000..fca4cd9
--- /dev/null
+++ b/contrib/test_freepage/test_freepage.control
@@ -0,0 +1,4 @@
+comment = 'Test code for free page manager'
+default_version = '1.0'
+module_pathname = '$libdir/test_freepage'
+relocatable = true
diff --git a/contrib/test_sballoc/Makefile b/contrib/test_sballoc/Makefile
new file mode 100644
index 0000000..880bccb
--- /dev/null
+++ b/contrib/test_sballoc/Makefile
@@ -0,0 +1,17 @@
+# contrib/test_sballoc/Makefile
+
+MODULES = test_sballoc
+
+EXTENSION = test_sballoc
+DATA = test_sballoc--1.0.sql
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/test_sballoc
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/test_sballoc/test_sballoc--1.0.sql b/contrib/test_sballoc/test_sballoc--1.0.sql
new file mode 100644
index 0000000..1cf8a5a
--- /dev/null
+++ b/contrib/test_sballoc/test_sballoc--1.0.sql
@@ -0,0 +1,20 @@
+/* contrib/test_sballoc/test_sballoc--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_sballoc" to load this file. \quit
+
+CREATE FUNCTION alloc(size pg_catalog.int8, count pg_catalog.int8)
+    RETURNS pg_catalog.void
+	AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
+
+CREATE FUNCTION alloc_with_palloc(size pg_catalog.int8, count pg_catalog.int8)
+    RETURNS pg_catalog.void
+	AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
+
+CREATE FUNCTION alloc_list(size pg_catalog.int8, count pg_catalog.int8)
+    RETURNS pg_catalog.void
+	AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
+
+CREATE FUNCTION alloc_list_with_palloc(size pg_catalog.int8, count pg_catalog.int8)
+    RETURNS pg_catalog.void
+	AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
diff --git a/contrib/test_sballoc/test_sballoc.c b/contrib/test_sballoc/test_sballoc.c
new file mode 100644
index 0000000..38c03da
--- /dev/null
+++ b/contrib/test_sballoc/test_sballoc.c
@@ -0,0 +1,144 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_sballoc.c
+ *		Test harness code for superblock allocator.
+ *
+ * Copyright (C) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/test_sballoc/test_sballoc.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "utils/memutils.h"
+#include "utils/sb_alloc.h"
+#include "utils/sb_region.h"
+
+typedef struct llnode			/* list node written into each chunk by alloc_list* */
+{
+	struct llnode *next;		/* following node; NULL marks the end of the list */
+} llnode;
+
+PG_MODULE_MAGIC;
+PG_FUNCTION_INFO_V1(alloc);
+PG_FUNCTION_INFO_V1(alloc_with_palloc);
+PG_FUNCTION_INFO_V1(alloc_list);
+PG_FUNCTION_INFO_V1(alloc_list_with_palloc);
+
+Datum
+alloc(PG_FUNCTION_ARGS)
+{
+	int64 size = PG_GETARG_INT64(0);
+	int64 count = PG_GETARG_INT64(1);
+	int64 i;
+	sb_allocator *a;
+
+	if (size < (int64) sizeof(int64))	/* each chunk has an int64 stored in it */
+		elog(ERROR, "size too small");
+	a = sb_create_private_allocator();
+	for (i = 0; i < count; ++i)
+	{
+		int64 *p = sb_alloc(a, size, 0);
+		*p = i;					/* touch the chunk so it is really used */
+	}
+	sb_reset_allocator(a);		/* chunks are not freed one by one here */
+	sb_dump_regions();
+	PG_RETURN_VOID();
+}
+
+Datum
+alloc_with_palloc(PG_FUNCTION_ARGS)
+{
+	int64 size = PG_GETARG_INT64(0);
+	int64 count = PG_GETARG_INT64(1);
+	int64 i;
+	MemoryContext context;
+
+	if (size < (int64) sizeof(int64))	/* each chunk has an int64 stored in it */
+		elog(ERROR, "size too small");
+	context = AllocSetContextCreate(CurrentMemoryContext,
+									"alloc_with_palloc test",
+									ALLOCSET_DEFAULT_MINSIZE,
+									ALLOCSET_DEFAULT_INITSIZE,
+									ALLOCSET_DEFAULT_MAXSIZE);
+	for (i = 0; i < count; ++i)
+	{
+		int64 *p = MemoryContextAlloc(context, size);
+		*p = i;					/* touch the chunk so it is really used */
+	}
+	MemoryContextStats(context);
+	MemoryContextDelete(context);	/* releases every chunk at once */
+	PG_RETURN_VOID();
+}
+
+Datum
+alloc_list(PG_FUNCTION_ARGS)
+{
+	int64 size = PG_GETARG_INT64(0);
+	int64 count = PG_GETARG_INT64(1);
+	int64 i;
+	llnode *h = NULL;
+	llnode *p;
+	sb_allocator *a;
+
+	/* Cast so a negative size is rejected rather than converted to unsigned. */
+	if (size < (int64) sizeof(llnode))
+		elog(ERROR, "size too small");
+
+	a = sb_create_private_allocator();
+	for (i = 0; i < count; ++i)
+	{
+		p = sb_alloc(a, size, 0);
+		p->next = h;			/* thread the chunks into a list ... */
+		h = p;
+	}
+	while (h != NULL)			/* ... so each one can be freed individually */
+	{
+		p = h->next;			/* fetch the link before the node is freed */
+		sb_free(h);
+		h = p;
+	}
+	sb_dump_regions();
+	sb_reset_allocator(a);
+	PG_RETURN_VOID();
+}
+
+Datum
+alloc_list_with_palloc(PG_FUNCTION_ARGS)
+{
+	int64 size = PG_GETARG_INT64(0);
+	int64 count = PG_GETARG_INT64(1);
+	int64 i;
+	llnode *h = NULL;
+	llnode *p;
+	MemoryContext context;
+
+	/* Cast so a negative size is rejected rather than converted to unsigned. */
+	if (size < (int64) sizeof(llnode))
+		elog(ERROR, "size too small");
+
+	context = AllocSetContextCreate(CurrentMemoryContext,
+									"alloc_list_with_palloc test",
+									ALLOCSET_DEFAULT_MINSIZE,
+									ALLOCSET_DEFAULT_INITSIZE,
+									ALLOCSET_DEFAULT_MAXSIZE);
+	for (i = 0; i < count; ++i)
+	{
+		p = MemoryContextAlloc(context, size);
+		p->next = h;			/* thread the chunks into a list ... */
+		h = p;
+	}
+	while (h != NULL)			/* ... so each one can be freed individually */
+	{
+		p = h->next;			/* fetch the link before the node is freed */
+		pfree(h);
+		h = p;
+	}
+	MemoryContextStats(context);
+	MemoryContextDelete(context);
+	PG_RETURN_VOID();
+}
diff --git a/contrib/test_sballoc/test_sballoc.control b/contrib/test_sballoc/test_sballoc.control
new file mode 100644
index 0000000..58f61c0
--- /dev/null
+++ b/contrib/test_sballoc/test_sballoc.control
@@ -0,0 +1,4 @@
+comment = 'Test code for superblock allocator'
+default_version = '1.0'
+module_pathname = '$libdir/test_sballoc'
+relocatable = true
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 6ad7e7d..4eb4377 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -71,6 +71,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenodemap.h"
+#include "utils/sb_alloc.h"
 #include "utils/tqual.h"
 
 
@@ -149,17 +150,6 @@ typedef struct ReorderBufferDiskChange
  */
 static const Size max_changes_in_memory = 4096;
 
-/*
- * We use a very simple form of a slab allocator for frequently allocated
- * objects, simply keeping a fixed number in a linked list when unused,
- * instead pfree()ing them. Without that in many workloads aset.c becomes a
- * major bottleneck, especially when spilling to disk while decoding batch
- * workloads.
- */
-static const Size max_cached_changes = 4096 * 2;
-static const Size max_cached_tuplebufs = 4096 * 2;		/* ~8MB */
-static const Size max_cached_transactions = 512;
-
 
 /* ---------------------------------------
  * primary reorderbuffer support routines
@@ -240,6 +230,7 @@ ReorderBufferAllocate(void)
 	memset(&hash_ctl, 0, sizeof(hash_ctl));
 
 	buffer->context = new_ctx;
+	buffer->allocator = sb_create_private_allocator();
 
 	hash_ctl.keysize = sizeof(TransactionId);
 	hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
@@ -251,19 +242,12 @@ ReorderBufferAllocate(void)
 	buffer->by_txn_last_xid = InvalidTransactionId;
 	buffer->by_txn_last_txn = NULL;
 
-	buffer->nr_cached_transactions = 0;
-	buffer->nr_cached_changes = 0;
-	buffer->nr_cached_tuplebufs = 0;
-
 	buffer->outbuf = NULL;
 	buffer->outbufsize = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
 	dlist_init(&buffer->toplevel_by_lsn);
-	dlist_init(&buffer->cached_transactions);
-	dlist_init(&buffer->cached_changes);
-	slist_init(&buffer->cached_tuplebufs);
 
 	return buffer;
 }
@@ -281,6 +265,9 @@ ReorderBufferFree(ReorderBuffer *rb)
 	 * memory context.
 	 */
 	MemoryContextDelete(context);
+
+	/* And we also destroy the private sb allocator instance. */
+	// sb_destroy_private_allocator(rb->allocator);
 }
 
 /*
@@ -291,19 +278,8 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
 {
 	ReorderBufferTXN *txn;
 
-	/* check the slab cache */
-	if (rb->nr_cached_transactions > 0)
-	{
-		rb->nr_cached_transactions--;
-		txn = (ReorderBufferTXN *)
-			dlist_container(ReorderBufferTXN, node,
-							dlist_pop_head_node(&rb->cached_transactions));
-	}
-	else
-	{
-		txn = (ReorderBufferTXN *)
-			MemoryContextAlloc(rb->context, sizeof(ReorderBufferTXN));
-	}
+	txn = (ReorderBufferTXN *)sb_alloc(rb->allocator,
+									   sizeof(ReorderBufferTXN), 0);
 
 	memset(txn, 0, sizeof(ReorderBufferTXN));
 
@@ -344,18 +320,7 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->invalidations = NULL;
 	}
 
-	/* check whether to put into the slab cache */
-	if (rb->nr_cached_transactions < max_cached_transactions)
-	{
-		rb->nr_cached_transactions++;
-		dlist_push_head(&rb->cached_transactions, &txn->node);
-		VALGRIND_MAKE_MEM_UNDEFINED(txn, sizeof(ReorderBufferTXN));
-		VALGRIND_MAKE_MEM_DEFINED(&txn->node, sizeof(txn->node));
-	}
-	else
-	{
-		pfree(txn);
-	}
+	sb_free(txn);
 }
 
 /*
@@ -367,18 +332,8 @@ ReorderBufferGetChange(ReorderBuffer *rb)
 	ReorderBufferChange *change;
 
 	/* check the slab cache */
-	if (rb->nr_cached_changes)
-	{
-		rb->nr_cached_changes--;
-		change = (ReorderBufferChange *)
-			dlist_container(ReorderBufferChange, node,
-							dlist_pop_head_node(&rb->cached_changes));
-	}
-	else
-	{
-		change = (ReorderBufferChange *)
-			MemoryContextAlloc(rb->context, sizeof(ReorderBufferChange));
-	}
+	change = (ReorderBufferChange *)sb_alloc(rb->allocator,
+											 sizeof(ReorderBufferChange), 0);
 
 	memset(change, 0, sizeof(ReorderBufferChange));
 	return change;
@@ -434,18 +389,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 			break;
 	}
 
-	/* check whether to put into the slab cache */
-	if (rb->nr_cached_changes < max_cached_changes)
-	{
-		rb->nr_cached_changes++;
-		dlist_push_head(&rb->cached_changes, &change->node);
-		VALGRIND_MAKE_MEM_UNDEFINED(change, sizeof(ReorderBufferChange));
-		VALGRIND_MAKE_MEM_DEFINED(&change->node, sizeof(change->node));
-	}
-	else
-	{
-		pfree(change);
-	}
+	sb_free(change);
 }
 
 
@@ -461,42 +405,11 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
 
 	alloc_len = tuple_len + SizeofHeapTupleHeader;
 
-	/*
-	 * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
-	 * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
-	 * generated for oldtuples can be bigger, as they don't have out-of-line
-	 * toast columns.
-	 */
-	if (alloc_len < MaxHeapTupleSize)
-		alloc_len = MaxHeapTupleSize;
-
-
-	/* if small enough, check the slab cache */
-	if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
-	{
-		rb->nr_cached_tuplebufs--;
-		tuple = slist_container(ReorderBufferTupleBuf, node,
-								slist_pop_head_node(&rb->cached_tuplebufs));
-		Assert(tuple->alloc_tuple_size == MaxHeapTupleSize);
-#ifdef USE_ASSERT_CHECKING
-		memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
-		VALGRIND_MAKE_MEM_UNDEFINED(&tuple->tuple, sizeof(HeapTupleData));
-#endif
-		tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
-#ifdef USE_ASSERT_CHECKING
-		memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
-		VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
-#endif
-	}
-	else
-	{
-		tuple = (ReorderBufferTupleBuf *)
-			MemoryContextAlloc(rb->context,
-							   sizeof(ReorderBufferTupleBuf) +
-							   MAXIMUM_ALIGNOF + alloc_len);
-		tuple->alloc_tuple_size = alloc_len;
-		tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
-	}
+	tuple = (ReorderBufferTupleBuf *)sb_alloc(rb->allocator,
+											  sizeof(ReorderBufferTupleBuf) +
+											  MAXIMUM_ALIGNOF + alloc_len, 0);
+	tuple->alloc_tuple_size = alloc_len;
+	tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
 
 	return tuple;
 }
@@ -511,20 +424,7 @@ void
 ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 {
 	/* check whether to put into the slab cache, oversized tuples never are */
-	if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
-		rb->nr_cached_tuplebufs < max_cached_tuplebufs)
-	{
-		rb->nr_cached_tuplebufs++;
-		slist_push_head(&rb->cached_tuplebufs, &tuple->node);
-		VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
-		VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
-		VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
-		VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
-	}
-	else
-	{
-		pfree(tuple);
-	}
+	sb_free(tuple);
 }
 
 /*
diff --git a/src/backend/utils/mmgr/Makefile b/src/backend/utils/mmgr/Makefile
index b2403e1..c318a73 100644
--- a/src/backend/utils/mmgr/Makefile
+++ b/src/backend/utils/mmgr/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/utils/mmgr
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = aset.o mcxt.o portalmem.o
+OBJS = aset.o freepage.o mcxt.o portalmem.o sb_alloc.o sb_map.o sb_region.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/utils/mmgr/freepage.c b/src/backend/utils/mmgr/freepage.c
new file mode 100644
index 0000000..0fdd758
--- /dev/null
+++ b/src/backend/utils/mmgr/freepage.c
@@ -0,0 +1,1778 @@
+/*-------------------------------------------------------------------------
+ *
+ * freepage.c
+ *	  Management of free memory pages.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/utils/mmgr/freepage.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "lib/stringinfo.h"
+#include "miscadmin.h"
+#include "utils/sb_region.h"
+
+/* Magic numbers to identify various page types */
+#define FREE_PAGE_SPAN_LEADER_MAGIC		0xea4020f0
+#define FREE_PAGE_LEAF_MAGIC            0x98eae728
+#define FREE_PAGE_INTERNAL_MAGIC        0x19aa32c9
+
+/* Doubly linked list node for a span of free pages; lives in its first page. */
+struct FreePageSpanLeader
+{
+	int		magic;				/* always FREE_PAGE_SPAN_LEADER_MAGIC */
+	Size	npages;				/* number of pages in span */
+	relptr(FreePageSpanLeader)	prev;	/* previous span in the list */
+	relptr(FreePageSpanLeader)	next;	/* next span in the list */
+};
+
+/* Header shared by btree leaf and internal pages; magic tells them apart. */
+typedef struct FreePageBtreeHeader
+{
+	int		magic;		/* FREE_PAGE_LEAF_MAGIC or FREE_PAGE_INTERNAL_MAGIC */
+	Size	nused;		/* number of items used */
+	relptr(FreePageBtree) parent;	/* uplink */
+} FreePageBtreeHeader;
+
+/* Key on an internal page: a lower bound plus the downlink it applies to. */
+typedef struct FreePageBtreeInternalKey
+{
+	Size	first_page;				/* low bound for keys on child page */
+	relptr(FreePageBtree) child;	/* downlink */
+} FreePageBtreeInternalKey;
+
+/* Key on a leaf page: describes one span of pages; carries no other payload. */
+typedef struct FreePageBtreeLeafKey
+{
+	Size	first_page;				/* first page in span */
+	Size	npages;					/* number of pages in span */
+} FreePageBtreeLeafKey;
+
+/* Keys that fit on one page: space left after the header, over the key size. */
+#define FPM_ITEMS_PER_INTERNAL_PAGE \
+	((FPM_PAGE_SIZE - sizeof(FreePageBtreeHeader)) / \
+		sizeof(FreePageBtreeInternalKey))
+#define FPM_ITEMS_PER_LEAF_PAGE \
+	((FPM_PAGE_SIZE - sizeof(FreePageBtreeHeader)) / \
+		sizeof(FreePageBtreeLeafKey))
+
+/* A btree page of either sort; hdr.magic says which member of u is in use. */
+struct FreePageBtree
+{
+	FreePageBtreeHeader	hdr;
+	union
+	{
+		FreePageBtreeInternalKey internal_key[FPM_ITEMS_PER_INTERNAL_PAGE];	/* FREE_PAGE_INTERNAL_MAGIC pages */
+		FreePageBtreeLeafKey leaf_key[FPM_ITEMS_PER_LEAF_PAGE];	/* FREE_PAGE_LEAF_MAGIC pages */
+	} u;
+};
+
+/* Results of a btree search; the out-parameter type of FreePageBtreeSearch. */
+typedef struct FreePageBtreeSearchResult
+{
+	FreePageBtree  *page;			/* NOTE(review): presumably the page the search ended on; confirm */
+	Size			index;			/* NOTE(review): presumably the key position on that page; confirm */
+	bool			found;			/* NOTE(review): presumably whether an exact match exists; confirm */
+	unsigned		split_pages;	/* NOTE(review): presumably pages an insertion would need for splits; confirm */
+} FreePageBtreeSearchResult;
+
+/* Helper functions */
+static void FreePageBtreeAdjustAncestorKeys(FreePageManager *fpm,
+					FreePageBtree *btp);
+static Size FreePageBtreeCleanup(FreePageManager *fpm);
+static FreePageBtree *FreePageBtreeFindLeftSibling(char *base,
+							 FreePageBtree *btp);
+static FreePageBtree *FreePageBtreeFindRightSibling(char *base,
+							  FreePageBtree *btp);
+static Size FreePageBtreeFirstKey(FreePageBtree *btp);
+static FreePageBtree *FreePageBtreeGetRecycled(FreePageManager *fpm);
+static void FreePageBtreeInsertInternal(char *base, FreePageBtree *btp,
+							Size index, Size first_page, FreePageBtree *child);
+static void FreePageBtreeInsertLeaf(FreePageBtree *btp, Size index,
+						Size first_page, Size npages);
+static void FreePageBtreeRecycle(FreePageManager *fpm, Size pageno);
+static void FreePageBtreeRemove(FreePageManager *fpm, FreePageBtree *btp,
+					Size index);
+static void FreePageBtreeRemovePage(FreePageManager *fpm, FreePageBtree *btp);
+static void FreePageBtreeSearch(FreePageManager *fpm, Size first_page,
+					FreePageBtreeSearchResult *result);
+static Size FreePageBtreeSearchInternal(FreePageBtree *btp, Size first_page);
+static Size FreePageBtreeSearchLeaf(FreePageBtree *btp, Size first_page);
+static FreePageBtree *FreePageBtreeSplitPage(FreePageManager *fpm,
+					   FreePageBtree *btp);
+static void FreePageBtreeUpdateParentPointers(char *base, FreePageBtree *btp);
+static void FreePageManagerDumpBtree(FreePageManager *fpm, FreePageBtree *btp,
+						 FreePageBtree *parent, int level, StringInfo buf);
+static void FreePageManagerDumpSpans(FreePageManager *fpm,
+						 FreePageSpanLeader *span, Size expected_pages,
+						 StringInfo buf);
+static bool FreePageManagerGetInternal(FreePageManager *fpm, Size npages,
+						   Size *first_page);
+static Size FreePageManagerPutInternal(FreePageManager *fpm, Size first_page,
+						   Size npages, bool soft);
+static void FreePagePopSpanLeader(FreePageManager *fpm, Size pageno);
+static void FreePagePushSpanLeader(FreePageManager *fpm, Size first_page,
+					   Size npages);
+
+/*
+ * Set up a new free page manager that does not yet manage any pages.
+ *
+ * 'fpm' points to caller-supplied memory big enough to hold a
+ * FreePageManager; this function fills it in.
+ *
+ * 'base' is the address that every stored pointer is taken relative to.
+ * For a dynamic shared memory segment this is normally the start of the
+ * segment.  For backend-private memory it may be NULL, or the start of the
+ * extent when a single contiguous extent of memory is being managed.
+ *
+ * 'lock' is the lock that will synchronize access to this FreePageManager.
+ * Pass NULL when no synchronization is needed: the memory is
+ * backend-private, or it is shared but the caller does its own locking or
+ * needs none.  (For instance, no locking is needed when just one process
+ * allocates and frees memory.)
+ *
+ * 'lock_address_is_fixed' is false when the LWLock lives in the same
+ * dynamic shared memory segment as the managed region, and true when it
+ * lives in the main shared memory segment.  Keeping the LWLock in some
+ * other dynamic shared memory segment isn't supported.  The flag is
+ * ignored when lock is NULL.
+ */
+void
+FreePageManagerInitialize(FreePageManager *fpm, char *base, LWLock *lock,
+						  bool lock_address_is_fixed)
+{
+	Size	listno;
+
+	/* Begin with all free lists, the btree and the singleton span empty. */
+	for (listno = 0; listno < FPM_NUM_FREELISTS; ++listno)
+		relptr_store(base, fpm->freelist[listno], (FreePageSpanLeader *) NULL);
+	relptr_store(base, fpm->btree_root, (FreePageBtree *) NULL);
+	relptr_store(base, fpm->btree_recycle, (FreePageSpanLeader *) NULL);
+	fpm->btree_depth = 0;
+	fpm->btree_recycle_count = 0;
+	fpm->singleton_first_page = 0;
+	fpm->singleton_npages = 0;
+	fpm->largest_reported_chunk = 0;	/* nothing has been reported yet */
+	relptr_store(base, fpm->self, fpm);
+	relptr_store(base, fpm->lock, lock);
+	fpm->lock_address_is_fixed = lock_address_is_fixed;
+}
+
+/*
+ * Try to take a run of npages contiguous pages from the free page manager.
+ * Returns true if the request was satisfied, in which case *first_page
+ * holds the first page of the run; returns false otherwise.
+ */
+bool
+FreePageManagerGet(FreePageManager *fpm, Size npages, Size *first_page)
+{
+	LWLock *lock = fpm_lock(fpm);
+	bool	satisfied;
+	Size	longest_run;
+
+	if (lock != NULL)
+		LWLockAcquire(lock, LW_EXCLUSIVE);
+	satisfied = FreePageManagerGetInternal(fpm, npages, first_page);
+
+	/*
+	 * Odd as it sounds, handing pages out can make a larger contiguous
+	 * range appear.  Removing a key from the btree may allow the item at
+	 * the head of the btree recycle list to be inserted, and if further
+	 * items follow it on that list, inserting one of them may join two
+	 * ranges that were separate until now into a single range longer than
+	 * any that existed before.  A smarter cleanup algorithm might avoid
+	 * such corner cases; for now we simply detect the situation and do the
+	 * appropriate reporting.
+	 *
+	 * Only backend-private regions need the report, so it is skipped when
+	 * locking is in use, and also when the region turns out to have an
+	 * associated dynamic shared memory segment.
+	 */
+	longest_run = FreePageBtreeCleanup(fpm);
+	if (lock == NULL && longest_run > fpm->largest_reported_chunk)
+	{
+		sb_region *region = sb_lookup_region(fpm);
+
+		if (region == NULL || region->seg != NULL)
+		{
+			/* Nothing to report to; this value keeps the test above false. */
+			fpm->largest_reported_chunk = (Size) -1;
+		}
+		else
+		{
+			sb_report_contiguous_freespace(region, longest_run);
+			fpm->largest_reported_chunk = longest_run;
+		}
+	}
+
+	if (lock != NULL)
+		LWLockRelease(lock);
+
+	return satisfied;
+}
+
+/*
+ * Return the size of the largest run of pages that the user could
+ * successfully get, or 0 if no pages are free.  (If this value subsequently
+ * increases, it will trigger a callback to sb_report_contiguous_freespace.)
+ */
+Size
+FreePageManagerInquireLargest(FreePageManager *fpm)
+{
+	LWLock *lock = fpm_lock(fpm);
+	char   *base = fpm_segment_base(fpm);
+	Size	largest = 0;
+
+	/* Exclusive, because largest_reported_chunk is updated below. */
+	if (lock != NULL)
+		LWLockAcquire(lock, LW_EXCLUSIVE);
+
+	if (!relptr_is_null(fpm->freelist[FPM_NUM_FREELISTS - 1]))
+	{
+		FreePageSpanLeader *candidate;
+
+		/* Take the maximum over every span on the last free list. */
+		candidate = relptr_access(base, fpm->freelist[FPM_NUM_FREELISTS - 1]);
+		do
+		{
+			if (candidate->npages > largest)
+				largest = candidate->npages;
+			candidate = relptr_access(base, candidate->next);
+		} while (candidate != NULL);
+	}
+	else
+	{
+		Size	f;
+
+		/* List f - 1 counts as runs of f pages; stop at the highest in use. */
+		for (f = FPM_NUM_FREELISTS - 1; f > 0 && largest == 0; --f)
+			if (!relptr_is_null(fpm->freelist[f - 1]))
+				largest = f;
+	}
+
+	fpm->largest_reported_chunk = largest;
+
+	if (lock != NULL)
+		LWLockRelease(lock);
+
+	return largest;
+}
+
+/*
+ * Give a run of npages pages starting at first_page to the free page
+ * manager.  (If that leaves more contiguous pages available than before,
+ * we try to tell the sb_region module about it.)
+ */
+void
+FreePageManagerPut(FreePageManager *fpm, Size first_page, Size npages)
+{
+	LWLock *lock = fpm_lock(fpm);
+	Size	run_length;
+	Assert(npages > 0);
+
+	/* Take the lock, when one was supplied. */
+	if (lock != NULL)
+		LWLockAcquire(lock, LW_EXCLUSIVE);
+
+	/* Hand the pages over. */
+	run_length = FreePageManagerPutInternal(fpm, first_page, npages, false);
+
+	/*
+	 * A result larger than npages means the range just inserted was
+	 * contiguous with an existing range, and that may have opened up
+	 * cleanup opportunities.  Keep whichever figure is larger.
+	 */
+	if (run_length > npages)
+	{
+		Size	after_cleanup = FreePageBtreeCleanup(fpm);
+
+		if (after_cleanup > run_length)
+			run_length = after_cleanup;
+	}
+
+	/*
+	 * When more contiguous pages are available now than were previously
+	 * reported, try to notify the sb_region system.
+	 *
+	 * Only backend-private regions need the report, so it is skipped when
+	 * locking is in use, and also when the region turns out to have an
+	 * associated dynamic shared memory segment.
+	 */
+	if (lock == NULL && run_length > fpm->largest_reported_chunk)
+	{
+		sb_region *region = sb_lookup_region(fpm);
+
+		if (region == NULL || region->seg != NULL)
+		{
+			/* Nothing to report to; this value keeps the test above false. */
+			fpm->largest_reported_chunk = (Size) -1;
+		}
+		else
+		{
+			fpm->largest_reported_chunk = run_length;
+			sb_report_contiguous_freespace(region, run_length);
+		}
+	}
+
+	/* Drop the lock again, when one was taken. */
+	if (lock != NULL)
+		LWLockRelease(lock);
+}
+
+/*
+ * Build a human-readable description of a free page manager, for debugging.
+ */
+char *
+FreePageManagerDump(FreePageManager *fpm)
+{
+	LWLock *lock = fpm_lock(fpm);
+	char *base = fpm_segment_base(fpm);
+	StringInfoData	out;
+	FreePageSpanLeader *recycled;
+	bool	need_heading = true;
+	Size	listno;
+
+	/* Set up the buffer that accumulates the report. */
+	initStringInfo(&out);
+
+	/* Take the lock in shared mode, when one was supplied. */
+	if (lock != NULL)
+		LWLockAcquire(lock, LW_SHARED);
+
+	/* First line: the manager's own relative pointers and lock flag. */
+	appendStringInfo(&out, "metadata: self %zu lock %zu fixed %c\n",
+					 fpm->self.relptr_off, fpm->lock.relptr_off,
+					 fpm->lock_address_is_fixed ? 't' : 'f');
+
+	/* Then the btree or, when there is none, the singleton span if any. */
+	if (fpm->btree_depth > 0)
+	{
+		FreePageBtree *root = relptr_access(base, fpm->btree_root);
+
+		appendStringInfo(&out, "btree depth %u:\n", fpm->btree_depth);
+		FreePageManagerDumpBtree(fpm, root, NULL, 0, &out);
+	}
+	else if (fpm->singleton_npages > 0)
+	{
+		appendStringInfo(&out, "singleton: %zu(%zu)\n",
+						 fpm->singleton_first_page, fpm->singleton_npages);
+	}
+
+	/* Then the btree recycle list, when it is not empty. */
+	recycled = relptr_access(base, fpm->btree_recycle);
+	if (recycled != NULL)
+	{
+		appendStringInfoString(&out, "btree recycle:");
+		FreePageManagerDumpSpans(fpm, recycled, 1, &out);
+	}
+
+	/* Finally each non-empty free list, under a heading printed only once. */
+	for (listno = 0; listno < FPM_NUM_FREELISTS; ++listno)
+	{
+		FreePageSpanLeader *span;
+
+		if (!relptr_is_null(fpm->freelist[listno]))
+		{
+			if (need_heading)
+			{
+				appendStringInfoString(&out, "freelists:\n");
+				need_heading = false;
+			}
+			appendStringInfo(&out, "  %zu:", listno + 1);
+			span = relptr_access(base, fpm->freelist[listno]);
+			FreePageManagerDumpSpans(fpm, span, listno + 1, &out);
+		}
+	}
+
+	/* Done reading the manager; drop the lock if we took one. */
+	if (lock != NULL)
+		LWLockRelease(lock);
+
+	/* The caller gets the accumulated string. */
+	return out.data;
+}
+
+
+/*
+ * Propagate a changed first key upward through the btree.
+ *
+ * Every non-root page's key at index zero must equal the key stored in its
+ * parent's downlink to that page.  Call this after changing the first key on
+ * 'btp' (leaf or internal); it rewrites the matching key in each ancestor,
+ * stopping at the first ancestor where the rewritten key is not at index
+ * zero, or at the root.
+ *
+ * The new key is assumed not to change the ordering of keys on any ancestor.
+ * Under that assumption, searching the parent for the first key greater than
+ * or equal to the new key lands either on the downlink itself (the key got
+ * smaller) or one slot past it (the key got larger).
+ */
+static void
+FreePageBtreeAdjustAncestorKeys(FreePageManager *fpm, FreePageBtree *btp)
+{
+	char	   *base = fpm_segment_base(fpm);
+	FreePageBtree *cur = btp;
+	FreePageBtree *up;
+	Size		newkey;
+
+	/* Pick up the new first key; the page may be a leaf or an internal page. */
+	Assert(btp->hdr.nused > 0);
+	if (btp->hdr.magic != FREE_PAGE_LEAF_MAGIC)
+	{
+		Assert(btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+		Assert(btp->hdr.nused <= FPM_ITEMS_PER_INTERNAL_PAGE);
+		newkey = btp->u.internal_key[0].first_page;
+	}
+	else
+	{
+		Assert(btp->hdr.nused <= FPM_ITEMS_PER_LEAF_PAGE);
+		newkey = btp->u.leaf_key[0].first_page;
+	}
+
+	/* Walk toward the root, fixing one downlink key per level. */
+	while ((up = relptr_access(base, cur->hdr.parent)) != NULL)
+	{
+		Size		pos = FreePageBtreeSearchInternal(up, newkey);
+
+		/* The downlink is at pos or pos - 1; decide which. */
+		if (pos >= up->hdr.nused)
+		{
+			/* Search ran off the end, so it must be the last slot. */
+			Assert(pos == up->hdr.nused);
+			--pos;
+		}
+		else if (relptr_access(base, up->u.internal_key[pos].child) != cur)
+		{
+			/* Slot pos points elsewhere, so ours is the one before it. */
+			Assert(pos > 0);
+			--pos;
+		}
+
+#ifdef USE_ASSERT_CHECKING
+		/* Debugging double-check. */
+		if (assert_enabled)
+		{
+			FreePageBtree *check;
+
+			check = relptr_access(base, up->u.internal_key[pos].child);
+			Assert(pos < up->hdr.nused);
+			Assert(cur == check);
+		}
+#endif
+
+		/* Rewrite the parent's copy of the key. */
+		up->u.internal_key[pos].first_page = newkey;
+
+		/* Only a change to the parent's own first key needs to go higher. */
+		if (pos > 0)
+			break;
+		cur = up;
+	}
+}
+
+/*
+ * Attempt to reclaim space from the free-page btree.  The return value is
+ * the largest range of contiguous pages created by the cleanup operation.
+ *
+ * This works in two phases.  First, while the root holds a single key, the
+ * tree is made one level shallower (a single-key leaf root collapses into
+ * the singleton span stored in the manager); a two-key leaf root may also be
+ * folded away if the root page itself is the only thing separating the two
+ * spans.  Second, pages on the btree recycle list are handed back as free
+ * pages for as long as that can be done without splitting btree pages.
+ * The result is 0 if neither phase produced a new contiguous range.
+ */
+static Size
+FreePageBtreeCleanup(FreePageManager *fpm)
+{
+	char *base = fpm_segment_base(fpm);
+	Size	max_contiguous_pages = 0;
+
+	/* Attempt to shrink the depth of the btree. */
+	while (!relptr_is_null(fpm->btree_root))
+	{
+		FreePageBtree *root = relptr_access(base, fpm->btree_root);
+
+		/* If the root contains only one key, reduce depth by one. */
+		if (root->hdr.nused == 1)
+		{
+			/* Shrink depth of tree by one. */
+			Assert(fpm->btree_depth > 0);
+			--fpm->btree_depth;
+			if (root->hdr.magic == FREE_PAGE_LEAF_MAGIC)
+			{
+				/* If root is a leaf, convert only entry to singleton range. */
+				relptr_store(base, fpm->btree_root, (FreePageBtree *) NULL);
+				fpm->singleton_first_page = root->u.leaf_key[0].first_page;
+				fpm->singleton_npages = root->u.leaf_key[0].npages;
+			}
+			else
+			{
+				FreePageBtree *newroot;
+
+				/* If root is an internal page, make only child the root. */
+				Assert(root->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+				relptr_copy(fpm->btree_root, root->u.internal_key[0].child);
+				newroot = relptr_access(base, fpm->btree_root);
+				relptr_store(base, newroot->hdr.parent, (FreePageBtree *) NULL);
+			}
+			/* The old root page is no longer part of the tree; recycle it. */
+			FreePageBtreeRecycle(fpm, fpm_pointer_to_page(base, root));
+		}
+		else if (root->hdr.nused == 2 &&
+				 root->hdr.magic == FREE_PAGE_LEAF_MAGIC)
+		{
+			Size	end_of_first;
+			Size	start_of_second;
+
+			/*
+			 * end_of_first is the page number just past the first span;
+			 * start_of_second is where the second span begins.
+			 */
+			end_of_first = root->u.leaf_key[0].first_page +
+				root->u.leaf_key[0].npages;
+			start_of_second = root->u.leaf_key[1].first_page;
+
+			/* Are the two spans separated by exactly one page? */
+			if (end_of_first + 1 == start_of_second)
+			{
+				Size	root_page = fpm_pointer_to_page(base, root);
+
+				/*
+				 * If that one page is the root page itself, giving up the
+				 * btree joins both spans and the root page into a single
+				 * range, which fits in the singleton slot.  The "+ 1" below
+				 * accounts for the root page.  Both old span leaders are
+				 * taken off their freelists before the combined span is
+				 * pushed.
+				 */
+				if (end_of_first == root_page)
+				{
+					FreePagePopSpanLeader(fpm, root->u.leaf_key[0].first_page);
+					FreePagePopSpanLeader(fpm, root->u.leaf_key[1].first_page);
+					fpm->singleton_first_page = root->u.leaf_key[0].first_page;
+					fpm->singleton_npages = root->u.leaf_key[0].npages +
+						root->u.leaf_key[1].npages + 1;
+					fpm->btree_depth = 0;
+					relptr_store(base, fpm->btree_root,
+								 (FreePageBtree *) NULL);
+					FreePagePushSpanLeader(fpm, fpm->singleton_first_page,
+										   fpm->singleton_npages);
+					Assert(max_contiguous_pages == 0);
+					max_contiguous_pages = fpm->singleton_npages;
+				}
+			}
+
+			/* Whether it worked or not, it's time to stop. */
+			break;
+		}
+		else
+		{
+			/* Nothing more to do.  Stop. */
+			break;
+		}
+	}
+
+	/*
+	 * Attempt to free recycled btree pages.  We skip this if releasing
+	 * the recycled page would require a btree page split, because the page
+	 * we're trying to recycle would be consumed by the split, which would
+	 * be counterproductive.
+	 *
+	 * We also currently only ever attempt to recycle the first page on the
+	 * list; that could be made more aggressive, but it's not clear that the
+	 * complexity would be worthwhile.
+	 */
+	while (fpm->btree_recycle_count > 0)
+	{
+		FreePageBtree *btp;
+		Size	first_page;
+		Size	contiguous_pages;
+
+		/* Take the head of the recycle list and try a "soft" insertion. */
+		btp = FreePageBtreeGetRecycled(fpm);
+		first_page = fpm_pointer_to_page(base, btp);
+		contiguous_pages = FreePageManagerPutInternal(fpm, first_page, 1, true);
+		if (contiguous_pages == 0)
+		{
+			/* Insertion was skipped; put the page back and give up. */
+			FreePageBtreeRecycle(fpm, first_page);
+			break;
+		}
+		else
+		{
+			/* Remember the largest contiguous range produced so far. */
+			if (contiguous_pages > max_contiguous_pages)
+				max_contiguous_pages = contiguous_pages;
+		}
+	}
+
+	return max_contiguous_pages;
+}
+
+/*
+ * Consider consolidating the given page with its left or right sibling,
+ * if it's fairly empty.
+ *
+ * Nothing happens unless 'btp' is less than one third full.  Otherwise the
+ * right sibling is tried first: if its keys fit on 'btp', they are appended
+ * and the sibling is removed from the tree.  Failing that, if the keys of
+ * 'btp' fit on the left sibling, they are appended there and 'btp' itself is
+ * removed.  When internal pages are merged, the parent pointers of the
+ * children on the surviving page are refreshed.
+ */
+static void
+FreePageBtreeConsolidate(FreePageManager *fpm, FreePageBtree *btp)
+{
+	char *base = fpm_segment_base(fpm);
+	FreePageBtree *np;
+	Size	max;
+
+	/*
+	 * We only try to consolidate pages that are less than a third full.
+	 * We could be more aggressive about this, but that might risk performing
+	 * consolidation only to end up splitting again shortly thereafter.  Since
+	 * the btree should be very small compared to the space under management,
+	 * our goal isn't so much to ensure that it always occupies the absolutely
+	 * smallest possible number of pages as to reclaim pages before things get
+	 * too egregiously out of hand.
+	 */
+	if (btp->hdr.magic == FREE_PAGE_LEAF_MAGIC)
+		max = FPM_ITEMS_PER_LEAF_PAGE;
+	else
+	{
+		Assert(btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+		max = FPM_ITEMS_PER_INTERNAL_PAGE;
+	}
+	if (btp->hdr.nused >= max / 3)
+		return;
+
+	/*
+	 * If we can fit our right sibling's keys onto this page, consolidate.
+	 */
+	np = FreePageBtreeFindRightSibling(base, btp);
+	if (np != NULL && btp->hdr.nused + np->hdr.nused <= max)
+	{
+		if (btp->hdr.magic == FREE_PAGE_LEAF_MAGIC)
+		{
+			memcpy(&btp->u.leaf_key[btp->hdr.nused], &np->u.leaf_key[0],
+				   sizeof(FreePageBtreeLeafKey) * np->hdr.nused);
+			btp->hdr.nused += np->hdr.nused;
+		}
+		else
+		{
+			memcpy(&btp->u.internal_key[btp->hdr.nused], &np->u.internal_key[0],
+				   sizeof(FreePageBtreeInternalKey) * np->hdr.nused);
+			btp->hdr.nused += np->hdr.nused;
+			/* Children moved from the sibling must now point at this page. */
+			FreePageBtreeUpdateParentPointers(base, btp);
+		}
+		FreePageBtreeRemovePage(fpm, np);
+		return;
+	}
+
+	/*
+	 * If we can fit our keys onto our left sibling's page, consolidate.
+	 * In this case, we move our keys onto the other page rather than vice
+	 * versa, to avoid having to adjust ancestor keys.
+	 */
+	np = FreePageBtreeFindLeftSibling(base, btp);
+	if (np != NULL && btp->hdr.nused + np->hdr.nused <= max)
+	{
+		/*
+		 * Each branch bumps the sibling's key count exactly once; counting
+		 * the moved keys a second time would leave nused pointing past the
+		 * keys that were actually copied.
+		 */
+		if (btp->hdr.magic == FREE_PAGE_LEAF_MAGIC)
+		{
+			memcpy(&np->u.leaf_key[np->hdr.nused], &btp->u.leaf_key[0],
+				   sizeof(FreePageBtreeLeafKey) * btp->hdr.nused);
+			np->hdr.nused += btp->hdr.nused;
+		}
+		else
+		{
+			memcpy(&np->u.internal_key[np->hdr.nused], &btp->u.internal_key[0],
+				   sizeof(FreePageBtreeInternalKey) * btp->hdr.nused);
+			np->hdr.nused += btp->hdr.nused;
+			/* Children moved from this page must now point at the sibling. */
+			FreePageBtreeUpdateParentPointers(base, np);
+		}
+		FreePageBtreeRemovePage(fpm, btp);
+		return;
+	}
+}
+
+/*
+ * Locate the left sibling of 'btp': the page on the same level of the tree
+ * whose keyspace comes immediately before ours.  Returns NULL when no
+ * ancestor has a downlink to the left of our path, i.e. the page sits on the
+ * left edge of the tree.
+ */
+static FreePageBtree *
+FreePageBtreeFindLeftSibling(char *base, FreePageBtree *btp)
+{
+	FreePageBtree *cur = btp;
+	int			height = 0;
+
+	/* Climb until an ancestor has a downlink to the left of our path. */
+	for (;;)
+	{
+		Size		key = FreePageBtreeFirstKey(cur);
+		Size		pos;
+
+		cur = relptr_access(base, cur->hdr.parent);
+		if (cur == NULL)
+			return NULL;		/* ran out of ancestors: nothing to our left */
+
+		pos = FreePageBtreeSearchInternal(cur, key);
+		if (pos == 0)
+		{
+			/* We are this ancestor's first child; keep climbing. */
+			++height;
+			continue;
+		}
+
+		/* Step over to the downlink just before ours. */
+		Assert(cur->u.internal_key[pos].first_page == key);
+		cur = relptr_access(base, cur->u.internal_key[pos - 1].child);
+		break;
+	}
+
+	/* Come back down through the last child at each level. */
+	for (; height > 0; --height)
+	{
+		Assert(cur->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+		cur = relptr_access(base, cur->u.internal_key[cur->hdr.nused - 1].child);
+	}
+	Assert(cur->hdr.magic == btp->hdr.magic);
+
+	return cur;
+}
+
+/*
+ * Locate the right sibling of 'btp': the page on the same level of the tree
+ * whose keyspace comes immediately after ours.  Returns NULL when no
+ * ancestor has a downlink to the right of our path, i.e. the page sits on
+ * the right edge of the tree.
+ */
+static FreePageBtree *
+FreePageBtreeFindRightSibling(char *base, FreePageBtree *btp)
+{
+	FreePageBtree *cur = btp;
+	int			height = 0;
+
+	/* Climb until an ancestor has a downlink to the right of our path. */
+	for (;;)
+	{
+		Size		key = FreePageBtreeFirstKey(cur);
+		Size		pos;
+
+		cur = relptr_access(base, cur->hdr.parent);
+		if (cur == NULL)
+			return NULL;		/* ran out of ancestors: nothing to our right */
+
+		pos = FreePageBtreeSearchInternal(cur, key);
+		if (pos < cur->hdr.nused - 1)
+		{
+			/* Step over to the downlink just after ours. */
+			Assert(cur->u.internal_key[pos].first_page == key);
+			cur = relptr_access(base, cur->u.internal_key[pos + 1].child);
+			break;
+		}
+
+		/* We are this ancestor's last child; keep climbing. */
+		Assert(pos == cur->hdr.nused - 1);
+		++height;
+	}
+
+	/* Come back down through the first child at each level. */
+	for (; height > 0; --height)
+	{
+		Assert(cur->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+		cur = relptr_access(base, cur->u.internal_key[0].child);
+	}
+	Assert(cur->hdr.magic == btp->hdr.magic);
+
+	return cur;
+}
+
+/*
+ * Return the first_page value of the key at index zero on a btree page,
+ * which may be either a leaf or an internal page.  The page must hold at
+ * least one key.
+ */
+static Size
+FreePageBtreeFirstKey(FreePageBtree *btp)
+{
+	Assert(btp->hdr.nused > 0);
+
+	if (btp->hdr.magic != FREE_PAGE_LEAF_MAGIC)
+	{
+		Assert(btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+		return btp->u.internal_key[0].first_page;
+	}
+
+	return btp->u.leaf_key[0].first_page;
+}
+
+/*
+ * Unlink the page at the head of the btree recycle list and hand it back
+ * for use as a btree page.  The list must not be empty.  The recycle count
+ * is decremented.
+ */
+static FreePageBtree *
+FreePageBtreeGetRecycled(FreePageManager *fpm)
+{
+	char	   *base = fpm_segment_base(fpm);
+	FreePageSpanLeader *taken = relptr_access(base, fpm->btree_recycle);
+	FreePageSpanLeader *successor;
+
+	Assert(taken != NULL);
+	Assert(fpm_pointer_is_page_aligned(base, taken));
+
+	/* The following entry, if any, inherits our back-link and becomes head. */
+	successor = relptr_access(base, taken->next);
+	if (successor != NULL)
+		relptr_copy(successor->prev, taken->prev);
+	relptr_store(base, fpm->btree_recycle, successor);
+	fpm->btree_recycle_count--;
+
+	return (FreePageBtree *) taken;
+}
+
+/*
+ * Insert an item into an internal page.
+ *
+ * The new key (first_page, with a downlink to 'child') is placed at position
+ * 'index'; keys at and after that position shift up by one.  The page must
+ * have room for one more key: inserting into a full page would write past
+ * the end of the key array, so the caller has to split first.
+ */
+static void
+FreePageBtreeInsertInternal(char *base, FreePageBtree *btp, Size index,
+							Size first_page, FreePageBtree *child)
+{
+	Assert(btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+	/* Strictly less than capacity: we are about to add one key. */
+	Assert(btp->hdr.nused < FPM_ITEMS_PER_INTERNAL_PAGE);
+	Assert(index <= btp->hdr.nused);
+	memmove(&btp->u.internal_key[index + 1], &btp->u.internal_key[index],
+			sizeof(FreePageBtreeInternalKey) * (btp->hdr.nused - index));
+	btp->u.internal_key[index].first_page = first_page;
+	relptr_store(base, btp->u.internal_key[index].child, child);
+	++btp->hdr.nused;
+}
+
+/*
+ * Insert an item into a leaf page.
+ *
+ * The new key (first_page, npages) is placed at position 'index'; keys at
+ * and after that position shift up by one.  The page must have room for one
+ * more key: inserting into a full page would write past the end of the key
+ * array, so the caller has to split first.
+ */
+static void
+FreePageBtreeInsertLeaf(FreePageBtree *btp, Size index, Size first_page,
+						Size npages)
+{
+	Assert(btp->hdr.magic == FREE_PAGE_LEAF_MAGIC);
+	/* Strictly less than capacity: we are about to add one key. */
+	Assert(btp->hdr.nused < FPM_ITEMS_PER_LEAF_PAGE);
+	Assert(index <= btp->hdr.nused);
+	memmove(&btp->u.leaf_key[index + 1], &btp->u.leaf_key[index],
+			sizeof(FreePageBtreeLeafKey) * (btp->hdr.nused - index));
+	btp->u.leaf_key[index].first_page = first_page;
+	btp->u.leaf_key[index].npages = npages;
+	++btp->hdr.nused;
+}
+
+/*
+ * Push page 'pageno' onto the front of the btree recycle list.  The page is
+ * stamped as a one-page span leader, linked in ahead of the current head,
+ * and the recycle count is incremented.
+ */
+static void
+FreePageBtreeRecycle(FreePageManager *fpm, Size pageno)
+{
+	char	   *base = fpm_segment_base(fpm);
+	FreePageSpanLeader *oldhead = relptr_access(base, fpm->btree_recycle);
+	FreePageSpanLeader *leader;
+
+	/* Overlay a span leader on the page being recycled. */
+	leader = (FreePageSpanLeader *) fpm_page_to_pointer(base, pageno);
+	leader->magic = FREE_PAGE_SPAN_LEADER_MAGIC;
+	leader->npages = 1;
+
+	/* New head: no predecessor, old head (possibly none) as successor. */
+	relptr_store(base, leader->prev, (FreePageSpanLeader *) NULL);
+	relptr_store(base, leader->next, oldhead);
+	if (oldhead != NULL)
+		relptr_store(base, oldhead->prev, leader);
+
+	relptr_store(base, fpm->btree_recycle, leader);
+	fpm->btree_recycle_count++;
+}
+
+/*
+ * Delete the key at position 'index' from leaf page 'btp'.
+ *
+ * If that key is the only one on the page, the whole page is removed from
+ * the btree instead.  Otherwise the later keys slide down to close the gap,
+ * ancestor keys are fixed up if the first key changed, and the page is then
+ * considered for consolidation with a sibling.
+ */
+static void
+FreePageBtreeRemove(FreePageManager *fpm, FreePageBtree *btp, Size index)
+{
+	Size		tail;
+
+	Assert(btp->hdr.magic == FREE_PAGE_LEAF_MAGIC);
+	Assert(index < btp->hdr.nused);
+
+	/* Removing the sole key means the page itself has to go. */
+	if (btp->hdr.nused == 1)
+	{
+		FreePageBtreeRemovePage(fpm, btp);
+		return;
+	}
+
+	/* Drop the key; 'tail' counts the keys that followed it. */
+	--btp->hdr.nused;
+	tail = btp->hdr.nused - index;
+	if (tail > 0)
+		memmove(&btp->u.leaf_key[index], &btp->u.leaf_key[index + 1],
+				sizeof(FreePageBtreeLeafKey) * tail);
+
+	/* A new first key has to be reflected in the ancestors. */
+	if (index == 0)
+		FreePageBtreeAdjustAncestorKeys(fpm, btp);
+
+	/* The page just got emptier; maybe it can merge with a sibling. */
+	FreePageBtreeConsolidate(fpm, btp);
+}
+
+/*
+ * Remove a page from the btree.  Caller is responsible for having relocated
+ * any keys from this page that are still wanted.  The page is placed on the
+ * recycled list.
+ *
+ * If the page's parent holds only one item, the parent has to go as well, so
+ * removal moves up the tree, recycling each page it leaves behind.  Once an
+ * ancestor with more than one item is found, the downlink to the page being
+ * removed is deleted from it, ancestor keys are adjusted if that downlink
+ * was the first one, and the ancestor is considered for consolidation.
+ *
+ * Note that when removal reaches the root, the btree is marked empty and
+ * the function returns without putting the root page itself on the recycle
+ * list.
+ */
+static void
+FreePageBtreeRemovePage(FreePageManager *fpm, FreePageBtree *btp)
+{
+	char *base = fpm_segment_base(fpm);
+	FreePageBtree *parent;
+	Size	index;
+	Size	first_page;
+
+	/* Walk up until we find a parent that survives losing this downlink. */
+	for (;;)
+	{
+		/* Find parent page. */
+		parent = relptr_access(base, btp->hdr.parent);
+		if (parent == NULL)
+		{
+			/* We are removing the root page. */
+			relptr_store(base, fpm->btree_root, (FreePageBtree *) NULL);
+			fpm->btree_depth = 0;
+			Assert(fpm->singleton_first_page == 0);
+			Assert(fpm->singleton_npages == 0);
+			return;
+		}
+
+		/*
+		 * If the parent contains only one item, we need to remove it as
+		 * well.
+		 */
+		if (parent->hdr.nused > 1)
+			break;
+		/* Recycle this page and continue with the parent in its place. */
+		FreePageBtreeRecycle(fpm, fpm_pointer_to_page(base, btp));
+		btp = parent;
+	}
+
+	/* Find and remove the downlink. */
+	first_page = FreePageBtreeFirstKey(btp);
+	/*
+	 * NOTE(review): 'parent' was reached through a child's parent pointer,
+	 * so it is presumably always an internal page and the leaf branch below
+	 * looks unreachable -- confirm before relying on it.
+	 */
+	if (parent->hdr.magic == FREE_PAGE_LEAF_MAGIC)
+	{
+		index = FreePageBtreeSearchLeaf(parent, first_page);
+		Assert(index < parent->hdr.nused);
+		if (index < parent->hdr.nused - 1)
+			memmove(&parent->u.leaf_key[index],
+					&parent->u.leaf_key[index + 1],
+					sizeof(FreePageBtreeLeafKey)
+						* (parent->hdr.nused - index - 1));
+	}
+	else
+	{
+		index = FreePageBtreeSearchInternal(parent, first_page);
+		Assert(index < parent->hdr.nused);
+		/* Slide later downlinks down, unless we removed the last one. */
+		if (index < parent->hdr.nused - 1)
+			memmove(&parent->u.internal_key[index],
+					&parent->u.internal_key[index + 1],
+					sizeof(FreePageBtreeInternalKey)
+					* (parent->hdr.nused - index - 1));
+	}
+	parent->hdr.nused--;
+	/* The loop above guaranteed the parent had more than one item. */
+	Assert(parent->hdr.nused > 0);
+
+	/* Recycle the page. */
+	FreePageBtreeRecycle(fpm, fpm_pointer_to_page(base, btp));
+
+	/* Adjust ancestor keys if needed. */
+	if (index == 0)
+		FreePageBtreeAdjustAncestorKeys(fpm, parent);
+
+	/* Consider whether to consolidate the parent with a sibling. */
+	FreePageBtreeConsolidate(fpm, parent);
+}
+
+/*
+ * Search the btree for an entry for the given first page and initialize
+ * *result with the results of the search.  result->page and result->index
+ * indicate either the position of an exact match or the position at which
+ * the new key should be inserted.  result->found is true for an exact match,
+ * otherwise false.  result->split_pages will contain the number of additional
+ * btree pages that will be needed when performing a split to insert a key.
+ * Except as described above, the contents of fields in the result object are
+ * undefined on return.
+ *
+ * If the btree is empty, result->page is set to NULL, result->found to
+ * false, and result->index is left unset.
+ *
+ * split_pages is tracked during the descent: it is bumped for every full
+ * page on the path and reset to zero at every page that still has room, so
+ * on return it is nonzero only when the leaf is full.
+ */
+static void
+FreePageBtreeSearch(FreePageManager *fpm, Size first_page,
+					FreePageBtreeSearchResult *result)
+{
+	char *base = fpm_segment_base(fpm);
+	FreePageBtree *btp = relptr_access(base, fpm->btree_root);
+	Size	index;
+
+	result->split_pages = 1;
+
+	/* If the btree is empty, there's nothing to find. */
+	if (btp == NULL)
+	{
+		result->page = NULL;
+		result->found = false;
+		return;
+	}
+
+	/* Descend until we hit a leaf. */
+	while (btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC)
+	{
+		FreePageBtree *child;
+
+		index = FreePageBtreeSearchInternal(btp, first_page);
+
+		/*
+		 * If the index is 0, we're not going to find it, but we keep
+		 * descending anyway so that we can find the insertion point.
+		 */
+		if (index > 0)
+			--index;
+
+		/* Track required split depth for leaf insert. */
+		if (btp->hdr.nused >= FPM_ITEMS_PER_INTERNAL_PAGE)
+		{
+			Assert(btp->hdr.nused == FPM_ITEMS_PER_INTERNAL_PAGE);
+			result->split_pages++;
+		}
+		else
+			result->split_pages = 0;
+
+		/* Descend to appropriate child page. */
+		Assert(index < btp->hdr.nused);
+		child = relptr_access(base, btp->u.internal_key[index].child);
+		Assert(relptr_access(base, child->hdr.parent) == btp);
+		btp = child;
+	}
+
+	/* Track required split depth for leaf insert. */
+	if (btp->hdr.nused >= FPM_ITEMS_PER_LEAF_PAGE)
+	{
+		/* This is a leaf, so check it against the leaf capacity. */
+		Assert(btp->hdr.nused == FPM_ITEMS_PER_LEAF_PAGE);
+		result->split_pages++;
+	}
+	else
+		result->split_pages = 0;
+
+	/* Search leaf page. */
+	index = FreePageBtreeSearchLeaf(btp, first_page);
+
+	/* Assemble results. */
+	result->page = btp;
+	result->index = index;
+	result->found = index < btp->hdr.nused &&
+		first_page == btp->u.leaf_key[index].first_page;
+}
+
+/*
+ * Binary-search an internal page for the first key greater than or equal to
+ * the given page number.  Returns the index of that key; if every key on
+ * the page is smaller, returns the number of keys on the page (one past the
+ * last valid index).
+ */
+static Size
+FreePageBtreeSearchInternal(FreePageBtree *btp, Size first_page)
+{
+	Size		lo = 0;
+	Size		hi = btp->hdr.nused;
+
+	Assert(btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+	Assert(hi > 0 && hi <= FPM_ITEMS_PER_INTERNAL_PAGE);
+
+	/* Invariant: the answer, if not hit exactly, lies in [lo, hi]. */
+	while (lo < hi)
+	{
+		Size		probe = (lo + hi) / 2;
+		Size		key = btp->u.internal_key[probe].first_page;
+
+		if (key < first_page)
+			lo = probe + 1;
+		else if (key > first_page)
+			hi = probe;
+		else
+			return probe;
+	}
+
+	return lo;
+}
+
+/*
+ * Binary-search a leaf page for the first key greater than or equal to the
+ * given page number.  Returns the index of that key; if every key on the
+ * page is smaller, returns the number of keys on the page (one past the
+ * last valid index).
+ */
+static Size
+FreePageBtreeSearchLeaf(FreePageBtree *btp, Size first_page)
+{
+	Size		lo = 0;
+	Size		hi = btp->hdr.nused;
+
+	Assert(btp->hdr.magic == FREE_PAGE_LEAF_MAGIC);
+	Assert(hi > 0 && hi <= FPM_ITEMS_PER_LEAF_PAGE);
+
+	/* Invariant: the answer, if not hit exactly, lies in [lo, hi]. */
+	while (lo < hi)
+	{
+		Size		probe = (lo + hi) / 2;
+		Size		key = btp->u.leaf_key[probe].first_page;
+
+		if (key < first_page)
+			lo = probe + 1;
+		else if (key > first_page)
+			hi = probe;
+		else
+			return probe;
+	}
+
+	return lo;
+}
+
+/*
+ * Split a btree page in two.  A page is taken from fpm->btree_recycle (the
+ * caller must have made sure one is available), set up as a page of the
+ * same kind with the same parent, and given the upper half of the keys from
+ * 'btp'.  When an internal page is split, the children that moved are
+ * re-pointed at the new page.  Returns the new page; the caller still has
+ * to add a downlink to it.
+ */
+static FreePageBtree *
+FreePageBtreeSplitPage(FreePageManager *fpm, FreePageBtree *btp)
+{
+	FreePageBtree *right = FreePageBtreeGetRecycled(fpm);
+
+	/* The new page takes half the keys (rounded down); btp keeps the rest. */
+	right->hdr.magic = btp->hdr.magic;
+	right->hdr.nused = btp->hdr.nused / 2;
+	relptr_copy(right->hdr.parent, btp->hdr.parent);
+	btp->hdr.nused -= right->hdr.nused;
+
+	/* Copy the keys that now lie beyond btp's shortened key count. */
+	if (btp->hdr.magic != FREE_PAGE_LEAF_MAGIC)
+	{
+		Assert(btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+		memcpy(&right->u.internal_key,
+			   &btp->u.internal_key[btp->hdr.nused],
+			   sizeof(FreePageBtreeInternalKey) * right->hdr.nused);
+		FreePageBtreeUpdateParentPointers(fpm_segment_base(fpm), right);
+	}
+	else
+		memcpy(&right->u.leaf_key,
+			   &btp->u.leaf_key[btp->hdr.nused],
+			   sizeof(FreePageBtreeLeafKey) * right->hdr.nused);
+
+	return right;
+}
+
+/*
+ * Make every child referenced from internal page 'btp' point back at 'btp'
+ * as its parent.  Needed after internal pages are split or merged, since
+ * downlinks may have moved to a different page.
+ */
+static void
+FreePageBtreeUpdateParentPointers(char *base, FreePageBtree *btp)
+{
+	Size		slot = 0;
+
+	Assert(btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+
+	while (slot < btp->hdr.nused)
+	{
+		FreePageBtree *kid;
+
+		kid = relptr_access(base, btp->u.internal_key[slot].child);
+		relptr_store(base, kid->hdr.parent, btp);
+		++slot;
+	}
+}
+
+/*
+ * Append a debugging description of btree page 'btp', and recursively of
+ * everything below it, to 'buf'.
+ *
+ * Each page is printed on one line as "pageno@level" plus 'i' or 'l' for
+ * internal or leaf, followed by its keys.  'parent' is the page we descended
+ * from (NULL for the root); if the parent pointer stored on the page
+ * disagrees with it, the mismatch is reported on the same line.
+ */
+static void
+FreePageManagerDumpBtree(FreePageManager *fpm, FreePageBtree *btp,
+						 FreePageBtree *parent, int level, StringInfo buf)
+{
+	char	   *base = fpm_segment_base(fpm);
+	Size		pageno = fpm_pointer_to_page(base, btp);
+	FreePageBtree *stored_parent;
+	bool		is_internal;
+	Size		i;
+
+	/* We recurse once per tree level, so guard against stack overrun. */
+	check_stack_depth();
+
+	stored_parent = relptr_access(base, btp->hdr.parent);
+	is_internal = (btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
+
+	/* Page heading, with a warning if the parent link looks wrong. */
+	appendStringInfo(buf, "  %zu@%d %c", pageno, level,
+					 is_internal ? 'i' : 'l');
+	if (stored_parent != parent)
+		appendStringInfo(buf, " [actual parent %zu, expected %zu]",
+						 fpm_pointer_to_page(base, stored_parent),
+						 fpm_pointer_to_page(base, parent));
+	appendStringInfoChar(buf, ':');
+
+	/* Keys: "first->childpage" on internal pages, "first(npages)" on leaves. */
+	for (i = 0; i < btp->hdr.nused; ++i)
+	{
+		if (is_internal)
+			appendStringInfo(buf, " %zu->%zu",
+				 btp->u.internal_key[i].first_page,
+				 btp->u.internal_key[i].child.relptr_off / FPM_PAGE_SIZE);
+		else
+			appendStringInfo(buf, " %zu(%zu)",
+				 btp->u.leaf_key[i].first_page,
+				 btp->u.leaf_key[i].npages);
+	}
+	appendStringInfo(buf, "\n");
+
+	/* Leaves have nothing below them. */
+	if (!is_internal)
+		return;
+
+	/* Dump each child in key order, one level further down. */
+	for (i = 0; i < btp->hdr.nused; ++i)
+	{
+		FreePageBtree *kid;
+
+		kid = relptr_access(base, btp->u.internal_key[i].child);
+		FreePageManagerDumpBtree(fpm, kid, btp, level + 1, buf);
+	}
+}
+
+/*
+ * Append a debugging listing of a chain of free spans to 'buf', starting at
+ * 'span' and following the next links.  Each span is shown by its page
+ * number; the length is added in parentheses only when it differs from
+ * 'expected_pages'.  The listing ends with a newline.
+ */
+static void
+FreePageManagerDumpSpans(FreePageManager *fpm, FreePageSpanLeader *span,
+						 Size expected_pages, StringInfo buf)
+{
+	char	   *base = fpm_segment_base(fpm);
+	FreePageSpanLeader *cur;
+
+	for (cur = span; cur != NULL; cur = relptr_access(base, cur->next))
+	{
+		if (cur->npages == expected_pages)
+			appendStringInfo(buf, " %zu", fpm_pointer_to_page(base, cur));
+		else
+			appendStringInfo(buf, " %zu(%zu)", fpm_pointer_to_page(base, cur),
+							 cur->npages);
+	}
+
+	appendStringInfo(buf, "\n");
+}
+
+/*
+ * Like FreePageManagerGet, this function allocates a run of pages of the
+ * given length from the free page manager, but without taking and releasing
+ * the lock.  The caller is responsible for making sure the lock is already
+ * held.
+ *
+ * On success, returns true and stores the number of the first allocated
+ * page in *first_page.  Returns false, leaving *first_page untouched, if no
+ * free span of at least npages pages is found.  Any part of the chosen span
+ * beyond the first npages pages goes back on the appropriate free list.
+ */
+static bool
+FreePageManagerGetInternal(FreePageManager *fpm, Size npages, Size *first_page)
+{
+	char *base = fpm_segment_base(fpm);
+	FreePageSpanLeader *victim = NULL;
+	FreePageSpanLeader *prev;
+	FreePageSpanLeader *next;
+	FreePageBtreeSearchResult result;
+	Size	victim_page = 0;		/* placate compiler */
+	Size	f;
+
+	/*
+	 * Search for a free span.
+	 *
+	 * Right now, we use a simple best-fit policy here, but it's possible for
+	 * this to result in memory fragmentation if we're repeatedly asked to
+	 * allocate chunks just a little smaller than what we have available.
+	 * Hopefully, this is unlikely, because we expect most requests to be
+	 * single pages or superblock-sized chunks -- but no policy can be optimal
+	 * under all circumstances unless it has knowledge of future allocation
+	 * patterns.
+	 */
+	for (f = Min(npages, FPM_NUM_FREELISTS) - 1; f < FPM_NUM_FREELISTS; ++f)
+	{
+		/* Skip empty freelists. */
+		if (relptr_is_null(fpm->freelist[f]))
+			continue;
+
+		/*
+		 * All of the freelists except the last one contain only items of a
+		 * single size, so we just take the first one.  But the final free
+		 * list contains everything too big for any of the other lists, so
+		 * we need to search the list.
+		 */
+		if (f < FPM_NUM_FREELISTS - 1)
+			victim = relptr_access(base, fpm->freelist[f]);
+		else
+		{
+			FreePageSpanLeader *candidate;
+
+			/* Keep the smallest span that is large enough; stop on exact fit. */
+			candidate = relptr_access(base, fpm->freelist[f]);
+			do
+			{
+				if (candidate->npages >= npages && (victim == NULL ||
+					victim->npages > candidate->npages))
+				{
+					victim = candidate;
+					if (victim->npages == npages)
+						break;
+				}
+				candidate = relptr_access(base, candidate->next);
+			} while (candidate != NULL);
+		}
+
+		/* The first non-empty list settles it; 'f' stays at that list. */
+		break;
+	}
+
+	/* If we didn't find an allocatable span, return failure. */
+	if (victim == NULL)
+		return false;
+
+	/* Remove span from free list. */
+	Assert(victim->magic == FREE_PAGE_SPAN_LEADER_MAGIC);
+	prev = relptr_access(base, victim->prev);
+	next = relptr_access(base, victim->next);
+	if (prev != NULL)
+		relptr_copy(prev->next, victim->next);
+	else
+		relptr_copy(fpm->freelist[f], victim->next);
+	if (next != NULL)
+		relptr_copy(next->prev, victim->prev);
+	victim_page = fpm_pointer_to_page(base, victim);
+
+	/*
+	 * If we haven't initialized the btree yet, the victim must be the single
+	 * span stored within the FreePageManager itself.  Otherwise, we need
+	 * to update the btree.
+	 */
+	if (relptr_is_null(fpm->btree_root))
+	{
+		Assert(victim_page == fpm->singleton_first_page);
+		/* Compare, don't assign: the span leader must match the singleton. */
+		Assert(victim->npages == fpm->singleton_npages);
+		Assert(victim->npages >= npages);
+		fpm->singleton_first_page += npages;
+		fpm->singleton_npages -= npages;
+		if (fpm->singleton_npages > 0)
+			FreePagePushSpanLeader(fpm, fpm->singleton_first_page,
+								   fpm->singleton_npages);
+	}
+	else
+	{
+		/*
+		 * If the span we found is exactly the right size, remove it from the
+		 * btree completely.  Otherwise, adjust the btree entry to reflect the
+		 * still-unallocated portion of the span, and put that portion on the
+		 * appropriate free list.
+		 */
+		FreePageBtreeSearch(fpm, victim_page, &result);
+		Assert(result.found);
+		if (victim->npages == npages)
+			FreePageBtreeRemove(fpm, result.page, result.index);
+		else
+		{
+			FreePageBtreeLeafKey *key;
+
+			/* Adjust btree to reflect remaining pages. */
+			Assert(victim->npages > npages);
+			key = &result.page->u.leaf_key[result.index];
+			Assert(key->npages == victim->npages);
+			key->first_page += npages;
+			key->npages -= npages;
+			/* A changed first key on the leaf must be propagated upward. */
+			if (result.index == 0)
+				FreePageBtreeAdjustAncestorKeys(fpm, result.page);
+
+			/* Put the unallocated pages back on the appropriate free list. */
+			FreePagePushSpanLeader(fpm, victim_page + npages,
+								   victim->npages - npages);
+		}
+	}
+
+	/* Return results to caller. */
+	*first_page = fpm_pointer_to_page(base, victim);
+	return true;
+}
+
+/*
+ * Put a range of pages into the btree and freelists, consolidating it with
+ * existing free spans just before and/or after it.  If 'soft' is true,
+ * only perform the insertion if it can be done without allocating new btree
+ * pages; if false, do it always.  Returns 0 if the soft flag caused the
+ * insertion to be skipped, or otherwise the size of the contiguous span
+ * created by the insertion.  This may be larger than npages if we're able
+ * to consolidate with an adjacent range.
+ */
+static Size
+FreePageManagerPutInternal(FreePageManager *fpm, Size first_page, Size npages,
+						   bool soft)
+{
+	char *base = fpm_segment_base(fpm);
+	FreePageBtreeSearchResult result;
+	FreePageBtreeLeafKey *prevkey = NULL;
+	FreePageBtreeLeafKey *nextkey = NULL;
+	FreePageBtree *np;
+	Size	nindex;
+	Assert(npages > 0);
+
+	/* We can store a single free span without initializing the btree. */
+	if (fpm->btree_depth == 0)
+	{
+		if (fpm->singleton_npages == 0)
+		{
+			/* Don't have a span yet; store this one. */
+			fpm->singleton_first_page = first_page;
+			fpm->singleton_npages = npages;
+			FreePagePushSpanLeader(fpm, first_page, npages);
+			return fpm->singleton_npages;
+		}
+		else if (fpm->singleton_first_page + fpm->singleton_npages ==
+					first_page)
+		{
+			/* New span immediately follows sole existing span. */
+			fpm->singleton_npages += npages;
+			FreePagePopSpanLeader(fpm, fpm->singleton_first_page);
+			FreePagePushSpanLeader(fpm, fpm->singleton_first_page,
+								   fpm->singleton_npages);
+			return fpm->singleton_npages;
+		}
+		else if (first_page + npages == fpm->singleton_first_page)
+		{
+			/* New span immediately precedes sole existing span. */
+			FreePagePopSpanLeader(fpm, fpm->singleton_first_page);
+			fpm->singleton_first_page = first_page;
+			fpm->singleton_npages += npages;
+			FreePagePushSpanLeader(fpm, fpm->singleton_first_page,
+								   fpm->singleton_npages);
+			return fpm->singleton_npages;
+		}
+		else
+		{
+			/* Not contiguous; we need to initialize the btree. */
+			Size	root_page;
+			FreePageBtree *root;
+
+			if (!relptr_is_null(fpm->btree_recycle))
+				root = FreePageBtreeGetRecycled(fpm);
+			else if (FreePageManagerGetInternal(fpm, 1, &root_page))
+				root = (FreePageBtree *) fpm_page_to_pointer(base, root_page);
+			else
+			{
+				/* We'd better be able to get a page from the existing range. */
+				elog(FATAL, "free page manager btree is corrupt");
+			}
+
+			/* Create the btree and move the preexisting range into it. */
+			root->hdr.magic = FREE_PAGE_LEAF_MAGIC;
+			root->hdr.nused = 1;
+			relptr_store(base, root->hdr.parent, (FreePageBtree *) NULL);
+			root->u.leaf_key[0].first_page = fpm->singleton_first_page;
+			root->u.leaf_key[0].npages = fpm->singleton_npages;
+			relptr_store(base, fpm->btree_root, root);
+			fpm->singleton_first_page = 0;
+			fpm->singleton_npages = 0;
+			fpm->btree_depth = 1;
+
+			/*
+			 * Corner case: it may be that the btree root took the very last
+			 * free page.  In that case, the sole btree entry covers a zero
+			 * page run, which is invalid.  Overwrite it with the entry we're
+			 * trying to insert and get out.
+			 */
+			if (root->u.leaf_key[0].npages == 0)
+			{
+				root->u.leaf_key[0].first_page = first_page;
+				root->u.leaf_key[0].npages = npages;
+				return npages;
+			}
+
+			/* Fall through to insert the new key. */
+		}
+	}
+
+	/* Search the btree. */
+	FreePageBtreeSearch(fpm, first_page, &result);
+	Assert(!result.found);
+	if (result.index > 0)
+		prevkey = &result.page->u.leaf_key[result.index - 1];
+	if (result.index < result.page->hdr.nused)
+	{
+		np = result.page;
+		nindex = result.index;
+		nextkey = &result.page->u.leaf_key[result.index];
+	}
+	else
+	{
+		np = FreePageBtreeFindRightSibling(base, result.page);
+		nindex = 0;
+		if (np != NULL)
+			nextkey = &np->u.leaf_key[0];
+	}
+
+	/* Consolidate with the previous entry if possible. */
+	if (prevkey != NULL && prevkey->first_page + prevkey->npages >= first_page)
+	{
+		bool	remove_next = false;
+		Size	result;
+
+		Assert(prevkey->first_page + prevkey->npages == first_page);
+		prevkey->npages = (first_page - prevkey->first_page) + npages;
+
+		/* Check whether we can *also* consolidate with the following entry. */
+		if (nextkey != NULL &&
+			prevkey->first_page + prevkey->npages >= nextkey->first_page)
+		{
+			Assert(prevkey->first_page + prevkey->npages ==
+					nextkey->first_page);
+			prevkey->npages = (nextkey->first_page - prevkey->first_page)
+				+ nextkey->npages;
+			FreePagePopSpanLeader(fpm, nextkey->first_page);
+			remove_next = true;
+		}
+
+		/* Put the span on the correct freelist and save size. */
+		FreePagePopSpanLeader(fpm, prevkey->first_page);
+		FreePagePushSpanLeader(fpm, prevkey->first_page, prevkey->npages);
+		result = prevkey->npages;
+
+		/*
+		 * If we consolidated with both the preceding and following entries,
+		 * we must remove the following entry.  We do this last, because
+		 * removing an element from the btree may invalidate pointers we hold
+		 * into the current data structure.
+		 *
+		 * NB: The btree is technically in an invalid state at this point
+		 * because we've already updated prevkey to cover the same key space
+		 * as nextkey.  FreePageBtreeRemove() shouldn't notice that, though.
+		 */
+		if (remove_next)
+			FreePageBtreeRemove(fpm, np, nindex);
+
+		return result;
+	}
+
+	/* Consolidate with the next entry if possible. */
+	if (nextkey != NULL && first_page + npages >= nextkey->first_page)
+	{
+		Size	newpages;
+
+		/* Compute new size for span. */
+		Assert(first_page + npages == nextkey->first_page);
+		newpages = (nextkey->first_page - first_page) + nextkey->npages;
+
+		/* Put span on correct free list. */
+		FreePagePopSpanLeader(fpm, nextkey->first_page);
+		FreePagePushSpanLeader(fpm, first_page, newpages);
+
+		/* Update key in place. */
+		nextkey->first_page = first_page;
+		nextkey->npages = newpages;
+
+		/* If reducing first key on page, ancestors might need adjustment. */
+		if (nindex == 0)
+			FreePageBtreeAdjustAncestorKeys(fpm, np);
+
+		return nextkey->npages;
+	}
+
+	/* Split leaf page and as many of its ancestors as necessary. */
+	if (result.split_pages > 0)
+	{
+		/*
+		 * NB: We could consider various coping strategies here to avoid a
+		 * split; most obviously, if np != result.page, we could target that
+		 * page instead.   More complicated shuffling strategies could be
+		 * possible as well; basically, unless every single leaf page is 100%
+		 * full, we can jam this key in there if we try hard enough.  It's
+		 * unlikely that trying that hard is worthwhile, but it's possible
+		 * we might need to make more than no effort.  For now, we just do
+		 * the easy thing, which is nothing.
+		 */
+
+		/* If this is a soft insert, it's time to give up. */
+		if (soft)
+			return 0;
+
+		/* Check whether we need to allocate more btree pages to split. */
+		if (result.split_pages > fpm->btree_recycle_count)
+		{
+			Size	pages_needed;
+			Size	recycle_page;
+			Size	i;
+
+			/*
+			 * Allocate the required number of pages and split each one in
+			 * turn.  This should never fail, because if we've got enough spans
+			 * of free pages kicking around that we need additional storage
+			 * space just to remember them all, then we should certainly have
+			 * enough to expand the btree, which should only ever use a tiny
+			 * number of pages compared to the number under management.  If
+			 * it does, something's badly screwed up.
+			 */
+			pages_needed = result.split_pages - fpm->btree_recycle_count;
+			for (i = 0; i < pages_needed; ++i)
+			{
+				if (!FreePageManagerGetInternal(fpm, 1, &recycle_page))
+					elog(FATAL, "free page manager btree is corrupt");
+				FreePageBtreeRecycle(fpm, recycle_page);
+			}
+
+			/*
+			 * The act of allocating pages to recycle may have invalidated
+			 * the results of our previous btree search, so repeat it.
+			 * (We could recheck whether any of our split-avoidance strategies
+			 * that were not viable before now are, but it hardly seems
+			 * worthwhile, so we don't bother. Consolidation can't be possible
+			 * now if it wasn't previously.)
+			 */
+			FreePageBtreeSearch(fpm, first_page, &result);
+
+			/*
+			 * The act of allocating pages for use in constructing our btree
+			 * should never cause any page to become more full, so the new
+			 * split depth should be no greater than the old one, and perhaps
+			 * less if we fortuitously allocated a chunk that freed up a
+			 * slot on the page we need to update.
+			 */
+			Assert(result.split_pages <= fpm->btree_recycle_count);
+		}
+
+		/* If we still need to perform a split, do it. */
+		if (result.split_pages > 0)
+		{
+			FreePageBtree	*split_target = result.page;
+			FreePageBtree   *child = NULL;
+			Size	key = first_page;
+
+			for (;;)
+			{
+				FreePageBtree *newsibling;
+				FreePageBtree *parent;
+
+				/* Identify parent page, which must receive downlink. */
+				parent = relptr_access(base, split_target->hdr.parent);
+
+				/* Split the page - downlink not added yet. */
+				newsibling = FreePageBtreeSplitPage(fpm, split_target);
+
+				/*
+				 * At this point in the loop, we're always carrying a pending
+				 * insertion.  On the first pass, it's the actual key we're
+				 * trying to insert; on subsequent passes, it's the downlink
+				 * that needs to be added as a result of the split performed
+				 * during the previous loop iteration.  Since we've just split
+				 * the page, there's definitely room on one of the two
+				 * resulting pages.
+				 */
+				if (child == NULL)
+				{
+					Size	index;
+					FreePageBtree *insert_into;
+
+					insert_into = key < newsibling->u.leaf_key[0].first_page ?
+						split_target : newsibling;
+					index = FreePageBtreeSearchLeaf(insert_into, key);
+					FreePageBtreeInsertLeaf(insert_into, index, key, npages);
+					if (index == 0 && insert_into == split_target)
+						FreePageBtreeAdjustAncestorKeys(fpm, split_target);
+				}
+				else
+				{
+					Size	index;
+					FreePageBtree *insert_into;
+
+					insert_into =
+						key < newsibling->u.internal_key[0].first_page ?
+						split_target : newsibling;
+					index = FreePageBtreeSearchInternal(insert_into, key);
+					FreePageBtreeInsertInternal(base, insert_into, index,
+												key, child);
+					relptr_store(base, child->hdr.parent, insert_into);
+					if (index == 0 && insert_into == split_target)
+						FreePageBtreeAdjustAncestorKeys(fpm, split_target);
+				}
+
+				/* If the page we just split has no parent, split the root. */
+				if (parent == NULL)
+				{
+					FreePageBtree *newroot;
+
+					newroot = FreePageBtreeGetRecycled(fpm);
+					newroot->hdr.magic = FREE_PAGE_INTERNAL_MAGIC;
+					newroot->hdr.nused = 2;
+					relptr_store(base, newroot->hdr.parent,
+								 (FreePageBtree *) NULL);
+					newroot->u.internal_key[0].first_page =
+						FreePageBtreeFirstKey(split_target);
+					relptr_store(base, newroot->u.internal_key[0].child,
+						split_target);
+					relptr_store(base, split_target->hdr.parent, newroot);
+					newroot->u.internal_key[1].first_page =
+						FreePageBtreeFirstKey(newsibling);
+					relptr_store(base, newroot->u.internal_key[1].child,
+						newsibling);
+					relptr_store(base, newsibling->hdr.parent, newroot);
+					relptr_store(base, fpm->btree_root, newroot);
+					fpm->btree_depth++;
+
+					break;
+				}
+
+				/* If the parent page isn't full, insert the downlink. */
+				key = newsibling->u.internal_key[0].first_page;
+				if (parent->hdr.nused < FPM_ITEMS_PER_INTERNAL_PAGE)
+				{
+					Size	index;
+
+					index = FreePageBtreeSearchInternal(parent, key);
+					FreePageBtreeInsertInternal(base, parent, index,
+												key, newsibling);
+					relptr_store(base, newsibling->hdr.parent, parent);
+					if (index == 0)
+						FreePageBtreeAdjustAncestorKeys(fpm, parent);
+					break;
+				}
+
+				/* The parent also needs to be split, so loop around. */
+				child = newsibling;
+				split_target = parent;
+			}
+
+			/*
+			 * The loop above did the insert, so just need to update the
+			 * free list, and we're done.
+			 */
+			FreePagePushSpanLeader(fpm, first_page, npages);
+
+			return npages;
+		}
+	}
+
+	/* Physically add the key to the page. */
+	Assert(result.page->hdr.nused < FPM_ITEMS_PER_LEAF_PAGE);
+	FreePageBtreeInsertLeaf(result.page, result.index, first_page, npages);
+
+	/* If new first key on page, ancestors might need adjustment. */
+	if (result.index == 0)
+		FreePageBtreeAdjustAncestorKeys(fpm, result.page);
+
+	/* Put it on the free list. */
+	FreePagePushSpanLeader(fpm, first_page, npages);
+
+	return npages;
+}
+
+/*
+ * Unlink the FreePageSpanLeader stored at page 'pageno' from the free list
+ * it is on, as is needed before the span is resized or allocated.
+ */
+static void
+FreePagePopSpanLeader(FreePageManager *fpm, Size pageno)
+{
+	char	   *base = fpm_segment_base(fpm);
+	FreePageSpanLeader *leader;
+	FreePageSpanLeader *successor;
+	FreePageSpanLeader *predecessor;
+
+	leader = (FreePageSpanLeader *) fpm_page_to_pointer(base, pageno);
+	successor = relptr_access(base, leader->next);
+	predecessor = relptr_access(base, leader->prev);
+	if (successor != NULL)
+		relptr_copy(successor->prev, leader->prev);
+	if (predecessor == NULL)
+	{
+		/* The span heads its free list, so repoint the list head instead. */
+		Size		listno = Min(leader->npages, FPM_NUM_FREELISTS) - 1;
+
+		Assert(fpm->freelist[listno].relptr_off == pageno * FPM_PAGE_SIZE);
+		relptr_copy(fpm->freelist[listno], leader->next);
+	}
+	else
+		relptr_copy(predecessor->next, leader->next);
+}
+
+/*
+ * Stamp a span leader onto first_page and link it in as its free list's head.
+ */
+static void
+FreePagePushSpanLeader(FreePageManager *fpm, Size first_page, Size npages)
+{
+	char	   *base = fpm_segment_base(fpm);
+	Size		listno = Min(npages, FPM_NUM_FREELISTS) - 1;
+	FreePageSpanLeader *old_head = relptr_access(base, fpm->freelist[listno]);
+	FreePageSpanLeader *leader;
+
+	leader = (FreePageSpanLeader *) fpm_page_to_pointer(base, first_page);
+	leader->magic = FREE_PAGE_SPAN_LEADER_MAGIC;
+	leader->npages = npages;
+	relptr_store(base, leader->prev, (FreePageSpanLeader *) NULL);
+	relptr_store(base, leader->next, old_head);
+	if (old_head != NULL)
+		relptr_store(base, old_head->prev, leader);
+	relptr_store(base, fpm->freelist[listno], leader);
+}
diff --git a/src/backend/utils/mmgr/sb_alloc.c b/src/backend/utils/mmgr/sb_alloc.c
new file mode 100644
index 0000000..ace9e56
--- /dev/null
+++ b/src/backend/utils/mmgr/sb_alloc.c
@@ -0,0 +1,861 @@
+/*-------------------------------------------------------------------------
+ *
+ * sb_alloc.c
+ *	  Superblock-based memory allocator.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/utils/mmgr/sb_alloc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "utils/sb_region.h"
+
+/*
+ * Metadata for an ordinary superblock, a large memory allocation, or a "span
+ * of spans".
+ *
+ * For ordinary superblocks and large memory allocations, span objects are
+ * stored out-of-line; that is, the span object is not stored within the
+ * span itself.  Ordinary superblocks are all of size SB_SUPERBLOCK_SIZE,
+ * and size_class indicates the size of object they contain.  Large memory
+ * spans contain just enough pages to store the object, and size_class
+ * is SB_SCLASS_SPAN_LARGE; ninitialized, nallocatable, and firstfree are all
+ * unused, as the whole span consists of a single object.
+ *
+ * For a "span of spans", the span object is stored "inline".  The allocation
+ * is always exactly one page, and the sb_span object is located at the
+ * beginning of that page.  This makes it easy to free a span: just find the
+ * start of the containing page, and there's the sb_span to which it needs to
+ * be returned.  The size class will be SB_SCLASS_SPAN_OF_SPANS, and the
+ * remaining fields are used just as they would be in an ordinary superblock.
+ * We can't allocate spans out of ordinary superblocks because creating an
+ * ordinary superblock requires us to be able to allocate a span *first*.
+ * Doing it this way avoids that circularity.
+ */
+struct sb_span
+{
+	relptr(sb_heap) parent;		/* Containing heap. */
+	relptr(sb_span) prevspan;	/* Previous span on the heap's span list. */
+	relptr(sb_span) nextspan;	/* Next span on the heap's span list. */
+	relptr(char)	start;		/* Starting address. */
+	Size		npages;			/* Length of span in pages. */
+	uint16		size_class;		/* Size class (index into sb_size_classes). */
+	uint16		ninitialized;	/* Number of objects ever carved out. */
+	uint16		nallocatable;	/* Number of objects currently allocatable. */
+	uint16		firstfree;		/* Index of first free object, if any. */
+	uint16		nmax;			/* Maximum number of objects ever possible. */
+	uint16		fclass;			/* Current fullness class. */
+};
+
+#define SB_SPAN_NOTHING_FREE		((uint16) -1)	/* firstfree: list empty */
+#define SB_SUPERBLOCK_SIZE			(SB_PAGES_PER_SUPERBLOCK * FPM_PAGE_SIZE)
+
+/*
+ * Small allocations are handled by dividing a relatively large chunk of
+ * memory called a superblock into many small objects of equal size.  The
+ * chunk sizes are defined by the following array.  Larger size classes are
+ * spaced more widely than smaller size classes.  We fudge the spacing for
+ * size classes >1k to avoid space wastage: based on the knowledge that we
+ * plan to allocate 64k superblocks, we bump the maximum object size up
+ * to the largest multiple of 8 bytes that still lets us fit the same
+ * number of objects into one superblock.
+ *
+ * NB: Because of this fudging, if the size of a superblock is ever changed,
+ * these size classes should be reworked to be optimal for the new size.
+ *
+ * NB: The optimal spacing for size classes, as well as the size of the
+ * superblocks themselves, is not a question that has one right answer.
+ * Some allocators (such as tcmalloc) use more closely-spaced size classes
+ * than we do here, while others (like aset.c) use more widely-spaced classes.
+ * Spacing the classes more closely avoids wasting memory within individual
+ * chunks, but also means a larger number of potentially-unfilled superblocks.
+ * This system is really only suitable for allocating relatively large amounts
+ * of memory, where the unfilled superblocks will be a small percentage of
+ * the total allocations.
+ */
+static const uint16 sb_size_classes[] = {
+	sizeof(sb_span), 0,				/* span-of-spans, large (no fixed size) */
+	8, 16, 24, 32, 40, 48, 56, 64,	/* 8 classes separated by 8 bytes */
+	80, 96, 112, 128,				/* 4 classes separated by 16 bytes */
+	160, 192, 224, 256,				/* 4 classes separated by 32 bytes */
+	320, 384, 448, 512,				/* 4 classes separated by 64 bytes */
+	640, 768, 896, 1024,			/* 4 classes separated by 128 bytes */
+	1280, 1560, 1816, 2048,			/* 4 classes separated by ~256 bytes */
+	2616, 3120, 3640, 4096,			/* 4 classes separated by ~512 bytes */
+	5456, 6552, 7280, 8192			/* 4 classes separated by ~1024 bytes */
+};
+
+/*
+ * The following lookup table is used to map the size of small objects
+ * (less than 1kB) onto the corresponding size class.  To use this table,
+ * round the size of the object up to the next multiple of 8 bytes, divide
+ * by 8, subtract one, and use the result as the index (sizes 1..8 -> 0).
+ */
+static const char sb_size_class_map[] = {
+	2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 13, 13,
+	14, 14, 14, 14, 15, 15, 15, 15, 16, 16, 16, 16, 17, 17, 17, 17,
+	18, 18, 18, 18, 18, 18, 18, 18, 19, 19, 19, 19, 19, 19, 19, 19,
+	20, 20, 20, 20, 20, 20, 20, 20, 21, 21, 21, 21, 21, 21, 21, 21,
+	22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22,
+	23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23,
+	24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24,
+	25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25
+};
+#define SB_SIZE_CLASS_MAP_QUANTUM	8
+
+/* Indexes of the two special size classes, and the total number of classes. */
+#define SB_SCLASS_SPAN_OF_SPANS			0
+#define SB_SCLASS_SPAN_LARGE			1
+#define SB_NUM_SIZE_CLASSES				lengthof(sb_size_classes)
+
+/* Prototypes for helper functions that are private to this file. */
+static char *sb_alloc_guts(char *base, sb_region *region,
+			  sb_allocator *a, int size_class);
+static bool sb_ensure_active_superblock(char *base, sb_region *region,
+							sb_allocator *a, sb_heap *heap,
+							int size_class);
+static void sb_init_span(char *base, sb_span *span, sb_heap *heap,
+			 char *ptr, Size npages, uint16 size_class);
+static void sb_out_of_memory_error(sb_allocator *a);
+static bool sb_transfer_first_span(char *base, sb_heap *heap,
+					   int fromclass, int toclass);
+static void sb_unlink_span(char *base, sb_heap *heap, sb_span *span);
+
+/*
+ * Build a backend-private allocator in malloc'd memory, with every heap empty.
+ */
+sb_allocator *
+sb_create_private_allocator(void)
+{
+	char	   *base = NULL;	/* no region base for a private allocator */
+	sb_allocator *allocator;
+	Size		nbytes;
+	int			sclass;
+
+	nbytes = offsetof(sb_allocator, heaps) +
+		sizeof(sb_heap) * SB_NUM_SIZE_CLASSES;
+	allocator = malloc(nbytes);
+	if (allocator == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory")));
+	allocator->private = true;
+
+	/* No heap has a lock or any spans yet. */
+	for (sclass = 0; sclass < SB_NUM_SIZE_CLASSES; ++sclass)
+	{
+		sb_heap    *heap = &allocator->heaps[sclass];
+		int			fullness;
+
+		relptr_store(base, heap->lock, (LWLock *) NULL);
+		for (fullness = 0; fullness < SB_FULLNESS_CLASSES; ++fullness)
+			relptr_store(base, heap->spans[fullness], (sb_span *) NULL);
+	}
+	return allocator;
+}
+
+/*
+ * Allocate size bytes from allocator a.  On failure, return NULL if flags
+ * includes SB_ALLOC_SOFT_FAIL; otherwise call sb_out_of_memory_error().
+ */
+void *
+sb_alloc(sb_allocator *a, Size size, int flags)
+{
+	sb_region *region = NULL;
+	char *base = NULL;
+	uint16	size_class;
+	char   *result;
+
+	Assert(size > 0);
+
+	/*
+	 * For shared memory allocation, pointers are relative to the start of the
+	 * region, so we must look the region up first.  For backend-private
+	 * allocation, allocators aren't uniquely tied to a region; we'll only need
+	 * to grab one if we can't allocate out of an existing superblock.
+	 */
+	if (!a->private)
+	{
+		region = sb_lookup_region(a);
+		if (region == NULL)
+			elog(ERROR, "sb_region not found");
+		base = region->region_start;
+	}
+
+	/* If it's too big for a superblock, just grab a raw run of pages. */
+	if (size > sb_size_classes[lengthof(sb_size_classes) - 1])
+	{
+		Size	npages = fpm_size_to_pages(size);
+		Size	first_page;
+		sb_span *span;
+		sb_heap *heap = &a->heaps[SB_SCLASS_SPAN_LARGE];
+		LWLock *lock = relptr_access(base, heap->lock);
+		void *ptr;
+
+		/* Obtain a span object. */
+		span = (sb_span *) sb_alloc_guts(base, region, a,
+										 SB_SCLASS_SPAN_OF_SPANS);
+		if (span == NULL)
+		{
+			if ((flags & SB_ALLOC_SOFT_FAIL) == 0)
+				sb_out_of_memory_error(a);
+			return NULL;
+		}
+
+		/* Find a region from which to allocate. */
+		if (region == NULL)
+			region = sb_private_region_for_allocator(npages);
+
+		/* Here's where we try to perform the actual allocation. */
+		if (region == NULL ||
+			!FreePageManagerGet(region->fpm, npages, &first_page))
+		{
+			sb_free(span);		/* don't leak the unused span object */
+			if ((flags & SB_ALLOC_SOFT_FAIL) == 0)
+				sb_out_of_memory_error(a);
+			return NULL;
+		}
+		ptr = fpm_page_to_pointer(fpm_segment_base(region->fpm), first_page);
+
+		/* Initialize span and pagemap. */
+		if (lock != NULL)
+			LWLockAcquire(lock, LW_EXCLUSIVE);
+		sb_init_span(base, span, heap, ptr, npages, SB_SCLASS_SPAN_LARGE);
+		if (lock != NULL)
+			LWLockRelease(lock);
+		sb_map_set(region->pagemap, first_page, span);
+
+		return ptr;
+	}
+
+	/* Map allocation to a size class. */
+	if (size < lengthof(sb_size_class_map) * SB_SIZE_CLASS_MAP_QUANTUM)
+	{
+		int	mapidx;
+
+		mapidx = ((size + SB_SIZE_CLASS_MAP_QUANTUM - 1) /
+					SB_SIZE_CLASS_MAP_QUANTUM) - 1;
+		size_class = sb_size_class_map[mapidx];
+	}
+	else
+	{
+		uint16	min = sb_size_class_map[lengthof(sb_size_class_map) - 1];
+		uint16	max = lengthof(sb_size_classes) - 1;
+
+		while (min < max)
+		{
+			uint16	mid = (min + max) / 2;
+			uint16	class_size = sb_size_classes[mid];
+
+			if (class_size < size)
+				min = mid + 1;
+			else
+				max = mid;
+		}
+
+		size_class = min;
+	}
+	Assert(size <= sb_size_classes[size_class]);
+	Assert(size_class == 0 || size > sb_size_classes[size_class - 1]);
+
+	/* Attempt the actual allocation. */
+	result = sb_alloc_guts(base, region, a, size_class);
+	if (result == NULL && (flags & SB_ALLOC_SOFT_FAIL) == 0)
+		sb_out_of_memory_error(a);
+	return result;
+}
+
+/*
+ * Free memory allocated via sb_alloc.
+ *
+ * A large object's pages and span descriptor are released immediately.  Any
+ * other object is pushed onto the free list of its superblock.
+ */
+void
+sb_free(void *ptr)
+{
+	sb_region *region;
+	char   *fpm_base;
+	char   *base = NULL;
+	sb_span *span;
+	LWLock *lock = NULL;
+	char   *superblock;
+	Size	pageno;
+	Size	obsize;
+	uint16	size_class;
+
+	/* Locate the containing superblock. */
+	region = sb_lookup_region(ptr);
+	fpm_base = fpm_segment_base(region->fpm);
+	pageno = fpm_pointer_to_page(fpm_base, ptr);
+	span = sb_map_get(region->pagemap, pageno);
+
+	/* If this is a shared-memory region, we might need to lock the heap. */
+	if (region->seg != NULL)
+	{
+		sb_heap *heap = relptr_access(fpm_base, span->parent);
+		base = fpm_base;
+		lock = relptr_access(fpm_base, heap->lock);
+		if (lock != NULL)
+			LWLockAcquire(lock, LW_EXCLUSIVE);
+	}
+
+	/* Compute the object size. */
+	size_class = span->size_class;
+	obsize = sb_size_classes[size_class];
+
+	/* If it's a large object, free the entire span. */
+	if (size_class == SB_SCLASS_SPAN_LARGE)
+	{
+		sb_heap *heap = relptr_access(base, span->parent);
+		Size	first_page;
+
+		sb_unlink_span(base, heap, span);
+		first_page = fpm_pointer_to_page(fpm_base,
+										 relptr_access(base, span->start));
+		FreePageManagerPut(region->fpm, first_page, span->npages);
+		sb_free(span);
+
+		/* Done; release any lock and return without touching the freed span. */
+		if (lock != NULL)
+			LWLockRelease(lock);
+		return;
+	}
+
+	/* Put the object on the superblock's freelist. */
+	superblock = relptr_access(base, span->start);
+	Assert(((char *) ptr) >= superblock);
+	Assert(((char *) ptr) < superblock + SB_SUPERBLOCK_SIZE);
+	Assert((((char *) ptr) - superblock) % obsize == 0);
+	* (Size *) ptr = span->firstfree;
+	span->firstfree = (((char *) ptr) - superblock) / obsize;
+	span->nallocatable++;
+
+	if (span->nallocatable == 1 && span->fclass == SB_FULLNESS_CLASSES - 1)
+	{
+		sb_heap *heap = relptr_access(base, span->parent);
+		sb_span *new_nextspan;
+
+		/*
+		 * The superblock was completely full and is located in the
+		 * highest-numbered fullness class, which is never scanned for free
+		 * chunks.  We must move it to the next-lower fullness class.
+		 */
+		sb_unlink_span(base, heap, span);
+		span->fclass = SB_FULLNESS_CLASSES - 2;
+		relptr_copy(span->nextspan, heap->spans[SB_FULLNESS_CLASSES - 2]);
+		relptr_store(base, span->prevspan, (sb_span *) NULL);
+		new_nextspan = relptr_access(base,
+									 heap->spans[SB_FULLNESS_CLASSES - 2]);
+		if (new_nextspan != NULL)
+			relptr_store(base, new_nextspan->prevspan, span);
+		relptr_store(base, heap->spans[SB_FULLNESS_CLASSES - 2], span);
+	}
+	else if (span->nallocatable == span->nmax && (span->fclass != 1 ||
+		!relptr_is_null(span->prevspan)))
+	{
+		sb_heap *heap = relptr_access(base, span->parent);
+		Size	first_page;
+
+		/*
+		 * This entire superblock is free, and it's not the active superblock
+		 * for this size class.  Return the memory to the free page manager.
+		 * We don't do this for the active superblock to prevent hysteresis:
+		 * if we repeatedly allocate and free the only chunk in the active
+		 * superblock, it will be very inefficient if we deallocate and
+		 * reallocate the superblock every time.
+		 */
+		sb_unlink_span(base, heap, span);
+		first_page = fpm_pointer_to_page(fpm_base,
+										 relptr_access(base, span->start));
+		FreePageManagerPut(region->fpm, first_page, span->npages);
+
+		/*
+		 * Span-of-spans superblocks store the span which describes them
+		 * within the superblock itself, so freeing the storage implicitly
+		 * frees the descriptor also.  If this is a superblock of any other
+		 * type, we need to separately free the span object also.
+		 */
+		if (size_class != SB_SCLASS_SPAN_OF_SPANS)
+			sb_free(span);
+	}
+
+	/* If we locked the heap, release the lock. */
+	if (lock != NULL)
+		LWLockRelease(lock);
+}
+
+/*
+ * Report how many bytes sb_alloc would actually consume for a request of
+ * the given size: a whole number of FPM pages, or the size class's chunk size.
+ */
+Size
+sb_alloc_space(Size size)
+{
+	uint16		lo;
+	uint16		hi;
+	int			slot;
+
+	/* Requests beyond the largest size class get a dedicated run of pages. */
+	if (size > sb_size_classes[lengthof(sb_size_classes) - 1])
+		return FPM_PAGE_SIZE * fpm_size_to_pages(size);
+
+	/* Small requests are resolved directly through the lookup table. */
+	if (size < lengthof(sb_size_class_map) * SB_SIZE_CLASS_MAP_QUANTUM)
+	{
+		slot = ((size + SB_SIZE_CLASS_MAP_QUANTUM - 1) /
+				SB_SIZE_CLASS_MAP_QUANTUM) - 1;
+		return sb_size_classes[(uint16) sb_size_class_map[slot]];
+	}
+
+	/*
+	 * Otherwise binary-search the remaining classes for the smallest one that
+	 * is big enough, starting from the last class the lookup table covers.
+	 */
+	lo = sb_size_class_map[lengthof(sb_size_class_map) - 1];
+	hi = lengthof(sb_size_classes) - 1;
+	while (lo < hi)
+	{
+		uint16		probe = (lo + hi) / 2;
+
+		if (sb_size_classes[probe] >= size)
+			hi = probe;
+		else
+			lo = probe + 1;
+	}
+
+	return sb_size_classes[lo];
+}
+
+/*
+ * Report the space occupied by the existing allocation at ptr.
+ *
+ * This plays roughly the role of GetMemoryChunkSpace, although the two are
+ * hard to compare fairly.  Unlike MemoryContextAlloc/AllocSetAlloc, no
+ * bookkeeping is attached to an individual allocation, so all we can
+ * account for here is rounding: up to the next size class, or, for large
+ * allocations, up to a whole number of FPM pages.  The cost of the sb_span,
+ * sb_map, sb_region, and FreePageManager structures is normally shared by
+ * so many small allocations that charging it to any one of them would be
+ * difficult.
+ *
+ * We hope that overhead is small enough not to matter in any case; the
+ * system malloc has bookkeeping overhead of its own, too.
+ */
+Size
+sb_chunk_space(void *ptr)
+{
+	sb_region  *owner;
+	sb_span    *desc;
+	char	   *segment_start;
+	Size		page;
+	uint16		sclass;
+
+	/* Use the region's page map to find the span describing ptr's page. */
+	owner = sb_lookup_region(ptr);
+	segment_start = fpm_segment_base(owner->fpm);
+	page = fpm_pointer_to_page(segment_start, ptr);
+	desc = sb_map_get(owner->pagemap, page);
+	sclass = desc->size_class;
+
+	/* A large object occupies its whole span. */
+	if (sclass == SB_SCLASS_SPAN_LARGE)
+		return FPM_PAGE_SIZE * desc->npages;
+
+	/* Anything else is a fixed-size chunk of its size class. */
+	return sb_size_classes[sclass];
+}
+
+/*
+ * Free all memory used by an allocator and empty all of its span lists.
+ *
+ * NB: It's not safe to do this while the allocator is in use!
+ */
+void
+sb_reset_allocator(sb_allocator *a)
+{
+	char *base = NULL;
+	int heapno;
+
+	/* For shared memory, pointers are relative to the start of the region. */
+	if (!a->private)
+	{
+		sb_region *region = sb_lookup_region(a);
+		if (region == NULL)
+			elog(ERROR, "sb_region not found");
+		base = region->region_start;
+	}
+
+	/*
+	 * Iterate through heaps back to front, so that spans-of-spans, which
+	 * hold the span objects we are still reading, are freed last.
+	 */
+	for (heapno = SB_NUM_SIZE_CLASSES - 1; heapno >= 0; --heapno)
+	{
+		sb_heap *heap = &a->heaps[heapno];
+		int		fclass;
+
+		for (fclass = 0; fclass < SB_FULLNESS_CLASSES; ++fclass)
+		{
+			sb_region *region;
+			char *superblock;
+			sb_span *span;
+
+			span = relptr_access(base, heap->spans[fclass]);
+			while (span != NULL)
+			{
+				Size	offset;
+				sb_span *nextspan;
+
+				superblock = relptr_access(base, span->start);
+				nextspan = relptr_access(base, span->nextspan);
+				region = sb_lookup_region(superblock);
+				Assert(region != NULL);
+				offset = superblock - fpm_segment_base(region->fpm);
+				Assert(offset % FPM_PAGE_SIZE == 0);
+				FreePageManagerPut(region->fpm, offset / FPM_PAGE_SIZE,
+								   span->npages);
+				span = nextspan;
+			}
+
+			/* Those spans are gone; don't leave the list head dangling. */
+			relptr_store(base, heap->spans[fclass], (sb_span *) NULL);
+		}
+	}
+}
+
+/*
+ * Hand out one object of the given size class from allocator a, obtaining a
+ * new active superblock first if the heap has none.  NULL means failure.
+ */
+static char *
+sb_alloc_guts(char *base, sb_region *region, sb_allocator *a, int size_class)
+{
+	sb_heap    *heap = &a->heaps[size_class];
+	LWLock	   *heap_lock = relptr_access(base, heap->lock);
+	sb_span    *active;
+	char	   *block_start;
+	char	   *object;
+	Size		object_size;
+
+	/* Take the heap's lock, if it has one. */
+	if (heap_lock != NULL)
+		LWLockAcquire(heap_lock, LW_EXCLUSIVE);
+
+	/*
+	 * Allocation always happens from the head of fullness class 1.  If that
+	 * list is empty, either populate it or give up.
+	 */
+	if (relptr_is_null(heap->spans[1]) &&
+		!sb_ensure_active_superblock(base, region, a, heap, size_class))
+	{
+		if (heap_lock != NULL)
+			LWLockRelease(heap_lock);
+		return NULL;
+	}
+	Assert(!relptr_is_null(heap->spans[1]));
+
+	/*
+	 * The active superblock should never be completely full, so we can pop
+	 * its free list or, if that is empty, carve out the next unused object.
+	 */
+	active = relptr_access(base, heap->spans[1]);
+	Assert(active != NULL && active->nallocatable > 0);
+	block_start = relptr_access(base, active->start);
+	Assert(size_class < SB_NUM_SIZE_CLASSES);
+	object_size = sb_size_classes[size_class];
+	if (active->firstfree == SB_SPAN_NOTHING_FREE)
+	{
+		object = block_start + active->ninitialized * object_size;
+		active->ninitialized++;
+	}
+	else
+	{
+		/* A free object stores the index of the next free one. */
+		object = block_start + active->firstfree * object_size;
+		active->firstfree = * (Size *) object;
+	}
+	active->nallocatable--;
+
+	/* A superblock that just filled up belongs in the last fullness class. */
+	if (active->nallocatable == 0)
+		sb_transfer_first_span(base, heap, 1, SB_FULLNESS_CLASSES - 1);
+
+	/* Done; drop the lock if we took it. */
+	if (heap_lock != NULL)
+		LWLockRelease(heap_lock);
+
+	return object;
+}
+
+/*
+ * Ensure an active (i.e. fullness class 1) superblock, unless all existing
+ * superblocks are completely full and no more can be allocated.
+ *
+ * Fullness class K of 0..N is loosely intended to represent superblocks
+ * whose utilization percentage is at least K/N, but we only enforce this
+ * rigorously for the highest-numbered fullness class, which always contains
+ * exactly those blocks that are completely full.  It's otherwise acceptable
+ * for a superblock to be in a higher-numbered fullness class than the one
+ * to which it logically belongs.  In addition, the active superblock, which
+ * is always the first block in fullness class 1, is permitted to have a
+ * higher allocation percentage than would normally be allowable for that
+ * fullness class; we don't move it until it's completely full, and then
+ * it goes to the highest-numbered fullness class.
+ *
+ * It might seem odd that the active superblock is the head of fullness class
+ * 1 rather than fullness class 0, but experience with other allocators has
+ * shown that it's usually better to allocate from a superblock that's
+ * moderately full rather than one that's nearly empty.  Insofar as is
+ * reasonably possible, we want to avoid performing new allocations in a
+ * superblock that would otherwise become empty soon.
+ */
+static bool
+sb_ensure_active_superblock(char *base, sb_region *region, sb_allocator *a,
+							sb_heap *heap, int size_class)
+{
+	Size	obsize = sb_size_classes[size_class];
+	Size	nmax;
+	int		fclass;
+	sb_span *span = NULL;
+	Size	npages = 1;
+	Size	first_page;
+	Size	i;
+	void   *ptr;
+
+	/*
+	 * Compute the number of objects that will fit in a superblock of this
+	 * size class.  Span-of-spans superblocks are just a single page, and the
+	 * first object isn't available for use because it describes the
+	 * span-of-spans itself.
+	 */
+	if (size_class == SB_SCLASS_SPAN_OF_SPANS)
+		nmax = FPM_PAGE_SIZE / obsize - 1;
+	else
+ 		nmax = SB_SUPERBLOCK_SIZE / obsize;
+
+	/*
+	 * If fullness class 1 is empty, try to find something to put in it by
+	 * scanning higher-numbered fullness classes (excluding the last one,
+	 * whose blocks are certain to all be completely full).
+	 */
+	for (fclass = 2; fclass < SB_FULLNESS_CLASSES - 1; ++fclass)
+	{
+		sb_span *span;
+
+		span = relptr_access(base, heap->spans[fclass]);
+		while (span != NULL)
+		{
+			int		tfclass;
+			sb_span *nextspan;
+			sb_span *prevspan;
+
+			/* Figure out what fullness class should contain this. */
+			tfclass = (nmax - span->nallocatable)
+				* (SB_FULLNESS_CLASSES - 1) / nmax;
+
+			/* Look up next span. */
+			nextspan = relptr_access(base, span->nextspan);
+
+			/*
+			 * If utilization has dropped enough that this now belongs in
+			 * some other fullness class, move it there.
+			 */
+			if (tfclass < fclass)
+			{
+				prevspan = relptr_access(base, span->prevspan);
+
+				relptr_copy(span->nextspan, heap->spans[tfclass]);
+				relptr_store(base, span->prevspan, (sb_span *) NULL);
+				if (nextspan != NULL)
+					relptr_copy(nextspan->prevspan, span->prevspan);
+				if (prevspan != NULL)
+					relptr_copy(prevspan->nextspan, span->nextspan);
+				else
+					relptr_copy(heap->spans[fclass], span->nextspan);
+				span->fclass = tfclass;
+			}
+
+			/* Advance to next span on list. */
+			span = nextspan;
+		}
+
+		/* Stop now if we found a suitable superblock. */
+		if (!relptr_is_null(heap->spans[1]))
+			return true;
+	}
+
+	/*
+	 * If there are no superblocks that properly belong in fullness class 1,
+	 * pick one from some other fullness class and move it there anyway, so
+	 * that we have an allocation target.  Our last choice is to transfer a
+	 * superblock that's almost empty (and might become completely empty soon
+	 * if left alone), but even that is better than failing, which is what we
+	 * must do if there are no superblocks at all with freespace.
+	 */
+	Assert(relptr_is_null(heap->spans[1]));
+	for (fclass = 2; fclass < SB_FULLNESS_CLASSES - 1; ++fclass)
+		if (sb_transfer_first_span(base, heap, fclass, 1))
+			return true;
+	if (relptr_is_null(heap->spans[1]) &&
+		sb_transfer_first_span(base, heap, 0, 1))
+			return true;
+
+	/*
+	 * Get an sb_span object to describe the new superblock... unless
+	 * this allocation is for an sb_span object, in which case that's
+	 * surely not going to work.  We handle that case by storing the
+	 * sb_span describing an sb_span superblock inline.
+	 */
+	if (size_class != SB_SCLASS_SPAN_OF_SPANS)
+	{
+		sb_region *span_region = a->private ? NULL : region;
+
+		span = (sb_span *) sb_alloc_guts(base, span_region, a,
+										 SB_SCLASS_SPAN_OF_SPANS);
+		if (span == NULL)
+			return false;
+		npages = SB_PAGES_PER_SUPERBLOCK;
+	}
+
+	/* Find a region from which to allocate the superblock. */
+	if (region == NULL)
+	{
+		Assert(a->private);
+		region = sb_private_region_for_allocator(npages);
+	}
+
+	/* Try to allocate the actual superblock. */
+	if (region == NULL ||
+		!FreePageManagerGet(region->fpm, npages, &first_page))
+	{
+		/* XXX. Free the span, if any. */
+		return false;
+	}
+	ptr = fpm_page_to_pointer(fpm_segment_base(region->fpm), first_page);
+
+	/*
+	 * If this is a span-of-spans, carve the descriptor right out of
+	 * the allocated space.
+	 */
+	if (size_class == SB_SCLASS_SPAN_OF_SPANS)
+		span = (sb_span *) ptr;
+
+	/* Initialize span and pagemap. */
+	sb_init_span(base, span, heap, ptr, npages, size_class);
+	for (i = 0; i < npages; ++i)
+		sb_map_set(region->pagemap, first_page + i, span);
+
+	return true;
+}
+
+/*
+ * Fill in a span descriptor and push it onto the head of fullness class 1
+ * of the indicated heap.
+ *
+ * 'ptr' is the start of the memory the span describes and 'npages' its
+ * length in pages.  For SB_SCLASS_SPAN_LARGE the nallocatable field is not
+ * assigned here, so nmax simply mirrors whatever that field already held.
+ */
+static void
+sb_init_span(char *base, sb_span *span, sb_heap *heap, char *ptr,
+			 Size npages, uint16 size_class)
+{
+	Size	object_size = sb_size_classes[size_class];
+	sb_span *old_head = relptr_access(base, heap->spans[1]);
+
+	/* Describe the memory this span covers. */
+	relptr_store(base, span->parent, heap);
+	relptr_store(base, span->start, ptr);
+	span->npages = npages;
+	span->size_class = size_class;
+
+	/* Link the span in front of the previous head of class 1, if any. */
+	relptr_store(base, span->nextspan, old_head);
+	relptr_store(base, span->prevspan, (sb_span *) NULL);
+	if (old_head != NULL)
+		relptr_store(base, old_head->prevspan, span);
+	relptr_store(base, heap->spans[1], span);
+	span->fclass = 1;
+
+	/* Nothing has been handed out or freed yet. */
+	span->firstfree = SB_SPAN_NOTHING_FREE;
+	span->ninitialized = 0;
+
+	if (size_class == SB_SCLASS_SPAN_OF_SPANS)
+	{
+		/*
+		 * The descriptor of a span-of-spans occupies the first object of
+		 * the span itself: count that object as initialized and take it
+		 * out of the allocatable total.  Because nmax is derived from
+		 * nallocatable below, it shrinks by one as well, which is what
+		 * makes the span get freed at the correct time.
+		 */
+		span->ninitialized = 1;
+		span->nallocatable = FPM_PAGE_SIZE / object_size - 1;
+	}
+	else if (size_class != SB_SCLASS_SPAN_LARGE)
+		span->nallocatable = SB_SUPERBLOCK_SIZE / object_size;
+
+	span->nmax = span->nallocatable;
+}
+
+/*
+ * Raise an ERROR for an allocation failure, wording the message according
+ * to whether the allocator is backend-private or not.
+ */
+static void
+sb_out_of_memory_error(sb_allocator *a)
+{
+	if (!a->private)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of shared memory")));
+	else
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory")));
+}
+
+/*
+ * Pop the head span of fullness class 'fromclass' and push it onto the head
+ * of fullness class 'toclass'.  Returns false, changing nothing, when the
+ * source class has no spans; returns true otherwise.
+ */
+static bool
+sb_transfer_first_span(char *base, sb_heap *heap, int fromclass, int toclass)
+{
+	sb_span *moving = relptr_access(base, heap->spans[fromclass]);
+	sb_span *neighbor;
+
+	/* Nothing to move if the source list is empty. */
+	if (moving == NULL)
+		return false;
+
+	/* Pop: the successor, if any, becomes the new head of the source. */
+	neighbor = relptr_access(base, moving->nextspan);
+	relptr_store(base, heap->spans[fromclass], neighbor);
+	if (neighbor != NULL)
+		relptr_store(base, neighbor->prevspan, (sb_span *) NULL);
+
+	/* Push: link in front of whatever currently heads the target list. */
+	neighbor = relptr_access(base, heap->spans[toclass]);
+	relptr_copy(moving->nextspan, heap->spans[toclass]);
+	relptr_store(base, heap->spans[toclass], moving);
+	if (neighbor != NULL)
+		relptr_store(base, neighbor->prevspan, moving);
+	moving->fclass = toclass;
+
+	return true;
+}
+
+/*
+ * Remove span from the fullness-class list it is currently on.
+ *
+ * The list is identified by span->fclass.  On return the span's prevspan
+ * link is cleared; its nextspan link is left as it was.
+ */
+static void
+sb_unlink_span(char *base, sb_heap *heap, sb_span *span)
+{
+	sb_span *nextspan = relptr_access(base, span->nextspan);
+	sb_span *prevspan = relptr_access(base, span->prevspan);
+
+	/*
+	 * Patch the neighbors first, while span's own links are still intact;
+	 * clearing span->prevspan beforehand would make the successor lose its
+	 * back link to the predecessor.
+	 */
+	if (nextspan != NULL)
+		relptr_copy(nextspan->prevspan, span->prevspan);
+	if (prevspan != NULL)
+		relptr_copy(prevspan->nextspan, span->nextspan);
+	else
+		relptr_copy(heap->spans[span->fclass], span->nextspan);
+
+	/* Now it is safe to detach the span's back link. */
+	relptr_store(base, span->prevspan, (sb_span *) NULL);
+}
diff --git a/src/backend/utils/mmgr/sb_map.c b/src/backend/utils/mmgr/sb_map.c
new file mode 100644
index 0000000..7c629df
--- /dev/null
+++ b/src/backend/utils/mmgr/sb_map.c
@@ -0,0 +1,137 @@
+/*-------------------------------------------------------------------------
+ *
+ * sb_map.c
+ *	  Superblock allocator page-mapping infrastructure.
+ *
+ * The superblock allocator does not store metadata with each chunk, and
+ * therefore needs a way to find the metadata given only the pointer
+ * address.  The first step is to translate the pointer address to
+ * an offset relative to some base address, from which a page number
+ * can be calculated.  Then, this module is responsible for mapping the
+ * page number to an offset within the chunk where the associated span
+ * object is stored.  We do this in the simplest possible way: one big
+ * array.
+ *
+ * Span metadata is stored within the same chunk of memory as the span
+ * itself.  Therefore, we can assume that the offset is less than 4GB
+ * whenever we're managing less than 4GB of pages, and use 4 byte
+ * offsets.  When we're managing more than 4GB of pages, we use 8 byte
+ * offsets.  (This could probably be optimized; for example, we could use
+ * 6 byte offsets for allocation sizes up to 256TB; also, if we assumed
+ * that the span object must itself be 2, 4, or 8 byte aligned, we could
+ * extend the cutoff point for offsets of any given length by a similar
+ * multiple.  It's not clear that the extra math would be worthwhile.)
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/utils/mmgr/sb_map.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/shmem.h"
+#include "utils/freepage.h"
+#include "utils/sb_map.h"
+
+/* Number of FPM_PAGE_SIZE pages in 4GB; below this, 4-byte offsets suffice. */
+const uint64 maxpages_4b = UINT64CONST(0x100000000) / FPM_PAGE_SIZE;
+
+/*
+ * Common header of a page map.  The entry array follows it, laid out as
+ * either sb_map32 or sb_map64 depending on use64.
+ */
+struct sb_map
+{
+	relptr(sb_map) self;	/* own offset; sb_map_base() subtracts it */
+	Size	offset;			/* page number corresponding to map[0] */
+	Size	npages;			/* number of entries in the map array */
+	bool	use64;			/* true: sb_map64 layout; false: sb_map32 */
+};
+
+/* Map layout for segments less than 4GB. */
+typedef struct sb_map32
+{
+	sb_map	hdr;
+	uint32	map[FLEXIBLE_ARRAY_MEMBER];
+} sb_map32;
+
+/*
+ * Map layout with 8-byte entries, used whenever the 4-byte layout is not
+ * selected: segments of 4GB or more, and maps created with a NULL base
+ * when Size is wider than 4 bytes.
+ */
+typedef struct sb_map64
+{
+	sb_map	hdr;
+	uint64	map[FLEXIBLE_ARRAY_MEMBER];
+} sb_map64;
+
+/*
+ * Recover the base address a map's offsets are relative to, by subtracting
+ * the map's own stored offset from its address.  The argument is
+ * parenthesized so that any pointer-valued expression may be passed.
+ */
+#define sb_map_base(m) \
+	(((char *) (m)) - (m)->self.relptr_off)
+
+/*
+ * Report how many bytes an sb_map covering npages pages occupies.
+ *
+ * The narrow (4-byte entry) layout is chosen when Size itself is at most
+ * 4 bytes, or when a base address is supplied and the page count is below
+ * the 4GB threshold; otherwise the wide (8-byte entry) layout is used.
+ * For shared memory (base != NULL) we assume that the pointers will always
+ * point within that same segment, but for backend-private memory that
+ * might not be the case.
+ */
+Size
+sb_map_size(char *base, Size npages)
+{
+	bool	narrow;
+
+	narrow = sizeof(Size) <= 4 || (base != NULL && npages < maxpages_4b);
+
+	if (!narrow)
+		return add_size(offsetof(sb_map64, map),
+						mul_size(npages, sizeof(uint64)));
+
+	return add_size(offsetof(sb_map32, map),
+					mul_size(npages, sizeof(uint32)));
+}
+
+/*
+ * Set up the header of an sb_map in caller-provided storage.  The entry
+ * array is deliberately left unzeroed, so the caller must not read an
+ * entry that has not been set first.
+ */
+void
+sb_map_initialize(sb_map *m, char *base, Size offset, Size npages)
+{
+	bool	narrow = (sizeof(Size) <= 4) ||
+		(base != NULL && npages < maxpages_4b);
+
+	relptr_store(base, m->self, m);
+	m->npages = npages;
+	m->offset = offset;
+	m->use64 = !narrow;
+}
+
+/*
+ * Record ptr as the value for page pageno, stored as an offset from the
+ * map's base address.
+ */
+void
+sb_map_set(sb_map *m, Size pageno, void *ptr)
+{
+	char   *base = sb_map_base(m);
+	Size	slot;
+
+	/* Translate the absolute page number into an array index. */
+	Assert(pageno >= m->offset);
+	slot = pageno - m->offset;
+	Assert(slot < m->npages);
+
+	if (!m->use64)
+		((sb_map32 *) m)->map[slot] = (uint32) (((char *) ptr) - base);
+	else
+		((sb_map64 *) m)->map[slot] = (uint64) (((char *) ptr) - base);
+}
+
+/*
+ * Fetch the value recorded for page pageno.  The result is undefined for a
+ * page whose value was never stored, so don't ask for one.
+ */
+void *
+sb_map_get(sb_map *m, Size pageno)
+{
+	char   *base = sb_map_base(m);
+	Size	slot;
+
+	/* Translate the absolute page number into an array index. */
+	Assert(pageno >= m->offset);
+	slot = pageno - m->offset;
+	Assert(slot < m->npages);
+
+	if (!m->use64)
+		return base + ((sb_map32 *) m)->map[slot];
+
+	return base + ((sb_map64 *) m)->map[slot];
+}
diff --git a/src/backend/utils/mmgr/sb_region.c b/src/backend/utils/mmgr/sb_region.c
new file mode 100644
index 0000000..1c51563
--- /dev/null
+++ b/src/backend/utils/mmgr/sb_region.c
@@ -0,0 +1,744 @@
+/*-------------------------------------------------------------------------
+ *
+ * sb_region.c
+ *	  Superblock allocator memory region manager.
+ *
+ * The superblock allocator operates on ranges of pages managed by a
+ * FreePageManager and reverse-mapped by an sb_map.  When it's asked to
+ * free an object, it just gets a pointer address; our job is to figure
+ * out which page range contains that object and locate the
+ * FreePageManager, sb_map, and other metadata that the superblock
+ * allocator will need to do its thing.  Moreover, when allocating an
+ * object, the caller is only required to provide the superblock allocator
+ * with a pointer to the sb_allocator object, which could be in either
+ * shared or backend-private memory; our job again is to know which it
+ * is and provide pointers to the appropriate supporting data structures.
+ * To do all this, we have to keep track of where all dynamic shared memory
+ * segments configured for memory allocation are located; and we also have
+ * to keep track of where all chunks of memory obtained from the operating
+ * system for backend-private allocations are located.
+ *
+ * On a 32-bit system, the number of chunks can never get very big, so
+ * we just store them all in a single array and use binary search for
+ * lookups.  On a 64-bit system, this might get dicey, so we maintain
+ * one such array for every 4GB of address space; chunks that span a 4GB
+ * boundary require multiple entries.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/utils/mmgr/sb_region.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "utils/sb_region.h"
+
+/*
+ * On 64-bit systems, we use a two-level radix tree to find the data for
+ * the relevant 4GB range.  The radix tree is deliberately unbalanced, with
+ * more entries at the first level than at the second level.  We expect this
+ * to save memory, because the first level has a cache, and the full array
+ * is only instantiated if the cache overflows.  Since each L2 entry
+ * covers 2^44 bytes of address space (16TB), we expect overflows of the
+ * four-entry cache to happen essentially never.
+ */
+#define SB_LOOKUP_ROOT_BITS			20
+#define SB_LOOKUP_ROOT_ENTRIES		(1 << SB_LOOKUP_ROOT_BITS)
+#define SB_LOOKUP_ROOT_CACHE_SIZE	4
+#define SB_LOOKUP_L2_BITS			12
+#define SB_LOOKUP_L2_ENTRIES		(1 << SB_LOOKUP_L2_BITS)
+
+/* Lookup data for a 4GB range of address space. */
+typedef struct
+{
+	int		nused;			/* entries of region[] currently in use */
+	int		nallocated;		/* allocated length of region[] */
+	sb_region **region;		/* regions, kept sorted by region_start */
+} sb_lookup_leaf;
+
+/* Lookup data for a 16TB range of address space, direct mapped. */
+typedef struct
+{
+	/* indexed by the low SB_LOOKUP_L2_BITS bits of (address >> 32) */
+	sb_lookup_leaf *leaf[SB_LOOKUP_L2_ENTRIES];
+} sb_lookup_l2;
+
+/* Lookup data for an entire 64-bit address space. */
+typedef struct
+{
+	/* small cache of L2 tables; a slot is unused while its value is NULL */
+	uint32	cache_key[SB_LOOKUP_ROOT_CACHE_SIZE];
+	sb_lookup_l2 *cache_value[SB_LOOKUP_ROOT_CACHE_SIZE];
+	/* full root array; stays NULL until the cache overflows */
+	sb_lookup_l2 **l2;
+} sb_lookup_root;
+
+/* Toplevel address lookup structure. */
+#if SIZEOF_SIZE_T > 4
+static sb_lookup_root lookup_root;
+#else
+static sb_lookup_leaf lookup_root_leaf;
+#endif
+
+/*
+ * Backend-private chunks binned by maximum contiguous freespace.  Lists are
+ * doubly-linked using fl_node.  List 0 contains regions with no free pages
+ * at all.  List I, for I>0, contains regions where the number of contiguous
+ * free pages is less than 2^I, except for the last list which contains
+ * everything with too many pages for any other list.
+ * A region may be on a higher-numbered list than where it actually belongs,
+ * but it cannot be any lower.  Thus it's safe to assume that searching
+ * lower-numbered lists is always pointless, but higher-numbered lists may
+ * contain regions that can't actually satisfy a requested allocation.
+ */
+#define NUM_PRIVATE_FREELISTS	16
+static dlist_head private_freelist[NUM_PRIVATE_FREELISTS];
+
+/*
+ * Constants to set the size of backend-private regions.  Superblocks are
+ * 16 pages each (64kB), and we want a number of superblocks to fit inside
+ * each region, so these need to be pretty good-sized.  The actual
+ * allocations will be a bit larger than the values indicated here, because
+ * we add a bit of space for bookkeeping.  These values are in units of
+ * FPM_PAGE_SIZE.
+ */
+#define SB_REGION_INITSIZE		(16 * SB_PAGES_PER_SUPERBLOCK)
+#define SB_REGION_MAXSIZE		((64 * 1024 * 1024) / FPM_PAGE_SIZE)
+
+/* Sum of usable_pages over all backend-private regions currently alive. */
+static Size sb_private_pages_allocated = 0;
+/* Bytes currently held via the system_malloc/system_calloc wrappers. */
+static Size sb_private_bytes_allocated = 0;
+/*
+ * High-water mark of the above.  It is only refreshed inside system_free,
+ * so readers should take the maximum of this and the current value.
+ */
+static Size sb_peak_private_bytes_allocated = 0;
+
+/* Static functions. */
+static bool sb_adjust_lookup(sb_region *region, bool insert);
+static bool sb_adjust_lookup_leaf(sb_lookup_leaf *leaf, sb_region *region,
+					  bool insert);
+static void sb_dump_regions_leaf(sb_region *last_region, sb_lookup_leaf *leaf);
+#if SIZEOF_SIZE_T > 4
+static sb_lookup_leaf *sb_find_leaf(Size highbits, bool insert);
+#endif
+static void *system_calloc(Size count, Size s);
+static void system_free(void *p, Size s);
+static void *system_malloc(Size s);
+
+#if SIZEOF_SIZE_T > 4
+/*
+ * Dump every non-empty leaf of one L2 table.  last_region is the last
+ * region dumped so far (or NULL); the updated value is returned so that a
+ * region with entries in two adjacent leaves is printed only once.
+ */
+static sb_region *
+sb_dump_regions_l2(sb_lookup_l2 *l2, sb_region *last_region)
+{
+	int		j;
+
+	for (j = 0; j < SB_LOOKUP_L2_ENTRIES; ++j)
+	{
+		sb_lookup_leaf *leaf = l2->leaf[j];
+
+		/*
+		 * A leaf stays around after its last region is removed, so guard
+		 * against nused == 0 before indexing region[nused - 1].
+		 */
+		if (leaf == NULL || leaf->nused <= 0)
+			continue;
+
+		sb_dump_regions_leaf(last_region, leaf);
+		last_region = leaf->region[leaf->nused - 1];
+	}
+
+	return last_region;
+}
+#endif
+
+/*
+ * Dump debugging information for sb_region objects.
+ *
+ * Output goes to stderr: one section per known region, followed by overall
+ * statistics on bytes obtained from the operating system.
+ */
+void
+sb_dump_regions(void)
+{
+#if SIZEOF_SIZE_T > 4
+	sb_region *last_region = NULL;
+
+	if (lookup_root.l2 != NULL)
+	{
+		int		i;
+
+		/* The full root array exists; walk it in index order. */
+		for (i = 0; i < SB_LOOKUP_ROOT_ENTRIES; ++i)
+		{
+			if (lookup_root.l2[i] != NULL)
+				last_region = sb_dump_regions_l2(lookup_root.l2[i],
+												 last_region);
+		}
+	}
+	else
+	{
+		bool	first = true;
+		uint32	lastkey = 0;
+
+		/*
+		 * Only the cache is populated.  Visit its entries in ascending key
+		 * order by repeatedly picking the smallest key not yet visited.
+		 */
+		for (;;)
+		{
+			int		i;
+			int		n = -1;
+
+			for (i = 0; i < SB_LOOKUP_ROOT_CACHE_SIZE; ++i)
+			{
+				if (lookup_root.cache_value[i] == NULL)
+					continue;
+				if (!first && lookup_root.cache_key[i] <= lastkey)
+					continue;
+				if (n == -1 ||
+					lookup_root.cache_key[i] < lookup_root.cache_key[n])
+					n = i;
+			}
+			if (n == -1)
+				break;
+			first = false;
+			lastkey = lookup_root.cache_key[n];
+
+			/* Dump this L2 entry. */
+			last_region = sb_dump_regions_l2(lookup_root.cache_value[n],
+											 last_region);
+		}
+	}
+#else
+	sb_dump_regions_leaf(NULL, &lookup_root_leaf);
+#endif
+
+	fprintf(stderr, "== overall statistics ==\n");
+	fprintf(stderr, "private bytes now: %zu, peak %zu\n",
+		sb_private_bytes_allocated,
+		Max(sb_private_bytes_allocated, sb_peak_private_bytes_allocated));
+}
+
+/*
+ * Find the region to which a pointer belongs.
+ *
+ * Returns the region whose address range [region_start, region_start +
+ * region_size) contains ptr, or NULL if no known region does.
+ */
+sb_region *
+sb_lookup_region(void *ptr)
+{
+	sb_lookup_leaf *leaf = NULL;
+	int		high, low;
+
+	/*
+	 * If this is a 64-bit system, locate the lookup table that pertains
+	 * to the upper 32 bits of ptr.  On a 32-bit system, there's only one
+	 * lookup table.
+	 */
+#if SIZEOF_SIZE_T > 4
+	{
+		Size	highbits = ((Size) ptr) >> 32;
+		static Size last_highbits = 0;
+		static sb_lookup_leaf *last_leaf = NULL;
+
+		/* Quick test to see if we're in same range as before. */
+		if (last_highbits == highbits && last_leaf != NULL)
+			leaf = last_leaf;
+		else
+		{
+			leaf = sb_find_leaf(highbits, false);
+
+			/* No lookup table for this 4GB range?  OK, no matching region. */
+			if (leaf == NULL)
+				return NULL;
+
+			/* Remember results of this lookup for next time. */
+			last_highbits = highbits;
+			last_leaf = leaf;
+		}
+	}
+#else
+	leaf = &lookup_root_leaf;
+#endif
+
+	/* Now we use binary search on the sb_lookup_leaf. */
+	high = leaf->nused;
+	low = 0;
+	while (low < high)
+	{
+		int mid;
+		sb_region *region;
+
+		mid = (high + low) / 2;
+		region = leaf->region[mid];
+		if (region->region_start > (char *) ptr)
+			high = mid;
+		else if (region->region_start + region->region_size <= (char *) ptr)
+		{
+			/*
+			 * The end address is exclusive: a pointer equal to
+			 * region_start + region_size lies just past this region.
+			 */
+			low = mid + 1;
+		}
+		else
+			return region;
+	}
+
+	return NULL;
+}
+
+/*
+ * When a backend-private sb_allocator needs more memory, it calls this
+ * function.  We search the existing backend-private regions for one capable
+ * of satisfying the request; if none found, we must create a new region.
+ *
+ * Returns a region whose largest run of free pages was at least npages when
+ * examined, or NULL if no such region exists and a new one could not be
+ * created (either the memory or its lookup-table entry was unobtainable).
+ */
+sb_region *
+sb_private_region_for_allocator(Size npages)
+{
+	int		freelist;
+	Size	new_region_net_pages;
+	Size	metadata_bytes;
+	char   *region_start;
+	Size	region_size;
+	sb_region *region;
+
+	Assert(npages > 0);
+
+	/*
+	 * Valid freelist indexes run from 0 to NUM_PRIVATE_FREELISTS - 1, so
+	 * clamp to the last list rather than to the array length.
+	 */
+	freelist = Min(fls(npages), NUM_PRIVATE_FREELISTS - 1);
+
+	while (freelist < NUM_PRIVATE_FREELISTS)
+	{
+		dlist_mutable_iter	iter;
+		Size		threshold = 1 << (freelist - 1);
+
+		dlist_foreach_modify(iter, &private_freelist[freelist])
+		{
+			sb_region  *candidate;
+			Size	largest;
+
+			candidate = dlist_container(sb_region, fl_node, iter.cur);
+
+			/*
+			 * Quickly skip regions which appear to have enough space to
+			 * belong on this freelist but which don't have enough space to
+			 * satisfy the request, to avoid probing every region on the list
+			 * for its exact free space on every trip through.
+			 */
+			if (candidate->contiguous_pages >= threshold &&
+				candidate->contiguous_pages < npages)
+				continue;
+
+			/*
+			 * We're going to either use this region or move it to a
+			 * lower-numbered freelist or both, so determine the precise size
+			 * of the largest remaining run of pages.
+			 */
+			largest = FreePageManagerInquireLargest(candidate->fpm);
+			candidate->contiguous_pages = largest;
+
+			/*
+			 * The region we're examining not only doesn't have enough
+			 * contiguous freespace to satisfy this allocation, but it
+			 * doesn't even belong in this bucket.  Move it to the right place.
+			 */
+			if (largest < threshold)
+			{
+				int	new_freelist = Min(fls(largest),
+									   NUM_PRIVATE_FREELISTS - 1);
+
+				dlist_delete(iter.cur);
+				dlist_push_head(&private_freelist[new_freelist],
+								&candidate->fl_node);
+			}
+
+			/*
+			 * If the region is big enough, use it.  For larger allocations
+			 * this might be suboptimal, because we might carve space out of a
+			 * chunk that's bigger than we really need rather than locating
+			 * the best fit across all chunks.  It shouldn't be too far off,
+			 * though, because chunks with way more contiguous space available
+			 * will be on a higher-numbered freelist.
+			 *
+			 * NB: For really large backend-private allocations, it's probably
+			 * better to malloc() directly than go through this machinery.
+			 */
+			if (largest >= npages)
+				return candidate;
+		}
+
+		/* Try next freelist. */
+		++freelist;
+	}
+
+	/*
+	 * There is no existing backend-private region with enough freespace
+	 * to satisfy the request, so we'll need to create a new one.  First
+	 * step is to figure out how many pages we should try to obtain.
+	 */
+	for (new_region_net_pages = SB_REGION_INITSIZE;
+		 new_region_net_pages < sb_private_pages_allocated &&
+		 new_region_net_pages < SB_REGION_MAXSIZE; new_region_net_pages *= 2)
+		;
+	if (new_region_net_pages < npages)
+		new_region_net_pages = npages;
+
+	/* Try to allocate space from the operating system. */
+	for (;;)
+	{
+		/*
+		 * Compute space required for metadata and determine raw allocation
+		 * size.
+		 */
+		metadata_bytes = MAXALIGN(sizeof(sb_region));
+		metadata_bytes += MAXALIGN(sizeof(FreePageManager));
+		metadata_bytes += MAXALIGN(sb_map_size(NULL, new_region_net_pages));
+		if (metadata_bytes % FPM_PAGE_SIZE != 0)
+			metadata_bytes += FPM_PAGE_SIZE - (metadata_bytes % FPM_PAGE_SIZE);
+		region_size = new_region_net_pages * FPM_PAGE_SIZE + metadata_bytes;
+
+		/* Try to allocate memory. */
+		region_start = system_malloc(region_size);
+		if (region_start != NULL)
+			break;
+
+		/* Too big; if possible, loop and try a smaller allocation. */
+		if (new_region_net_pages == npages)
+			return NULL;
+		new_region_net_pages = Max(new_region_net_pages / 2, npages);
+	}
+
+	/*
+	 * Initialize region object.
+	 *
+	 * NB: We temporarily set region->contiguous_pages to a value one more
+	 * than the actual number.  This is because calling FreePageManagerPut
+	 * will provoke a callback to sb_report_contiguous_freespace, which we
+	 * want to exit quickly and, in particular, without deallocating the
+	 * region.
+	 */
+	region = (sb_region *) region_start;
+	region->region_start = region_start;
+	region->region_size = region_size;
+	region->usable_pages = new_region_net_pages;
+	sb_private_pages_allocated += region->usable_pages;
+	region->seg = NULL;
+	region->allocator = NULL;
+	region->fpm = (FreePageManager *)
+		(region_start + MAXALIGN(sizeof(sb_region)));
+	region->pagemap = (sb_map *)
+		(((char *) region->fpm) + MAXALIGN(sizeof(FreePageManager)));
+	region->contiguous_pages = new_region_net_pages + 1;
+
+	/* Initialize supporting data structures. */
+	FreePageManagerInitialize(region->fpm, region->region_start, NULL, false);
+	FreePageManagerPut(region->fpm, metadata_bytes / FPM_PAGE_SIZE,
+					   new_region_net_pages);
+	sb_map_initialize(region->pagemap, NULL, metadata_bytes / FPM_PAGE_SIZE,
+					  new_region_net_pages);
+	region->contiguous_pages = new_region_net_pages; /* Now fix the value. */
+
+	/*
+	 * Register the region in the address lookup tables before publishing it
+	 * on a freelist.  If that fails for lack of memory, the region could
+	 * never be found again from a pointer into it, so give it back and
+	 * report failure instead.
+	 */
+	if (!sb_adjust_lookup(region, true))
+	{
+		sb_private_pages_allocated -= new_region_net_pages;
+		system_free(region_start, region_size);
+		return NULL;
+	}
+	freelist = Min(fls(new_region_net_pages), NUM_PRIVATE_FREELISTS - 1);
+	dlist_push_head(&private_freelist[freelist], &region->fl_node);
+
+	/* Time to rock and roll. */
+	return region;
+}
+
+/*
+ * When a free page manager detects that the maximum contiguous freespace in
+ * a backend-private region has increased, it calls this function.  Our job
+ * is to free the region completely if there are no remaining allocations,
+ * and otherwise to move the region to the freelist that matches its new
+ * largest run of free pages and remember the reported value.
+ *
+ * npages is the newly reported size of the largest contiguous free run.
+ */
+void
+sb_report_contiguous_freespace(sb_region *region, Size npages)
+{
+	int		old_freelist;
+	int		new_freelist;
+
+	/* This should only be called for private regions. */
+	Assert(region->seg == NULL);
+	Assert(region->allocator == NULL);
+
+	/*
+	 * If there have been allocations from the region since the last report,
+	 * it's possible that the number of pages reported is less than what we
+	 * already know about.  In that case, exit quickly; else update our
+	 * cached value.
+	 */
+	if (npages < region->contiguous_pages)
+		return;
+
+	/*
+	 * If the entire region is free, deallocate it.  The sb_region,
+	 * FreePageManager, and sb_map for the region is stored within it, so
+	 * they all go away when we free the managed space.
+	 */
+	if (npages == region->usable_pages)
+	{
+		char   *region_start = region->region_start;
+		Size	region_size = region->region_size;
+
+		/* Pull the region out of the lookup table. */
+		sb_adjust_lookup(region, false);
+
+		/* Remove the region object from the private freelist. */
+		dlist_delete(&region->fl_node);
+
+		/* Decrement count of private pages allocated. */
+		Assert(sb_private_pages_allocated >= region->usable_pages);
+		sb_private_pages_allocated -= region->usable_pages;
+
+		/* Return the managed space to the operating system. */
+		system_free(region_start, region_size);
+		return;
+	}
+
+	/*
+	 * If necessary, move the region to a higher-numbered freelist.  Indexes
+	 * are clamped to the last valid list, NUM_PRIVATE_FREELISTS - 1.
+	 */
+	old_freelist = Min(fls(region->contiguous_pages),
+					   NUM_PRIVATE_FREELISTS - 1);
+	new_freelist = Min(fls(npages), NUM_PRIVATE_FREELISTS - 1);
+	if (new_freelist > old_freelist)
+	{
+		dlist_delete(&region->fl_node);
+		dlist_push_head(&private_freelist[new_freelist], &region->fl_node);
+	}
+
+	/* Record the reported value for future calls to this function. */
+	region->contiguous_pages = npages;
+}
+
+/*
+ * Insert a region into, or delete a region from, the address-based lookup
+ * tables.  Returns true on success and false if we fail due to memory
+ * exhaustion; delete always succeeds.
+ *
+ * A failed insert leaves the lookup tables without any entry for the
+ * region: entries already added for earlier 4GB ranges are removed again.
+ */
+static bool
+sb_adjust_lookup(sb_region *region, bool insert)
+{
+	bool	ok = true;
+
+	/*
+	 * If this is a 64-bit system, we need to loop over all of the relevant
+	 * tables and update each one.  On a 32-bit system, there's only one table
+	 * and we simply update that.
+	 */
+#if SIZEOF_SIZE_T > 4
+	Size	tabstart;
+	Size	tabstop;
+	Size	i;
+
+	tabstart = ((Size) region->region_start) >> 32;
+	tabstop = ((Size) region->region_start + region->region_size - 1) >> 32;
+
+	for (i = tabstart; i <= tabstop; ++i)
+	{
+		sb_lookup_leaf *leaf = sb_find_leaf(i, insert);
+
+		/*
+		 * Finding the leaf might fail if we're inserting and can't allocate
+		 * memory for a new lookup table.  Even if we get the leaf, inserting
+		 * the new region pointer into it might also fail for lack of memory.
+		 */
+		Assert(insert || leaf != NULL);
+		if (leaf == NULL)
+		{
+			/* Nothing to remove from a table that doesn't exist. */
+			if (!insert)
+				continue;
+			ok = false;
+		}
+		else
+			ok = sb_adjust_lookup_leaf(leaf, region, insert);
+
+		if (!ok)
+		{
+			Size	j;
+
+			/*
+			 * We ran out of memory; back out changes already made.  Count
+			 * upward to i rather than computing i - 1, which would wrap
+			 * around when the failure happens at table 0.
+			 */
+			for (j = tabstart; j < i; ++j)
+				sb_adjust_lookup_leaf(sb_find_leaf(j, false), region, false);
+			break;
+		}
+	}
+#else
+	ok = sb_adjust_lookup_leaf(&lookup_root_leaf, region, insert);
+#endif
+
+	return ok;
+}
+
+/*
+ * Add a region to, or take a region out of, one sb_lookup_leaf, keeping the
+ * leaf's array ordered by region_start.  Returns false only when an insert
+ * cannot enlarge the array for lack of memory; removal cannot fail.
+ */
+static bool
+sb_adjust_lookup_leaf(sb_lookup_leaf *leaf, sb_region *region, bool insert)
+{
+	int		lo;
+	int		hi;
+
+	/* An insert into a full array needs a bigger array first. */
+	if (insert && leaf->nused >= leaf->nallocated)
+	{
+		Size	capacity;
+		sb_region **grown;
+
+		capacity = leaf->nallocated == 0 ? 16 : leaf->nallocated * 2;
+		grown = system_malloc(sizeof(sb_region *) * capacity);
+		if (grown == NULL)
+			return false;
+		if (leaf->nused > 0)
+			memcpy(grown, leaf->region, sizeof(sb_region *) * leaf->nused);
+		if (leaf->region != NULL)
+			system_free(leaf->region, sizeof(sb_region *) * leaf->nallocated);
+		leaf->region = grown;
+		leaf->nallocated = capacity;
+	}
+
+	/* Binary search for the slot matching region's start address. */
+	lo = 0;
+	hi = leaf->nused;
+	while (lo < hi)
+	{
+		int		probe = (hi + lo) / 2;
+		sb_region *other = leaf->region[probe];
+
+		if (other->region_start > region->region_start)
+			hi = probe;
+		else if (other->region_start < region->region_start)
+			lo = probe + 1;
+		else
+		{
+			/* Exact match: this is the slot. */
+			lo = probe;
+			break;
+		}
+	}
+
+	if (!insert)
+	{
+		/* Close the gap left by the departing entry. */
+		Assert(leaf->region[lo] == region);
+		if (lo < leaf->nused - 1)
+			memmove(&leaf->region[lo], &leaf->region[lo + 1],
+					sizeof(sb_region *) * (leaf->nused - lo - 1));
+		--leaf->nused;
+		return true;
+	}
+
+	/* Open a gap at the insertion point and drop the new entry in. */
+	Assert(lo == leaf->nused ||
+			leaf->region[lo]->region_start > region->region_start);
+	if (lo < leaf->nused)
+		memmove(&leaf->region[lo + 1], &leaf->region[lo],
+				sizeof(sb_region *) * (leaf->nused - lo));
+	leaf->region[lo] = region;
+	++leaf->nused;
+
+	return true;
+}
+
+/*
+ * Print debugging information to stderr for each region listed in one
+ * sb_lookup_leaf.  If the leaf's first region is last_region, that entry
+ * is not printed again.
+ */
+static void
+sb_dump_regions_leaf(sb_region *last_region, sb_lookup_leaf *leaf)
+{
+	int		idx = 0;
+
+	/* Skip a leading entry that repeats the previously dumped region. */
+	if (leaf->nused > 0 && leaf->region[0] == last_region)
+		idx = 1;
+
+	while (idx < leaf->nused)
+	{
+		sb_region *cur = leaf->region[idx];
+
+		fprintf(stderr, "== region at %p [%zu bytes, %zu usable pages] ==\n",
+				cur->region_start, cur->region_size,
+				cur->usable_pages);
+		fprintf(stderr, "%s\n\n", FreePageManagerDump(cur->fpm));
+		++idx;
+	}
+}
+
+#if SIZEOF_SIZE_T > 4
+/*
+ * Locate the sb_lookup_leaf for one 4GB range of address space, identified
+ * by highbits (an address shifted right by 32 bits).
+ *
+ * With insert = false, returns NULL if no leaf exists for that range.  With
+ * insert = true, missing lookup structures are created on the way, and NULL
+ * is returned only if memory for them cannot be obtained.
+ */
+static sb_lookup_leaf *
+sb_find_leaf(Size highbits, bool insert)
+{
+	Size	rootbits;
+	sb_lookup_l2 *l2 = NULL;
+	sb_lookup_leaf **leafptr;
+	int	i;
+	int unused = -1;
+
+	rootbits = (highbits >> SB_LOOKUP_L2_BITS) & (SB_LOOKUP_ROOT_ENTRIES - 1);
+
+	/* Check for L2 entry in toplevel cache. */
+	for (i = 0; i < SB_LOOKUP_ROOT_CACHE_SIZE; ++i)
+	{
+		if (lookup_root.cache_value[i] == NULL)
+			unused = i;
+		else if (lookup_root.cache_key[i] == rootbits)
+			l2 = lookup_root.cache_value[i];
+	}
+
+	/* If no hit, check the full L2 lookup table, if it's been initialized. */
+	if (l2 == NULL && lookup_root.l2 != NULL)
+	{
+		l2 = lookup_root.l2[rootbits];
+
+		/* Pull entry into cache. */
+		if (l2 != NULL)
+		{
+			/*
+			 * No need to be smart about replacement policy; we expect to
+			 * arrive here virtually never.  The key must be rootbits,
+			 * since that is what the cache probe above compares against.
+			 */
+			i = rootbits % SB_LOOKUP_ROOT_CACHE_SIZE;
+			lookup_root.cache_key[i] = rootbits;
+			lookup_root.cache_value[i] = l2;
+		}
+	}
+
+	/* If no L2 entry found, create one if inserting else give up. */
+	if (l2 == NULL)
+	{
+		if (!insert)
+			return NULL;
+		l2 = system_calloc(1, sizeof(sb_lookup_l2));
+		if (l2 == NULL)
+			return NULL;
+		if (unused != -1)
+		{
+			lookup_root.cache_key[unused] = rootbits;
+			lookup_root.cache_value[unused] = l2;
+		}
+		else if (lookup_root.l2 != NULL)
+			lookup_root.l2[rootbits] = l2;
+		else
+		{
+			/* The cache is full: switch to the full root array. */
+			lookup_root.l2 = system_calloc(SB_LOOKUP_ROOT_ENTRIES,
+									sizeof(sb_lookup_l2 *));
+			if (lookup_root.l2 == NULL)
+			{
+				system_free(l2, sizeof(sb_lookup_l2));
+				return NULL;
+			}
+			for (i = 0; i < SB_LOOKUP_ROOT_CACHE_SIZE; ++i)
+				lookup_root.l2[lookup_root.cache_key[i]] =
+					lookup_root.cache_value[i];
+
+			/* The new table must go in too, or later lookups miss it. */
+			lookup_root.l2[rootbits] = l2;
+		}
+	}
+
+	/* Find slot for entry, and try to initialize it if needed. */
+	leafptr = &l2->leaf[highbits & (SB_LOOKUP_L2_ENTRIES - 1)];
+	if (insert && *leafptr == NULL)
+		*leafptr = system_calloc(1, sizeof(sb_lookup_leaf));
+
+	return *leafptr;
+}
+#endif
+
+/*
+ * Wrapper around calloc() that adds the size of each successful allocation
+ * to the running count of private bytes.
+ */
+static void *
+system_calloc(Size count, Size s)
+{
+	void *p = calloc(count, s);
+
+	if (p == NULL)
+		return NULL;
+
+	sb_private_bytes_allocated += count * s;
+	return p;
+}
+
+/*
+ * Wrapper around free() that keeps the byte accounting current.  The caller
+ * passes the size of the allocation being released.  The peak counter is
+ * brought up to date before the running count is reduced.
+ */
+static void
+system_free(void *p, Size s)
+{
+	/* Capture the high-water mark before the count drops. */
+	if (sb_peak_private_bytes_allocated < sb_private_bytes_allocated)
+		sb_peak_private_bytes_allocated = sb_private_bytes_allocated;
+	sb_private_bytes_allocated -= s;
+
+	free(p);
+}
+
+/*
+ * Wrapper around malloc() that adds the size of each successful allocation
+ * to the running count of private bytes.
+ */
+static void *
+system_malloc(Size s)
+{
+	void *p = malloc(s);
+
+	if (p == NULL)
+		return NULL;
+
+	sb_private_bytes_allocated += s;
+	return p;
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 9e209ae..1983d0f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -14,6 +14,7 @@
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
 #include "utils/relcache.h"
+#include "utils/sb_alloc.h"
 #include "utils/snapshot.h"
 #include "utils/timestamp.h"
 
@@ -326,31 +327,10 @@ struct ReorderBuffer
 	void	   *private_data;
 
 	/*
-	 * Private memory context.
+	 * Private memory context and allocator.
 	 */
 	MemoryContext context;
-
-	/*
-	 * Data structure slab cache.
-	 *
-	 * We allocate/deallocate some structures very frequently, to avoid bigger
-	 * overhead we cache some unused ones here.
-	 *
-	 * The maximum number of cached entries is controlled by const variables
-	 * on top of reorderbuffer.c
-	 */
-
-	/* cached ReorderBufferTXNs */
-	dlist_head	cached_transactions;
-	Size		nr_cached_transactions;
-
-	/* cached ReorderBufferChanges */
-	dlist_head	cached_changes;
-	Size		nr_cached_changes;
-
-	/* cached ReorderBufferTupleBufs */
-	slist_head	cached_tuplebufs;
-	Size		nr_cached_tuplebufs;
+	sb_allocator *allocator;
 
 	XLogRecPtr	current_restart_decoding_lsn;
 
diff --git a/src/include/utils/freepage.h b/src/include/utils/freepage.h
new file mode 100644
index 0000000..dd905d7
--- /dev/null
+++ b/src/include/utils/freepage.h
@@ -0,0 +1,101 @@
+/*-------------------------------------------------------------------------
+ *
+ * freepage.h
+ *	  Management of page-organized free memory.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/utils/freepage.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef FREEPAGE_H 
+#define FREEPAGE_H
+
+#include "storage/lwlock.h"
+#include "utils/relptr.h"
+
+/* Forward declarations. */
+typedef struct FreePageSpanLeader FreePageSpanLeader;
+typedef struct FreePageBtree FreePageBtree;
+typedef struct FreePageManager FreePageManager;
+
+/*
+ * PostgreSQL normally uses 8kB pages for most things, but many common
+ * architecture/operating system pairings use a 4kB page size for memory
+ * allocation, so we do that here also.  We assume that a large allocation
+ * is likely to begin on a page boundary; if not, we'll discard bytes from
+ * the beginning and end of the object and use only the middle portion that
+ * is properly aligned.  This works, but is not ideal, so it's best to keep
+ * this conservatively small.  There don't seem to be any common architectures
+ * where the page size is less than 4kB, so this should be good enough; also,
+ * making it smaller would increase the space consumed by the address space
+ * map, which also uses this page size.
+ */
+#define FPM_PAGE_SIZE			4096
+
+/*
+ * Each freelist except for the last contains only spans of one particular
+ * size.  Everything larger goes on the last one.  In some sense this seems
+ * like a waste since most allocations are in a few common sizes, but it
+ * means that small allocations can simply pop the head of the relevant list
+ * without needing to worry about whether the object we find there is of
+ * precisely the correct size (because we know it must be).
+ */
+#define FPM_NUM_FREELISTS		129
+
+/* Everything we need in order to manage free pages (see freepage.c) */
+struct FreePageManager
+{
+	relptr(FreePageManager)  self;
+	relptr(LWLock)  lock;
+	bool			lock_address_is_fixed;
+	relptr(FreePageBtree)   btree_root;
+	relptr(FreePageSpanLeader)	btree_recycle;
+	unsigned		btree_depth;
+	unsigned		btree_recycle_count;
+	Size			singleton_first_page;
+	Size			singleton_npages;
+	Size			largest_reported_chunk;
+	relptr(FreePageSpanLeader)  freelist[FPM_NUM_FREELISTS];
+};
+
+/* Macros to convert between page numbers (expressed as Size) and pointers. */
+#define fpm_page_to_pointer(base, page)	\
+	(AssertVariableIsOfTypeMacro(page, Size), \
+	 (base) + FPM_PAGE_SIZE * (page))
+#define fpm_pointer_to_page(base, ptr)		\
+	(((Size) (((char *) (ptr)) - (base))) / FPM_PAGE_SIZE)
+
+/* Macro to convert an allocation size to a number of pages, rounding up. */
+#define fpm_size_to_pages(sz) \
+	(((sz) + FPM_PAGE_SIZE - 1) / FPM_PAGE_SIZE)
+
+/* Macros to check alignment of absolute and relative pointers. */
+#define fpm_pointer_is_page_aligned(base, ptr)		\
+	(((Size) (((char *) (ptr)) - (base))) % FPM_PAGE_SIZE == 0)
+#define fpm_relptr_is_page_aligned(base, relptr)		\
+	((relptr).relptr_off % FPM_PAGE_SIZE == 0)
+
+/* Macro to find the segment base: the manager's address minus its offset. */
+#define fpm_segment_base(fpm)	\
+	(((char *) (fpm)) - (fpm)->self.relptr_off)
+
+/* Macro to find the lwlock for the FreePageManager. */
+#define fpm_lock(fpm) \
+	(relptr_access((fpm)->lock_address_is_fixed ? NULL : \
+		fpm_segment_base(fpm), (fpm)->lock))
+
+/* Functions to manipulate the free page map. */
+extern void FreePageManagerInitialize(FreePageManager *fpm, char *base,
+						  LWLock *lock, bool lock_address_is_fixed);
+extern bool FreePageManagerGet(FreePageManager *fpm, Size npages,
+						Size *first_page);
+extern void FreePageManagerPut(FreePageManager *fpm, Size first_page,
+						Size npages);
+extern Size FreePageManagerInquireLargest(FreePageManager *fpm);
+extern char *FreePageManagerDump(FreePageManager *fpm);
+
+#endif   /* FREEPAGE_H */
diff --git a/src/include/utils/relptr.h b/src/include/utils/relptr.h
new file mode 100644
index 0000000..46281cf
--- /dev/null
+++ b/src/include/utils/relptr.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * relptr.h
+ *	  This file contains basic declarations for relative pointers.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/utils/relptr.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef RELPTR_H
+#define RELPTR_H
+
+/*
+ * Relative pointers are intended to be used when storing an address that may
+ * be relative either to the base of the process's address space or some
+ * dynamic shared memory segment mapped therein.
+ *
+ * The idea here is that you declare a relative pointer as relptr(type)
+ * and then use relptr_access to dereference it and relptr_store to change
+ * it.  The use of a union here is a hack, because what's stored in the
+ * relptr is always a Size, never an actual pointer.  But including a pointer
+ * in the union allows us to use stupid macro tricks to provide some measure
+ * of type-safety.
+ */
+#define relptr(type)     union { type *relptr_type; Size relptr_off; }
+#define relptr_access(base, rp) \
+	(AssertVariableIsOfTypeMacro(base, char *), \
+	 (__typeof__((rp).relptr_type)) ((rp).relptr_off == 0 ? NULL : \
+		((base) + (rp).relptr_off)))	/* base may be an expression */
+#define relptr_is_null(rp) \
+	((rp).relptr_off == 0)
+#define relptr_store(base, rp, val) \
+	(AssertVariableIsOfTypeMacro(base, char *), \
+	 AssertVariableIsOfTypeMacro(val, __typeof__((rp).relptr_type)), \
+	 (rp).relptr_off = ((val) == NULL ? 0 : ((char *) (val)) - (base)))
+#define relptr_copy(rp1, rp2) \
+	((rp1).relptr_off = (rp2).relptr_off)
+
+#endif   /* RELPTR_H */
diff --git a/src/include/utils/sb_alloc.h b/src/include/utils/sb_alloc.h
new file mode 100644
index 0000000..07b6a57
--- /dev/null
+++ b/src/include/utils/sb_alloc.h
@@ -0,0 +1,79 @@
+/*-------------------------------------------------------------------------
+ *
+ * sb_alloc.h
+ *	  Superblock-based memory allocator.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/utils/sb_alloc.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef SB_ALLOC_H
+#define SB_ALLOC_H
+
+#include "storage/lwlock.h"
+#include "utils/relptr.h"
+
+typedef struct sb_span sb_span;
+
+/*
+ * Superblocks are binned by how full they are.  Generally, each fullness
+ * class corresponds to one quartile, but the superblock being used for
+ * allocations is always at the head of the list for fullness class 1,
+ * regardless of how full it really is.
+ *
+ * For large objects, we just stick all of the allocations in fullness class
+ * 0. Since we can just return the space directly to the free page manager,
+ * we don't really need them on a list at all, except that if someone wants
+ * to bulk release everything allocated using this sb_allocator, we have no
+ * other way of finding them.
+ */
+#define SB_FULLNESS_CLASSES		4
+
+/*
+ * An sb_heap represents a set of allocations of a given size class.
+ * There can be multiple heaps for the same size class for contention
+ * avoidance.
+ */
+typedef struct sb_heap
+{
+	relptr(LWLock)	lock;
+	relptr(sb_span) spans[SB_FULLNESS_CLASSES];
+} sb_heap;
+
+/*
+ * An sb_allocator is basically just a group of heaps, one per size class.
+ * If locking is required, then we've also got an array of LWLocks, one per
+ * heap.
+ */
+typedef struct sb_allocator
+{
+	bool	private;
+	relptr(LWLock) locks;
+	sb_heap	heaps[FLEXIBLE_ARRAY_MEMBER];
+} sb_allocator;
+
+/* Pages per superblock (in units of FPM_PAGE_SIZE). */
+#define SB_PAGES_PER_SUPERBLOCK		16
+
+/* Allocation options. */
+#define SB_ALLOC_HUGE				0x0001		/* allow >=1GB */
+#define SB_ALLOC_SOFT_FAIL			0x0002		/* return NULL if no mem */
+
+/* Functions to manipulate allocators. */
+extern sb_allocator *sb_create_private_allocator(void);
+extern void sb_reset_allocator(sb_allocator *a);
+extern void sb_destroy_private_allocator(sb_allocator *a);
+
+/* Functions to allocate and free memory. */
+extern void *sb_alloc(sb_allocator *a, Size, int flags);
+extern void sb_free(void *ptr);
+
+/* Reporting functions. */
+extern Size sb_alloc_space(Size size);
+extern Size sb_chunk_space(void *ptr);
+
+#endif		/* SB_ALLOC_H */
diff --git a/src/include/utils/sb_map.h b/src/include/utils/sb_map.h
new file mode 100644
index 0000000..519bf52
--- /dev/null
+++ b/src/include/utils/sb_map.h
@@ -0,0 +1,24 @@
+/*-------------------------------------------------------------------------
+ *
+ * sb_map.h
+ *	  Superblock allocator page-mapping infrastructure.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/utils/sb_map.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef SB_MAP_H
+#define SB_MAP_H
+
+typedef struct sb_map sb_map;
+
+extern Size sb_map_size(char *base, Size npages);
+extern void sb_map_initialize(sb_map *, char *base, Size offset, Size npages);
+extern void sb_map_set(sb_map *, Size pageno, void *ptr);
+extern void *sb_map_get(sb_map *, Size pageno);
+
+#endif /* SB_MAP_H */
diff --git a/src/include/utils/sb_region.h b/src/include/utils/sb_region.h
new file mode 100644
index 0000000..5bb01f3
--- /dev/null
+++ b/src/include/utils/sb_region.h
@@ -0,0 +1,68 @@
+/*-------------------------------------------------------------------------
+ *
+ * sb_region.h
+ *	  Superblock allocator memory region manager.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/utils/sb_region.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef SB_REGION_H
+#define SB_REGION_H
+
+#include "lib/ilist.h"
+#include "storage/dsm.h"
+#include "storage/shm_toc.h"
+#include "utils/freepage.h"
+#include "utils/sb_alloc.h"
+#include "utils/sb_map.h"
+
+/*
+ * An sb_region is a backend-private object used to track allocatable regions
+ * of memory, either backend-private or shared.
+ */
+typedef struct sb_region
+{
+	char *region_start;			/* Address of region. */
+	Size region_size;			/* Number of bytes in region. */
+	Size usable_pages;			/* Number of usable pages in region. */
+	dsm_segment *seg;			/* If not backend-private, DSM handle. */
+	sb_allocator *allocator;	/* If not backend-private, shared allocator. */
+	FreePageManager *fpm;		/* Free page manager for region (if any). */
+	sb_map *pagemap;			/* Page map for region (if any). */
+	Size contiguous_pages;		/* Last reported contiguous free pages. */
+	dlist_node fl_node;			/* Freelist links. */
+} sb_region;
+
+/*
+ * An sb_shared_region is a shared-memory object containing the information
+ * necessary to set up an sb_region object for an individual backend.
+ */
+typedef struct sb_shared_region
+{
+	relptr(FreePageManager) fpm;
+	relptr(sb_map) pagemap;
+	relptr(sb_allocator) allocator;
+	int	lwlock_tranche_id;
+	char lwlock_tranche_name[FLEXIBLE_ARRAY_MEMBER];
+} sb_shared_region;
+
+/* Public API. */
+extern sb_shared_region *sb_create_shared_region(dsm_segment *seg,
+						shm_toc *toc, Size size,
+						int lwlock_tranche_id,
+						char *lwlock_tranche_name);
+extern sb_allocator *sb_attach_shared_region(dsm_segment *,
+						sb_shared_region *);
+extern void sb_dump_regions(void);
+
+/* For internal use by cooperating modules. */
+extern sb_region *sb_lookup_region(void *);
+extern sb_region *sb_private_region_for_allocator(Size npages);
+extern void sb_report_contiguous_freespace(sb_region *, Size npages);
+
+#endif		/* SB_REGION_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to