diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index fad5cb0..1d70fae 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1082,6 +1082,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry>Waiting for parallel query dynamic shared memory allocation lock.</entry>
         </row>
         <row>
+         <entry><literal>tbm</></entry>
+         <entry>Waiting for TBM shared iterator lock.</entry>
+        </row>
+        <row>
          <entry morerows="9"><literal>Lock</></entry>
          <entry><literal>relation</></entry>
          <entry>Waiting to acquire a lock on a relation.</entry>
diff --git a/src/backend/access/gin/ginget.c b/src/backend/access/gin/ginget.c
index 60f005c..87cd9ea 100644
--- a/src/backend/access/gin/ginget.c
+++ b/src/backend/access/gin/ginget.c
@@ -120,7 +120,7 @@ collectMatchBitmap(GinBtreeData *btree, GinBtreeStack *stack,
 	Form_pg_attribute attr;
 
 	/* Initialize empty bitmap result */
-	scanEntry->matchBitmap = tbm_create(work_mem * 1024L);
+	scanEntry->matchBitmap = tbm_create(work_mem * 1024L, NULL);
 
 	/* Null query cannot partial-match anything */
 	if (scanEntry->isPartialMatch &&
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index 4274e9a..94bb289 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -78,7 +78,7 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
 	else
 	{
 		/* XXX should we use less than work_mem for this? */
-		tbm = tbm_create(work_mem * 1024L);
+		tbm = tbm_create(work_mem * 1024L, NULL);
 	}
 
 	/*
diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c
index 3c6155b..1d280be 100644
--- a/src/backend/executor/nodeBitmapOr.c
+++ b/src/backend/executor/nodeBitmapOr.c
@@ -129,7 +129,7 @@ MultiExecBitmapOr(BitmapOrState *node)
 			if (result == NULL) /* first subplan */
 			{
 				/* XXX should we use less than work_mem for this? */
-				result = tbm_create(work_mem * 1024L);
+				result = tbm_create(work_mem * 1024L, NULL);
 			}
 
 			((BitmapIndexScanState *) subnode)->biss_result = result;
diff --git a/src/backend/nodes/tidbitmap.c b/src/backend/nodes/tidbitmap.c
index 0885812..a1b2a45 100644
--- a/src/backend/nodes/tidbitmap.c
+++ b/src/backend/nodes/tidbitmap.c
@@ -43,6 +43,8 @@
 #include "access/htup_details.h"
 #include "nodes/bitmapset.h"
 #include "nodes/tidbitmap.h"
+#include "storage/lwlock.h"
+#include "utils/dsa.h"
 
 /*
  * The maximum number of tuples per page is not large (typically 256 with
@@ -103,6 +105,15 @@ typedef struct PagetableEntry
 } PagetableEntry;
 
 /*
+ * Holds array of pagetable entries.
+ */
+typedef struct PTEntryArray
+{
+	int			refcount;
+	PagetableEntry ptentry[FLEXIBLE_ARRAY_MEMBER];
+} PTEntryArray;
+
+/*
  * We want to avoid the overhead of creating the hashtable, which is
  * comparatively large, when not necessary. Particularly when we are using a
  * bitmap scan on the inside of a nestloop join: a bitmap may well live only
@@ -121,6 +132,16 @@ typedef enum
 } TBMStatus;
 
 /*
+ * Current iterating state of the TBM.
+ */
+typedef enum
+{
+	TBM_NOT_ITERATING,			/* not yet converted to page and chunk array */
+	TBM_ITERATING_PRIVATE,		/* converted to local page and chunk array */
+	TBM_ITERATING_SHARED		/* converted to shared page and chunk array */
+} TBMIteratingState;
+
+/*
  * Here is the representation for a whole TIDBitMap:
  */
 struct TIDBitmap
@@ -133,12 +154,17 @@ struct TIDBitmap
 	int			maxentries;		/* limit on same to meet maxbytes */
 	int			npages;			/* number of exact entries in pagetable */
 	int			nchunks;		/* number of lossy entries in pagetable */
-	bool		iterating;		/* tbm_begin_iterate called? */
+	TBMIteratingState iterating;	/* tbm_begin_iterate called? */
 	uint32		lossify_start;	/* offset to start lossifying hashtable at */
 	PagetableEntry entry1;		/* used when status == TBM_ONE_PAGE */
 	/* these are valid when iterating is true: */
 	PagetableEntry **spages;	/* sorted exact-page list, or NULL */
 	PagetableEntry **schunks;	/* sorted lossy-chunk list, or NULL */
+	dsa_pointer dsapagetable;	/* dsa_pointer to the element array */
+	dsa_pointer dsapagetableold;	/* dsa_pointer to the old element array */
+	dsa_pointer ptpages;		/* dsa_pointer to the page array */
+	dsa_pointer ptchunks;		/* dsa_pointer to the chunk array */
+	dsa_area   *dsa;			/* reference to per-query dsa area */
 };
 
 /*
@@ -156,6 +182,46 @@ struct TBMIterator
 	TBMIterateResult output;	/* MUST BE LAST (because variable-size) */
 };
 
+/*
+ * Holds the shared members of the iterator so that multiple processes
+ * can jointly iterate.
+ */
+typedef struct TBMSharedIteratorState
+{
+	int			nentries;		/* number of entries in pagetable */
+	int			maxentries;		/* limit on same to meet maxbytes */
+	int			npages;			/* number of exact entries in pagetable */
+	int			nchunks;		/* number of lossy entries in pagetable */
+	dsa_pointer pagetable;		/* dsa pointers to head of pagetable data */
+	dsa_pointer spages;			/* dsa pointer to page array */
+	dsa_pointer schunks;		/* dsa pointer to chunk array */
+	LWLock		lock;			/* lock to protect below members */
+	int			spageptr;		/* next spages index */
+	int			schunkptr;		/* next schunks index */
+	int			schunkbit;		/* next bit to check in current schunk */
+} TBMSharedIteratorState;
+
+/*
+ * pagetable iteration array.
+ */
+typedef struct PTIterationArray
+{
+	int			refcount;		/* no. of shared iterator referring */
+	int			index[FLEXIBLE_ARRAY_MEMBER];	/* index array */
+} PTIterationArray;
+
+/*
+ * same as TBMIterator, but it is used for joint iteration, therefore this
+ * also holds a reference to the shared state.
+ */
+struct TBMSharedIterator
+{
+	TBMSharedIteratorState *state;		/* shared state */
+	PTEntryArray *ptbase;		/* pagetable element array */
+	PTIterationArray *ptpages;	/* sorted exact page index list */
+	PTIterationArray *ptchunks; /* sorted lossy page index list */
+	TBMIterateResult output;	/* MUST BE LAST (because variable-size) */
+};
 
 /* Local function prototypes */
 static void tbm_union_page(TIDBitmap *a, const PagetableEntry *bpage);
@@ -168,6 +234,8 @@ static bool tbm_page_is_lossy(const TIDBitmap *tbm, BlockNumber pageno);
 static void tbm_mark_page_lossy(TIDBitmap *tbm, BlockNumber pageno);
 static void tbm_lossify(TIDBitmap *tbm);
 static int	tbm_comparator(const void *left, const void *right);
+static int tbm_shared_comparator(const void *left, const void *right,
+					  void *arg);
 
 /*
  * Simple inline murmur hash implementation for the exact width required, for
@@ -187,6 +255,7 @@ hash_blockno(BlockNumber b)
 }
 
 /* define hashtable mapping block numbers to PagetableEntry's */
+#define SH_USE_NONDEFAULT_ALLOCATOR
 #define SH_PREFIX pagetable
 #define SH_ELEMENT_TYPE PagetableEntry
 #define SH_KEY_TYPE BlockNumber
@@ -204,10 +273,12 @@ hash_blockno(BlockNumber b)
  *
  * The bitmap will live in the memory context that is CurrentMemoryContext
  * at the time of this call.  It will be limited to (approximately) maxbytes
- * total memory consumption.
+ * total memory consumption.  If the DSA passed to this function is not NULL
+ * then the memory for storing elements of the underlying page table will
+ * be allocated from the DSA.
  */
 TIDBitmap *
-tbm_create(long maxbytes)
+tbm_create(long maxbytes, dsa_area *dsa)
 {
 	TIDBitmap  *tbm;
 	long		nbuckets;
@@ -230,6 +301,7 @@ tbm_create(long maxbytes)
 	nbuckets = Max(nbuckets, 16);		/* sanity limit */
 	tbm->maxentries = (int) nbuckets;
 	tbm->lossify_start = 0;
+	tbm->dsa = dsa;
 
 	return tbm;
 }
@@ -244,7 +316,7 @@ tbm_create_pagetable(TIDBitmap *tbm)
 	Assert(tbm->status != TBM_HASH);
 	Assert(tbm->pagetable == NULL);
 
-	tbm->pagetable = pagetable_create(tbm->mcxt, 128, NULL);
+	tbm->pagetable = pagetable_create(tbm->mcxt, 128, tbm);
 
 	/* If entry1 is valid, push it into the hashtable */
 	if (tbm->status == TBM_ONE_PAGE)
@@ -281,6 +353,40 @@ tbm_free(TIDBitmap *tbm)
 }
 
 /*
+ * tbm_free_shared_area - free shared state
+ *
+ * Free the shared iterator state, and also free the shared pagetable and
+ * iterator-array memory once they are no longer referenced by any shared
+ * iterator (i.e. their refcount drops to zero).
+ */
+void
+tbm_free_shared_area(dsa_area *dsa, dsa_pointer dp)
+{
+	TBMSharedIteratorState *istate = dsa_get_address(dsa, dp);
+	PTEntryArray *ptbase = dsa_get_address(dsa, istate->pagetable);
+	PTIterationArray *ptpages;
+	PTIterationArray *ptchunks;
+
+	if (--ptbase->refcount == 0)
+		dsa_free(dsa, istate->pagetable);
+
+	if (istate->spages)
+	{
+		ptpages = dsa_get_address(dsa, istate->spages);
+		if (--ptpages->refcount == 0)
+			dsa_free(dsa, istate->spages);
+	}
+	if (istate->schunks)
+	{
+		ptchunks = dsa_get_address(dsa, istate->schunks);
+		if (--ptchunks->refcount == 0)
+			dsa_free(dsa, istate->schunks);
+	}
+
+	dsa_free(dsa, dp);
+}
+
+/*
  * tbm_add_tuples - add some tuple IDs to a TIDBitmap
  *
  * If recheck is true, then the recheck flag will be set in the
@@ -294,7 +400,7 @@ tbm_add_tuples(TIDBitmap *tbm, const ItemPointer tids, int ntids,
 	PagetableEntry *page = NULL;	/* only valid when currblk is valid */
 	int			i;
 
-	Assert(!tbm->iterating);
+	Assert(tbm->iterating == TBM_NOT_ITERATING);
 	for (i = 0; i < ntids; i++)
 	{
 		BlockNumber blk = ItemPointerGetBlockNumber(tids + i);
@@ -603,6 +709,8 @@ tbm_begin_iterate(TIDBitmap *tbm)
 {
 	TBMIterator *iterator;
 
+	Assert(tbm->iterating != TBM_ITERATING_SHARED);
+
 	/*
 	 * Create the TBMIterator struct, with enough trailing space to serve the
 	 * needs of the TBMIterateResult sub-struct.
@@ -624,7 +732,7 @@ tbm_begin_iterate(TIDBitmap *tbm)
 	 * attached to the bitmap not the iterator, so they can be used by more
 	 * than one iterator.
 	 */
-	if (tbm->status == TBM_HASH && !tbm->iterating)
+	if (tbm->status == TBM_HASH && tbm->iterating == TBM_NOT_ITERATING)
 	{
 		pagetable_iterator i;
 		PagetableEntry *page;
@@ -659,12 +767,192 @@ tbm_begin_iterate(TIDBitmap *tbm)
 				  tbm_comparator);
 	}
 
-	tbm->iterating = true;
+	tbm->iterating = TBM_ITERATING_PRIVATE;
 
 	return iterator;
 }
 
 /*
+ * tbm_prepare_shared_iterate - prepare shared iteration state for a TIDBitmap.
+ *
+ * The necessary shared state will be allocated from the DSA passed to
+ * tbm_create, so that multiple processes can attach to it and iterate jointly.
+ *
+ * This converts the pagetable hash into arrays of indexes (one for exact
+ * pages, one for lossy chunks) pointing into the shared pagetable array.
+ */
+dsa_pointer
+tbm_prepare_shared_iterate(TIDBitmap *tbm)
+{
+	dsa_pointer dp;
+	TBMSharedIteratorState *istate;
+
+	Assert(tbm->dsa != NULL);
+	Assert(tbm->iterating != TBM_ITERATING_PRIVATE);
+
+	/*
+	 * Allocate TBMSharedIteratorState from the DSA to hold the shared
+	 * members and the lock; worker processes use it to iterate jointly.
+	 */
+	dp = dsa_allocate(tbm->dsa, sizeof(TBMSharedIteratorState));
+	istate = dsa_get_address(tbm->dsa, dp);
+
+	/*
+	 * If we're not already iterating, create and fill the sorted page lists.
+	 * (If we are, the sorted page lists are already stored in the TIDBitmap,
+	 * and we can just reuse them.)
+	 */
+	if (tbm->iterating == TBM_NOT_ITERATING)
+	{
+		pagetable_iterator i;
+		PagetableEntry *page;
+		PTEntryArray *ptbase = NULL;
+		PTIterationArray *ptpages = NULL;
+		PTIterationArray *ptchunks = NULL;
+		int			idx, npages, nchunks;
+
+		/*
+		 * Allocate the page and chunk arrays from the DSA to share across
+		 * processes.  dsa_allocate doesn't zero, so set each refcount.
+		 */
+		if (tbm->npages)
+		{
+			tbm->ptpages = dsa_allocate(tbm->dsa, sizeof(PTIterationArray) +
+										tbm->npages * sizeof(int));
+			ptpages = dsa_get_address(tbm->dsa, tbm->ptpages);
+			ptpages->refcount = 0;
+		}
+		if (tbm->nchunks)
+		{
+			tbm->ptchunks = dsa_allocate(tbm->dsa, sizeof(PTIterationArray) +
+										 tbm->nchunks * sizeof(int));
+			ptchunks = dsa_get_address(tbm->dsa, tbm->ptchunks);
+			ptchunks->refcount = 0;
+		}
+		/*
+		 * If TBM status is TBM_HASH then iterate over the pagetable and
+		 * convert it to page and chunk arrays.  Otherwise (TBM_ONE_PAGE or
+		 * TBM_EMPTY) directly allocate the space for one entry from the DSA.
+		 */
+		npages = nchunks = 0;
+		if (tbm->status == TBM_HASH)
+		{
+			ptbase = dsa_get_address(tbm->dsa, tbm->dsapagetable);
+			pagetable_start_iterate(tbm->pagetable, &i);
+			while ((page = pagetable_iterate(tbm->pagetable, &i)) != NULL)
+			{
+				idx = page - ptbase->ptentry;
+				if (page->ischunk)
+					ptchunks->index[nchunks++] = idx;
+				else
+					ptpages->index[npages++] = idx;
+			}
+			Assert(npages == tbm->npages);
+			Assert(nchunks == tbm->nchunks);
+		}
+		else
+		{
+			/*
+			 * In one-page (or empty) mode, allocate space for a single
+			 * pagetable entry, copy entry1 into it, and store its index
+			 * (i.e. 0) in the page array (when there is an exact page).
+			 */
+			tbm->dsapagetable = dsa_allocate(tbm->dsa, sizeof(PTEntryArray) +
+											 sizeof(PagetableEntry));
+			ptbase = dsa_get_address(tbm->dsa, tbm->dsapagetable);
+			ptbase->refcount = 0;
+			memcpy(ptbase->ptentry, &tbm->entry1, sizeof(PagetableEntry));
+			if (tbm->npages > 0)
+				ptpages->index[0] = 0;
+		}
+
+		if (npages > 1)
+			qsort_arg((void *) (ptpages->index), npages, sizeof(int),
+					  tbm_shared_comparator, (void *) ptbase->ptentry);
+		if (nchunks > 1)
+			qsort_arg((void *) (ptchunks->index), nchunks, sizeof(int),
+					  tbm_shared_comparator, (void *) ptbase->ptentry);
+	}
+
+	/*
+	 * Store the TBM members in the shared state so that we can share them
+	 * across multiple processes.
+	 */
+	istate->nentries = tbm->nentries;
+	istate->maxentries = tbm->maxentries;
+	istate->npages = tbm->npages;
+	istate->nchunks = tbm->nchunks;
+	istate->pagetable = tbm->dsapagetable;
+	istate->spages = tbm->ptpages;
+	istate->schunks = tbm->ptchunks;
+
+	/* Initialize the iterator lock */
+	LWLockInitialize(&istate->lock, LWTRANCHE_TBM);
+
+	/* Initialize the shared iterator state */
+	istate->schunkbit = 0;
+	istate->schunkptr = 0;
+	istate->spageptr = 0;
+
+	tbm->iterating = TBM_ITERATING_SHARED;
+
+	return dp;
+}
+
+/*
+ * tbm_extract_page_tuple - extract the tuple offsets from a page
+ *
+ * The extracted offsets are stored into TBMIterateResult.
+ */
+static inline int
+tbm_extract_page_tuple(PagetableEntry *page, TBMIterateResult *output)
+{
+	int			wordnum;
+	int			ntuples = 0;
+
+	for (wordnum = 0; wordnum < WORDS_PER_PAGE; wordnum++)
+	{
+		bitmapword	w = page->words[wordnum];
+
+		if (w != 0)
+		{
+			int			off = wordnum * BITS_PER_BITMAPWORD + 1;
+
+			while (w != 0)
+			{
+				if (w & 1)
+					output->offsets[ntuples++] = (OffsetNumber) off;
+				off++;
+				w >>= 1;
+			}
+		}
+	}
+
+	return ntuples;
+}
+
+/*
+ *	tbm_advance_schunkbit - Advance the chunkbit
+ */
+static inline void
+tbm_advance_schunkbit(PagetableEntry *chunk, int *schunkbitp)
+{
+	int			schunkbit = *schunkbitp;
+
+	while (schunkbit < PAGES_PER_CHUNK)
+	{
+		int			wordnum = WORDNUM(schunkbit);
+		int			bitnum = BITNUM(schunkbit);
+
+		if ((chunk->words[wordnum] & ((bitmapword) 1 << bitnum)) != 0)
+			break;
+		schunkbit++;
+	}
+
+	*schunkbitp = schunkbit;
+}
+
+/*
  * tbm_iterate - scan through next page of a TIDBitmap
  *
  * Returns a TBMIterateResult representing one page, or NULL if there are
@@ -682,7 +970,7 @@ tbm_iterate(TBMIterator *iterator)
 	TIDBitmap  *tbm = iterator->tbm;
 	TBMIterateResult *output = &(iterator->output);
 
-	Assert(tbm->iterating);
+	Assert(tbm->iterating == TBM_ITERATING_PRIVATE);
 
 	/*
 	 * If lossy chunk pages remain, make sure we've advanced schunkptr/
@@ -693,15 +981,7 @@ tbm_iterate(TBMIterator *iterator)
 		PagetableEntry *chunk = tbm->schunks[iterator->schunkptr];
 		int			schunkbit = iterator->schunkbit;
 
-		while (schunkbit < PAGES_PER_CHUNK)
-		{
-			int			wordnum = WORDNUM(schunkbit);
-			int			bitnum = BITNUM(schunkbit);
-
-			if ((chunk->words[wordnum] & ((bitmapword) 1 << bitnum)) != 0)
-				break;
-			schunkbit++;
-		}
+		tbm_advance_schunkbit(chunk, &schunkbit);
 		if (schunkbit < PAGES_PER_CHUNK)
 		{
 			iterator->schunkbit = schunkbit;
@@ -738,7 +1018,6 @@ tbm_iterate(TBMIterator *iterator)
 	{
 		PagetableEntry *page;
 		int			ntuples;
-		int			wordnum;
 
 		/* In ONE_PAGE state, we don't allocate an spages[] array */
 		if (tbm->status == TBM_ONE_PAGE)
@@ -747,24 +1026,7 @@ tbm_iterate(TBMIterator *iterator)
 			page = tbm->spages[iterator->spageptr];
 
 		/* scan bitmap to extract individual offset numbers */
-		ntuples = 0;
-		for (wordnum = 0; wordnum < WORDS_PER_PAGE; wordnum++)
-		{
-			bitmapword	w = page->words[wordnum];
-
-			if (w != 0)
-			{
-				int			off = wordnum * BITS_PER_BITMAPWORD + 1;
-
-				while (w != 0)
-				{
-					if (w & 1)
-						output->offsets[ntuples++] = (OffsetNumber) off;
-					off++;
-					w >>= 1;
-				}
-			}
-		}
+		ntuples = tbm_extract_page_tuple(page, output);
 		output->blockno = page->blockno;
 		output->ntuples = ntuples;
 		output->recheck = page->recheck;
@@ -777,6 +1039,94 @@ tbm_iterate(TBMIterator *iterator)
 }
 
 /*
+ *	tbm_shared_iterate - scan through next page of a TIDBitmap
+ *
+ *	As above, but this will iterate using an iterator which is shared
+ *	across multiple processes.  We need to acquire the iterator LWLock,
+ *	before accessing the shared members.
+ */
+TBMIterateResult *
+tbm_shared_iterate(TBMSharedIterator *iterator)
+{
+	TBMIterateResult *output = &iterator->output;
+	TBMSharedIteratorState *istate = iterator->state;
+	PagetableEntry *ptbase = iterator->ptbase ? iterator->ptbase->ptentry : NULL;
+	int		   *idxpages = iterator->ptpages ? iterator->ptpages->index : NULL;
+	int		   *idxchunks = iterator->ptchunks ? iterator->ptchunks->index : NULL;
+
+	/* Acquire the LWLock before accessing the shared members */
+	LWLockAcquire(&istate->lock, LW_EXCLUSIVE);
+
+	/*
+	 * If lossy chunk pages remain, make sure we've advanced schunkptr/
+	 * schunkbit to the next set bit.
+	 */
+	while (istate->schunkptr < istate->nchunks)
+	{
+		PagetableEntry *chunk = &ptbase[idxchunks[istate->schunkptr]];
+		int			schunkbit = istate->schunkbit;
+
+		tbm_advance_schunkbit(chunk, &schunkbit);
+		if (schunkbit < PAGES_PER_CHUNK)
+		{
+			istate->schunkbit = schunkbit;
+			break;
+		}
+		/* advance to next chunk */
+		istate->schunkptr++;
+		istate->schunkbit = 0;
+	}
+
+	/*
+	 * If both chunk and per-page data remain, must output the numerically
+	 * earlier page.  Note that idxpages must only be consulted after
+	 * checking that exact pages remain, since it may be NULL.
+	 */
+	if (istate->schunkptr < istate->nchunks)
+	{
+		PagetableEntry *chunk = &ptbase[idxchunks[istate->schunkptr]];
+		BlockNumber chunk_blockno;
+
+		chunk_blockno = chunk->blockno + istate->schunkbit;
+
+		if (istate->spageptr >= istate->npages ||
+			chunk_blockno < ptbase[idxpages[istate->spageptr]].blockno)
+		{
+			/* Return a lossy page indicator from the chunk */
+			output->blockno = chunk_blockno;
+			output->ntuples = -1;
+			output->recheck = true;
+			istate->schunkbit++;
+
+			LWLockRelease(&istate->lock);
+			return output;
+		}
+	}
+
+	if (istate->spageptr < istate->npages)
+	{
+		PagetableEntry *page = &ptbase[idxpages[istate->spageptr]];
+		int			ntuples;
+
+		/* scan bitmap to extract individual offset numbers */
+		ntuples = tbm_extract_page_tuple(page, output);
+		output->blockno = page->blockno;
+		output->ntuples = ntuples;
+		output->recheck = page->recheck;
+		istate->spageptr++;
+
+		LWLockRelease(&istate->lock);
+
+		return output;
+	}
+
+	LWLockRelease(&istate->lock);
+
+	/* Nothing more in the bitmap */
+	return NULL;
+}
+
+/*
  * tbm_end_iterate - finish an iteration over a TIDBitmap
  *
  * Currently this is just a pfree, but it might do more someday.  (For
@@ -790,6 +1140,18 @@ tbm_end_iterate(TBMIterator *iterator)
 }
 
 /*
+ * tbm_end_shared_iterate - finish a shared iteration over a TIDBitmap
+ *
+ * This doesn't free any of the shared state associated with the iterator,
+ * just our backend-private state.
+ */
+void
+tbm_end_shared_iterate(TBMSharedIterator *iterator)
+{
+	pfree(iterator);
+}
+
+/*
  * tbm_find_pageentry - find a PagetableEntry for the pageno
  *
  * Returns NULL if there is no non-lossy entry for the pageno.
@@ -995,7 +1357,7 @@ tbm_lossify(TIDBitmap *tbm)
 	 * push nentries down to significantly less than maxentries, or else we'll
 	 * just end up doing this again very soon.  We shoot for maxentries/2.
 	 */
-	Assert(!tbm->iterating);
+	Assert(tbm->iterating == TBM_NOT_ITERATING);
 	Assert(tbm->status == TBM_HASH);
 
 	pagetable_start_iterate_at(tbm->pagetable, &i, tbm->lossify_start);
@@ -1061,3 +1423,111 @@ tbm_comparator(const void *left, const void *right)
 		return 1;
 	return 0;
 }
+
+/*
+ * As above, but this will get index into PagetableEntry array.  Therefore,
+ * it needs to get actual PagetableEntry using the index before comparing the
+ * blockno.
+ */
+static int
+tbm_shared_comparator(const void *left, const void *right, void *arg)
+{
+	PagetableEntry *base = (PagetableEntry *) arg;
+	PagetableEntry *lpage = &base[*(int *) left];
+	PagetableEntry *rpage = &base[*(int *) right];
+
+	if (lpage->blockno < rpage->blockno)
+		return -1;
+	else if (lpage->blockno > rpage->blockno)
+		return 1;
+	return 0;
+}
+
+/*
+ *	tbm_attach_shared_iterate
+ *
+ *	Allocate a backend-private iterator and attach the shared iterator state
+ *	to it so that multiple processes can iterate jointly.
+ *
+ *	We also convert the DSA pointers to local pointers and store them in
+ *	our private iterator.
+ */
+TBMSharedIterator *
+tbm_attach_shared_iterate(dsa_area *dsa, dsa_pointer dp)
+{
+	TBMSharedIterator *iterator;
+	TBMSharedIteratorState *istate;
+
+	/*
+	 * Create the TBMSharedIterator struct, with enough trailing space to
+	 * serve the needs of the TBMIterateResult sub-struct.  Use palloc0 so
+	 * that the ptbase/ptpages/ptchunks pointers start out NULL.
+	 */
+	iterator = (TBMSharedIterator *) palloc0(sizeof(TBMSharedIterator) +
+								 MAX_TUPLES_PER_PAGE * sizeof(OffsetNumber));
+	istate = (TBMSharedIteratorState *) dsa_get_address(dsa, dp);
+	iterator->state = istate;
+
+	iterator->ptbase = dsa_get_address(dsa, istate->pagetable);
+	if (iterator->ptbase != NULL)
+		iterator->ptbase->refcount++;
+	if (istate->npages)
+	{
+		iterator->ptpages = dsa_get_address(dsa, istate->spages);
+		iterator->ptpages->refcount++;
+	}
+	if (istate->nchunks)
+	{
+		iterator->ptchunks = dsa_get_address(dsa, istate->schunks);
+		iterator->ptchunks->refcount++;
+	}
+
+	return iterator;
+}
+
+/*
+ * pagetable_allocate
+ *
+ * Callback function for allocating the memory for hashtable elements.
+ * Allocate memory for hashtable elements, using DSA if available.
+ */
+static inline void *
+pagetable_allocate(pagetable_hash *pagetable, Size size)
+{
+	TIDBitmap  *tbm = (TIDBitmap *) pagetable->private_data;
+	PTEntryArray *ptbase;
+
+	if (tbm->dsa == NULL)
+		return MemoryContextAllocExtended(pagetable->ctx, size,
+										  MCXT_ALLOC_HUGE | MCXT_ALLOC_ZERO);
+
+	/*
+	 * Save the dsapagetable reference in dsapagetableold before allocating
+	 * new memory so that pagetable_free can free the old entry.
+	 */
+	tbm->dsapagetableold = tbm->dsapagetable;
+	tbm->dsapagetable = dsa_allocate0(tbm->dsa, sizeof(PTEntryArray) + size);
+
+	ptbase = dsa_get_address(tbm->dsa, tbm->dsapagetable);
+	return ptbase->ptentry;
+}
+
+/*
+ * pagetable_free
+ *
+ * Callback function for freeing hash table elements.
+ */
+static inline void
+pagetable_free(pagetable_hash *pagetable, void *pointer)
+{
+	TIDBitmap  *tbm = (TIDBitmap *) pagetable->private_data;
+
+	/* pfree the input pointer if DSA is not available */
+	if (tbm->dsa == NULL)
+		pfree(pointer);
+	else if (DsaPointerIsValid(tbm->dsapagetableold))
+	{
+		dsa_free(tbm->dsa, tbm->dsapagetableold);
+		tbm->dsapagetableold = InvalidDsaPointer;
+	}
+}
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index ab81d94..3e13394 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -510,6 +510,7 @@ RegisterLWLockTranches(void)
 						  "predicate_lock_manager");
 	LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
 						  "parallel_query_dsa");
+	LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
 
 	/* Register named tranches. */
 	for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/include/nodes/tidbitmap.h b/src/include/nodes/tidbitmap.h
index 14992e0..87f4bb7 100644
--- a/src/include/nodes/tidbitmap.h
+++ b/src/include/nodes/tidbitmap.h
@@ -23,6 +23,7 @@
 #define TIDBITMAP_H
 
 #include "storage/itemptr.h"
+#include "utils/dsa.h"
 
 
 /*
@@ -33,6 +34,7 @@ typedef struct TIDBitmap TIDBitmap;
 
 /* Likewise, TBMIterator is private */
 typedef struct TBMIterator TBMIterator;
+typedef struct TBMSharedIterator TBMSharedIterator;
 
 /* Result structure for tbm_iterate */
 typedef struct
@@ -46,8 +48,9 @@ typedef struct
 
 /* function prototypes in nodes/tidbitmap.c */
 
-extern TIDBitmap *tbm_create(long maxbytes);
+extern TIDBitmap *tbm_create(long maxbytes, dsa_area *dsa);
 extern void tbm_free(TIDBitmap *tbm);
+extern void tbm_free_shared_area(dsa_area *dsa, dsa_pointer dp);
 
 extern void tbm_add_tuples(TIDBitmap *tbm,
 			   const ItemPointer tids, int ntids,
@@ -60,7 +63,12 @@ extern void tbm_intersect(TIDBitmap *a, const TIDBitmap *b);
 extern bool tbm_is_empty(const TIDBitmap *tbm);
 
 extern TBMIterator *tbm_begin_iterate(TIDBitmap *tbm);
+extern dsa_pointer tbm_prepare_shared_iterate(TIDBitmap *tbm);
 extern TBMIterateResult *tbm_iterate(TBMIterator *iterator);
+extern TBMIterateResult *tbm_shared_iterate(TBMSharedIterator *iterator);
 extern void tbm_end_iterate(TBMIterator *iterator);
+extern void tbm_end_shared_iterate(TBMSharedIterator *iterator);
+extern TBMSharedIterator *tbm_attach_shared_iterate(dsa_area *dsa,
+						  dsa_pointer dp);
 
 #endif   /* TIDBITMAP_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 8bd93c3..0cd45bb 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -212,6 +212,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_LOCK_MANAGER,
 	LWTRANCHE_PREDICATE_LOCK_MANAGER,
 	LWTRANCHE_PARALLEL_QUERY_DSA,
+	LWTRANCHE_TBM,
 	LWTRANCHE_FIRST_USER_DEFINED
 }	BuiltinTrancheIds;
 
