From 027b572d1134842d81b7a4b4310daabdbd75a2f2 Mon Sep 17 00:00:00 2001
From: bucoo <bucoo@sohu.com>
Date: Wed, 28 Oct 2020 16:27:43 +0800
Subject: [PATCH 3/4] Expand shared tuple store and add a batch store module.
 Also use atomic operations instead of an LWLock for claiming the shared
 tuple store's next read page.

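The new batchstore module stores tuples grouped into hash batches, in
backend-private temporary files for plain hash and in per-batch shared
tuplestores for parallel hash, behind a common function-pointer API.
A sketch of the intended call pattern (plain hash case; tuple sourcing
and error handling omitted):

    BatchStore bs = bs_begin_hash(num_batches);

    /* partition phase: route every tuple to its batch by hash value */
    bs_write_hash(bs, mtup, hashvalue);
    bs_end_write(bs);

    /* replay one batch at a time */
    while (bs_next_batch(bs, true))
    {
        MinimalTuple mtup;
        uint32       hashvalue;

        while ((mtup = bs_read_hash(bs, &hashvalue)) != NULL)
        {
            /* ... process tuple ... */
        }
        bs_end_cur_batch(bs);
    }
    bs_destroy(bs);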
---
 src/backend/storage/lmgr/lwlock.c         |   2 -
 src/backend/utils/sort/Makefile           |   3 +-
 src/backend/utils/sort/batchstore.c       | 381 ++++++++++++++++++++++++++++++
 src/backend/utils/sort/sharedtuplestore.c | 112 +++++++--
 src/include/storage/lwlock.h              |   1 -
 src/include/utils/batchstore.h            |  38 +++
 src/include/utils/sharedtuplestore.h      |  12 +
 7 files changed, 526 insertions(+), 23 deletions(-)
 create mode 100644 src/backend/utils/sort/batchstore.c
 create mode 100644 src/include/utils/batchstore.h

diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 2fa90cc095..b11a91e94c 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -170,8 +170,6 @@ static const char *const BuiltinTrancheNames[] = {
 	"PerSessionRecordType",
 	/* LWTRANCHE_PER_SESSION_RECORD_TYPMOD: */
 	"PerSessionRecordTypmod",
-	/* LWTRANCHE_SHARED_TUPLESTORE: */
-	"SharedTupleStore",
 	/* LWTRANCHE_SHARED_TIDBITMAP: */
 	"SharedTidBitmap",
 	/* LWTRANCHE_PARALLEL_APPEND: */
diff --git a/src/backend/utils/sort/Makefile b/src/backend/utils/sort/Makefile
index 7ac3659261..f82f5aa8cd 100644
--- a/src/backend/utils/sort/Makefile
+++ b/src/backend/utils/sort/Makefile
@@ -19,7 +19,8 @@ OBJS = \
 	sharedtuplestore.o \
 	sortsupport.o \
 	tuplesort.o \
-	tuplestore.o
+	tuplestore.o \
+	batchstore.o
 
 tuplesort.o: qsort_tuple.c
 
diff --git a/src/backend/utils/sort/batchstore.c b/src/backend/utils/sort/batchstore.c
new file mode 100644
index 0000000000..3cca7769c9
--- /dev/null
+++ b/src/backend/utils/sort/batchstore.c
@@ -0,0 +1,381 @@
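+/*-------------------------------------------------------------------------
+ *
+ * batchstore.c
+ *	  Store tuples grouped into hash batches, either in backend-private
+ *	  temporary files (plain hash) or in per-batch shared tuplestores
+ *	  (parallel hash), behind a common function-pointer interface.
+ *
+ *-------------------------------------------------------------------------
+ */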
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/parallel.h"
+#include "commands/tablespace.h"
+#include "executor/nodeHash.h"
+#include "port/atomics.h"
+#include "storage/buffile.h"
+#include "utils/batchstore.h"
+#include "utils/memutils.h"
+#include "utils/sharedtuplestore.h"
+
+#define InvalidBatch UINT32_MAX
+
+typedef enum BatchMethod
+{
+	BSM_HASH = 1,
+	BSM_PARALLEL_HASH
+} BatchMethod;
+
+typedef struct BatchStoreParallelHashData
+{
+	pg_atomic_uint32	cur_batches;
+	uint32				num_batches;
+	uint32				num_participants;
+} BatchStoreParallelHashData;
+
+typedef struct BatchStoreData
+{
+	BatchStoreFuncs	func;
+	BatchMethod	method;
+	uint32		num_batches;
+	void	  **all_batches;
+	void	   *cur_batch_ptr;
+	uint32		cur_batch_num;
+	union
+	{
+		/* for hash */
+		struct
+		{
+			StringInfoData	hash_read_buf;
+		};
+		/* for parallel hash */
+		struct
+		{
+			dsm_segment		   *dsm_seg;
+			MemoryContext		accessor_mcontext;
+			Bitmapset		   *our_batches;		/* batches already read (for rescan) */
+			pg_atomic_uint32   *shm_ph_batch_num;	/* current parallel hash batch number, in shared memory */
+			bool				ended_parallel;		/* parallel batch loop ended? (for rescan) */
+			bool				parallel_batch;		/* does each worker read only part of the batches? */
+		};
+	};
+} BatchStoreData;
+
+static void bs_write_normal_hash(BatchStore bs, MinimalTuple mtup, uint32 hash);
+static MinimalTuple bs_read_normal_hash(BatchStore bs, uint32 *hash);
+
+static void bs_write_parallel_hash(BatchStore bs, MinimalTuple mtup, uint32 hash);
+static void bs_write_parallel_one_batch_hash(BatchStore bs, MinimalTuple mtup, uint32 hash);
+static MinimalTuple bs_read_parallel_hash(BatchStore bs, uint32 *hash);
+
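+/*
+ * Allocate a BatchStoreData followed by its array of per-batch pointers
+ * (BufFiles or SharedTuplestoreAccessors, depending on the method).
+ */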
+static inline BatchStore make_empty_batch_store(uint32 num_batches)
+{
+	BatchStore bs;
+
+	bs = palloc0(MAXALIGN(sizeof(BatchStoreData)) +
+					sizeof(void*) * num_batches);
+	bs->all_batches = (void**)(((char*)bs) + MAXALIGN(sizeof(*bs)));
+	bs->num_batches = num_batches;
+	bs->cur_batch_num = InvalidBatch;
+
+	return bs;
+}
+
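+/*
+ * Begin a backend-private batch store.  Batch files are created lazily on
+ * first write; the read buffer is pre-sized and zeroed so a MinimalTuple
+ * header can be reconstructed in front of data read back from file.
+ */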
+BatchStore bs_begin_hash(uint32 num_batches)
+{
+	BatchStore bs = make_empty_batch_store(num_batches);
+	bs->method = BSM_HASH;
+
+	initStringInfo(&bs->hash_read_buf);
+	enlargeStringInfo(&bs->hash_read_buf, MINIMAL_TUPLE_DATA_OFFSET);
+	MemSet(bs->hash_read_buf.data, 0, MINIMAL_TUPLE_DATA_OFFSET);
+
+	PrepareTempTablespaces();
+
+	bs->func.hash_write = bs_write_normal_hash;
+	bs->func.hash_read = bs_read_normal_hash;
+	return bs;
+}
+
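+/*
+ * Estimate the shared memory space needed for a parallel hash batch store
+ * with the given numbers of batches and participants.
+ */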
+size_t bs_parallel_hash_estimate(uint32 num_batches, uint32 nparticipants)
+{
+	return MAXALIGN(sizeof(struct BatchStoreParallelHashData)) + 
+				MAXALIGN(sts_estimate(nparticipants)) * num_batches;
+}
+
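+/*
+ * Common code for creating or attaching to a parallel hash batch store:
+ * when "init" is true each per-batch shared tuplestore is initialized,
+ * otherwise we attach to existing ones.  Accessors live in their own
+ * memory context so bs_destroy can release them all at once.
+ */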
+static BatchStore bs_begin_parallel_hash(BatchStoreParallelHash bsph,
+										 uint32 my_participant_num, bool init,
+										 SharedFileSet *fileset, const char *name,
+										 dsm_segment *dsm_seg)
+{
+	uint32			i;
+	MemoryContext	oldcontext;
+	char		   *addr;
+	char			buffer[24];
+	Size			sts_size = MAXALIGN(sts_estimate(bsph->num_participants));
+	BatchStore		bs = make_empty_batch_store(bsph->num_batches);
+
+	bs->method = BSM_PARALLEL_HASH;
+	bs->shm_ph_batch_num = &bsph->cur_batches;
+
+	bs->accessor_mcontext = AllocSetContextCreate(CurrentMemoryContext,
+												  "batch parallel hash",
+												  ALLOCSET_DEFAULT_SIZES);
+	oldcontext = MemoryContextSwitchTo(bs->accessor_mcontext);
+	addr = ((char*)bsph) + MAXALIGN(sizeof(*bsph));
+	for (i = bsph->num_batches; i > 0;)
+	{
+		--i;
+		if (init)
+		{
+			sprintf(buffer, "%s_%u", name, i);
+			bs->all_batches[i] = sts_initialize((SharedTuplestore*)addr,
+												bsph->num_participants,
+												my_participant_num,
+												sizeof(uint32),
+												0,
+												fileset,
+												buffer);
+		}
+		else
+		{
+			bs->all_batches[i] = sts_attach((SharedTuplestore*)addr,
+											my_participant_num,
+											fileset);
+		}
+		addr += sts_size;
+	}
+	MemoryContextSwitchTo(oldcontext);
+
+	bs->dsm_seg = dsm_seg;
+	bs->func.hash_read = bs_read_parallel_hash;
+	if (bs->num_batches == 1)
+		bs->func.hash_write = bs_write_parallel_one_batch_hash;
+	else
+		bs->func.hash_write = bs_write_parallel_hash;
+
+	return bs;
+}
+
+BatchStore bs_init_parallel_hash(uint32 num_batches,
+								 uint32 nparticipants, uint32 my_participant_num,
+								 BatchStoreParallelHash bsph, dsm_segment *dsm_seg,
+								 SharedFileSet *fileset, const char *name)
+{
+	Assert(name != NULL && fileset != NULL);
+	bsph->num_batches = num_batches;
+	bsph->num_participants = nparticipants;
+	pg_atomic_init_u32(&bsph->cur_batches, InvalidBatch);
+
+	return bs_begin_parallel_hash(bsph, my_participant_num, true, fileset, name, dsm_seg);
+}
+
+BatchStore bs_attach_parallel_hash(BatchStoreParallelHash bsph, dsm_segment *dsm_seg,
+								   SharedFileSet *fileset, uint32 my_participant_num)
+{
+	return bs_begin_parallel_hash(bsph, my_participant_num, false, fileset, NULL, dsm_seg);
+}
+
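+/*
+ * Close batch files or detach shared tuplestore accessors, then free the
+ * BatchStore itself.
+ */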
+void bs_destroy(BatchStore bs)
+{
+	uint32	i;
+	if (bs == NULL)
+		return;
+
+	switch (bs->method)
+	{
+	case BSM_HASH:
+		for (i = 0; i < bs->num_batches; ++i)
+		{
+			if (bs->all_batches[i])
+				BufFileClose(bs->all_batches[i]);
+		}
+		pfree(bs->hash_read_buf.data);
+		break;
+	case BSM_PARALLEL_HASH:
+		{
+			BatchStoreParallelHash bsph = (BatchStoreParallelHash)(((char*)bs->shm_ph_batch_num) -
+											offsetof(BatchStoreParallelHashData, cur_batches));
+			uint32 count = bsph->num_batches;
+			while (count > 0)
+				sts_detach(bs->all_batches[--count]);
+			MemoryContextDelete(bs->accessor_mcontext);
+		}
+		break;
+	default:
+		ereport(ERROR,
+				(errmsg("unknown batch store method %u", bs->method)));
+		break;
+	}
+
+	pfree(bs);
+}
+
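+/*
+ * Plain-hash write: each record is stored as the 32-bit hash value, then
+ * t_len, then the tuple body starting at MINIMAL_TUPLE_DATA_OFFSET.
+ */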
+static void bs_write_normal_hash(BatchStore bs, MinimalTuple mtup, uint32 hash)
+{
+	uint32 batch = hash % bs->num_batches;
+	uint32 data_len = mtup->t_len - MINIMAL_TUPLE_DATA_OFFSET;
+	BufFile *buffile = bs->all_batches[batch];
+
+	if (unlikely(buffile == NULL))
+	{
+		MemoryContext oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(bs));
+		buffile = BufFileCreateTemp(false);
+		bs->all_batches[batch] = buffile;
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	BufFileWrite(buffile, &hash, sizeof(hash));
+	BufFileWrite(buffile, &mtup->t_len, sizeof(mtup->t_len));
+	BufFileWrite(buffile, ((char*)mtup) + MINIMAL_TUPLE_DATA_OFFSET, data_len);
+}
+
+static MinimalTuple bs_read_normal_hash(BatchStore bs, uint32 *hash)
+{
+	MinimalTuple	mtup;
+	size_t			nread;
+	uint32			head[2];
+	uint32			data_len;
+
+	nread = BufFileRead(bs->cur_batch_ptr, head, sizeof(head));
+	if (nread == 0)
+		return NULL;
+
+	if (nread != sizeof(head))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from batch store temporary file: %m")));
+	*hash = head[0];
+	enlargeStringInfo(&bs->hash_read_buf, head[1]);
+	mtup = (MinimalTuple)bs->hash_read_buf.data;
+	mtup->t_len = head[1];
+	data_len = head[1] - MINIMAL_TUPLE_DATA_OFFSET;
+	if (BufFileRead(bs->cur_batch_ptr, ((char*)mtup) + MINIMAL_TUPLE_DATA_OFFSET, data_len) != data_len)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from batch store temporary file: %m")));
+
+	return mtup;
+}
+
+void bs_end_write(BatchStore bs)
+{
+	uint32 i;
+	switch (bs->method)
+	{
+	case BSM_HASH:
+		/* nothing to do */
+		break;
+	case BSM_PARALLEL_HASH:
+		for (i = bs->num_batches; i > 0;)
+			sts_end_write(bs->all_batches[--i]);
+		bs->cur_batch_ptr = NULL;
+		break;
+	default:
+		ereport(ERROR,
+				(errmsg("unknown batch store method %u", bs->method)));
+		break;
+	}
+}
+
+static void bs_write_parallel_hash(BatchStore bs, MinimalTuple mtup, uint32 hash)
+{
+	sts_puttuple(bs->all_batches[hash % bs->num_batches],
+				 &hash,
+				 mtup);
+}
+
+static void bs_write_parallel_one_batch_hash(BatchStore bs, MinimalTuple mtup, uint32 hash)
+{
+	Assert(bs->num_batches == 1);
+	sts_puttuple(bs->all_batches[0],
+				 &hash,
+				 mtup);
+}
+
+static MinimalTuple bs_read_parallel_hash(BatchStore bs, uint32 *hash)
+{
+	return sts_scan_next(bs->cur_batch_ptr, hash);
+}
+
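+/*
+ * Advance to the next batch to read, returning false when no batches
+ * remain.  Plain hash skips batches that received no tuples.  For
+ * parallel hash, "no_parallel" selects a purely local walk over the
+ * batches instead of claiming the next batch from the shared atomic
+ * counter.
+ */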
+bool bs_next_batch(BatchStore bs, bool no_parallel)
+{
+	uint32 batch;
+	switch (bs->method)
+	{
+	case BSM_HASH:
+		batch = bs->cur_batch_num + 1;
+
+		for (; batch < bs->num_batches; ++batch)
+		{
+			if (bs->all_batches[batch])
+			{
+				bs->cur_batch_ptr = bs->all_batches[batch];
+				bs->cur_batch_num = batch;
+				if (BufFileSeek(bs->cur_batch_ptr, 0, 0, SEEK_SET) != 0)
+				{
+					ereport(ERROR,
+							(errcode_for_file_access(),
+							 errmsg("could not seek batch store temporary file")));
+				}
+				return true;
+			}
+		}
+		break;
+	case BSM_PARALLEL_HASH:
+		if (no_parallel)
+		{
+			batch = bs->cur_batch_num + 1;
+		}
+		else
+		{
+			batch = pg_atomic_add_fetch_u32(bs->shm_ph_batch_num, 1);
+		}
+
+		if (batch < bs->num_batches)
+		{
+			bs->cur_batch_num = batch;
+			bs->cur_batch_ptr = bs->all_batches[batch];
+			sts_begin_scan(bs->cur_batch_ptr);
+			return true;
+		}
+		break;
+	default:
+		ereport(ERROR,
+				(errmsg("unknown batch store method %u", bs->method)));
+		break;
+	}
+
+	return false;
+}
+
+void bs_rescan(BatchStore bs)
+{
+	switch (bs->method)
+	{
+	case BSM_HASH:
+		bs->cur_batch_ptr = NULL;
+		bs->cur_batch_num = InvalidBatch;
+		break;
+	case BSM_PARALLEL_HASH:
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("parallel batch store does not support rescan yet")));
+		break;
+	default:
+		ereport(ERROR,
+				(errmsg("unknown batch store method %u", bs->method)));
+		break;
+	}
+}
+
+void bs_end_cur_batch(BatchStore bs)
+{
+	switch (bs->method)
+	{
+	case BSM_HASH:
+		bs->cur_batch_ptr = NULL;
+		break;
+	case BSM_PARALLEL_HASH:
+		sts_end_scan(bs->cur_batch_ptr);
+		bs->cur_batch_ptr = NULL;
+		break;
+	default:
+		ereport(ERROR,
+				(errmsg("unknown batch store method %u", bs->method)));
+		break;
+	}
+}
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index fe298ce92e..fbad3098e6 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -24,9 +24,10 @@
 #include "access/htup.h"
 #include "access/htup_details.h"
 #include "miscadmin.h"
+#include "port/atomics.h"
 #include "storage/buffile.h"
-#include "storage/lwlock.h"
 #include "storage/sharedfileset.h"
+#include "utils/memutils.h"
 #include "utils/sharedtuplestore.h"
 
 /*
@@ -50,8 +51,7 @@ typedef struct SharedTuplestoreChunk
 /* Per-participant shared state. */
 typedef struct SharedTuplestoreParticipant
 {
-	LWLock		lock;
-	BlockNumber read_page;		/* Page number for next read. */
+	pg_atomic_uint32 read_page;	/* Page number for next read. */
 	BlockNumber npages;			/* Number of pages written. */
 	bool		writing;		/* Used only for assertions. */
 } SharedTuplestoreParticipant;
@@ -72,6 +72,8 @@ struct SharedTuplestore
 struct SharedTuplestoreAccessor
 {
 	int			participant;	/* My participant number. */
+	bool		is_read_only;	/* attached read-only? */
+	bool		is_normal_scan;	/* single-process (non-parallel) scan? */
 	SharedTuplestore *sts;		/* The shared state. */
 	SharedFileSet *fileset;		/* The SharedFileSet holding files. */
 	MemoryContext context;		/* Memory context for buffers. */
@@ -155,9 +157,8 @@ sts_initialize(SharedTuplestore *sts, int participants,
 
 	for (i = 0; i < participants; ++i)
 	{
-		LWLockInitialize(&sts->participants[i].lock,
-						 LWTRANCHE_SHARED_TUPLESTORE);
-		sts->participants[i].read_page = 0;
+		pg_atomic_init_u32(&sts->participants[i].read_page, 0);
+		sts->participants[i].npages = 0;
 		sts->participants[i].writing = false;
 	}
 
@@ -192,6 +193,24 @@ sts_attach(SharedTuplestore *sts,
 	return accessor;
 }
 
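+/*
+ * Attach to a shared tuplestore for reading only.  No participant number
+ * is claimed, so any number of read-only accessors can coexist, but
+ * writing through such an accessor raises an error.
+ */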
+SharedTuplestoreAccessor *
+sts_attach_read_only(SharedTuplestore *sts,
+					 SharedFileSet *fileset)
+{
+	SharedTuplestoreAccessor *accessor;
+
+	Assert(sts->nparticipants > 0);
+
+	accessor = palloc0(sizeof(SharedTuplestoreAccessor));
+	accessor->is_read_only = true;
+	accessor->participant = 0;
+	accessor->sts = sts;
+	accessor->fileset = fileset;
+	accessor->context = CurrentMemoryContext;
+
+	return accessor;
+}
+
 static void
 sts_flush_chunk(SharedTuplestoreAccessor *accessor)
 {
@@ -242,7 +261,7 @@ sts_reinitialize(SharedTuplestoreAccessor *accessor)
 	 */
 	for (i = 0; i < accessor->sts->nparticipants; ++i)
 	{
-		accessor->sts->participants[i].read_page = 0;
+		pg_atomic_init_u32(&accessor->sts->participants[i].read_page, 0);
 	}
 }
 
@@ -272,6 +291,8 @@ sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
 	accessor->read_participant = accessor->participant;
 	accessor->read_file = NULL;
 	accessor->read_next_page = 0;
+	accessor->read_ntuples = 0;
+	accessor->read_ntuples_available = 0;
 }
 
 /*
@@ -302,15 +323,23 @@ sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
 {
 	size_t		size;
 
+	if (unlikely(accessor->is_read_only))
+	{
+		ereport(ERROR,
+				(errmsg("shared tuplestore is attached read-only")));
+	}
+
 	/* Do we have our own file yet? */
 	if (accessor->write_file == NULL)
 	{
 		SharedTuplestoreParticipant *participant;
-		char		name[MAXPGPATH];
+		char			name[MAXPGPATH];
+		MemoryContext	oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(accessor));
 
 		/* Create one.  Only this backend will write into it. */
 		sts_filename(name, accessor, accessor->participant);
 		accessor->write_file = BufFileCreateShared(accessor->fileset, name);
+		MemoryContextSwitchTo(oldcontext);
 
 		/* Set up the shared state for this backend's file. */
 		participant = &accessor->sts->participants[accessor->participant];
@@ -532,20 +561,36 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 		/* Find the location of a new chunk to read. */
 		p = &accessor->sts->participants[accessor->read_participant];
 
-		LWLockAcquire(&p->lock, LW_EXCLUSIVE);
-		/* We can skip directly past overflow pages we know about. */
-		if (p->read_page < accessor->read_next_page)
-			p->read_page = accessor->read_next_page;
-		eof = p->read_page >= p->npages;
-		if (!eof)
+		if (accessor->is_normal_scan)
+		{
+			eof = accessor->read_next_page >= p->npages;
+			if (!eof)
+			{
+				read_page = accessor->read_next_page;
+				accessor->read_next_page += STS_CHUNK_PAGES;
+			}
+		}
+		else
 		{
 			/* Claim the next chunk. */
-			read_page = p->read_page;
-			/* Advance the read head for the next reader. */
-			p->read_page += STS_CHUNK_PAGES;
-			accessor->read_next_page = p->read_page;
+			read_page = pg_atomic_read_u32(&p->read_page);
+			/* We can skip directly past overflow pages we know about. */
+			while (read_page < accessor->read_next_page)
+			{
+				if (pg_atomic_compare_exchange_u32(&p->read_page,
+												   &read_page,
+												   accessor->read_next_page))
+					break;
+			}
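+			/*
+			 * Note: on failure pg_atomic_compare_exchange_u32 refreshes
+			 * read_page with the value currently in shared memory, so
+			 * each iteration retries against fresh state.
+			 */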
+			while ((eof = read_page >= p->npages) == false)
+			{
+				/* Advance the read head for the next reader. */
+				accessor->read_next_page = read_page + STS_CHUNK_PAGES;
+				if (pg_atomic_compare_exchange_u32(&p->read_page,
+												   &read_page,
+												   accessor->read_next_page))
+					break;
+			}
 		}
-		LWLockRelease(&p->lock);
 
 		if (!eof)
 		{
@@ -556,10 +601,12 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 			if (accessor->read_file == NULL)
 			{
 				char		name[MAXPGPATH];
+				MemoryContext oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(accessor));
 
 				sts_filename(name, accessor, accessor->read_participant);
 				accessor->read_file =
 					BufFileOpenShared(accessor->fileset, name, O_RDONLY);
+				MemoryContextSwitchTo(oldcontext);
 			}
 
 			/* Seek and load the chunk header. */
@@ -626,3 +673,30 @@ sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
 {
 	snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
 }
+
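+/*
+ * Begin a single-process ("normal") scan.  Unlike a parallel scan, chunks
+ * are claimed only from the accessor-local read head, so this backend
+ * alone reads every participant's file from the beginning.
+ */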
+void
+sts_begin_scan(SharedTuplestoreAccessor *accessor)
+{
+	sts_begin_parallel_scan(accessor);
+	accessor->is_normal_scan = true;
+}
+
+void
+sts_end_scan(SharedTuplestoreAccessor *accessor)
+{
+	Assert(accessor->is_normal_scan);
+	sts_end_parallel_scan(accessor);
+	accessor->is_normal_scan = false;
+}
+
+MinimalTuple
+sts_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+	Assert(accessor->is_normal_scan);
+	return sts_parallel_scan_next(accessor, meta_data);
+}
+
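+/*
+ * End any write or scan in progress and free the accessor.  The shared
+ * state and any files already written remain valid for other accessors.
+ */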
+void
+sts_detach(SharedTuplestoreAccessor *accessor)
+{
+	sts_end_write(accessor);
+	sts_end_parallel_scan(accessor);
+	pfree(accessor);
+}
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index af9b41795d..8a13397379 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -214,7 +214,6 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PER_SESSION_DSA,
 	LWTRANCHE_PER_SESSION_RECORD_TYPE,
 	LWTRANCHE_PER_SESSION_RECORD_TYPMOD,
-	LWTRANCHE_SHARED_TUPLESTORE,
 	LWTRANCHE_SHARED_TIDBITMAP,
 	LWTRANCHE_PARALLEL_APPEND,
 	LWTRANCHE_PER_XACT_PREDICATE_LIST,
diff --git a/src/include/utils/batchstore.h b/src/include/utils/batchstore.h
new file mode 100644
index 0000000000..1afbaea570
--- /dev/null
+++ b/src/include/utils/batchstore.h
@@ -0,0 +1,38 @@
+#ifndef BATCH_STORE_H
+#define BATCH_STORE_H
+
+#include "access/htup.h"
+#include "storage/dsm.h"
+#include "storage/sharedfileset.h"
+
+typedef struct BatchStoreData *BatchStore;
+typedef struct BatchStoreParallelHashData *BatchStoreParallelHash;
+
+typedef struct BatchStoreFuncs
+{
+	void (*hash_write)(BatchStore bs, MinimalTuple mtup, uint32 hash);
+	MinimalTuple (*hash_read)(BatchStore bs, uint32 *hash);
+} BatchStoreFuncs;
+
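+/*
+ * BatchStoreFuncs is the first member of BatchStoreData, so the dispatch
+ * macros below can reach the function pointers through the opaque
+ * BatchStore pointer.
+ */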
+#define bs_write_hash(bs, mtup, hash) ((*((BatchStoreFuncs*)(bs))->hash_write)((bs), (mtup), (hash)))
+#define bs_read_hash(bs, phash) ((*((BatchStoreFuncs*)(bs))->hash_read)((bs), (phash)))
+
+extern BatchStore bs_begin_hash(uint32 num_batches);
+
+extern size_t bs_parallel_hash_estimate(uint32 num_batches, uint32 nparticipants);
+extern BatchStore bs_init_parallel_hash(uint32 num_batches,
+										uint32 nparticipants, uint32 my_participant_num,
+										BatchStoreParallelHash bsph, dsm_segment *dsm_seg,
+										SharedFileSet *fileset, const char *name);
+extern BatchStore bs_attach_parallel_hash(BatchStoreParallelHash bsph, dsm_segment *dsm_seg,
+										  SharedFileSet *fileset, uint32 my_participant_num);
+
+extern void bs_destroy(BatchStore bs);
+
+extern void bs_end_write(BatchStore bs);
+
+extern bool bs_next_batch(BatchStore bs, bool no_parallel);
+extern void bs_rescan(BatchStore bs);
+extern void bs_end_cur_batch(BatchStore bs);
+#endif /* BATCH_STORE_H */
diff --git a/src/include/utils/sharedtuplestore.h b/src/include/utils/sharedtuplestore.h
index 9754504cc5..e8121fbe78 100644
--- a/src/include/utils/sharedtuplestore.h
+++ b/src/include/utils/sharedtuplestore.h
@@ -43,6 +43,9 @@ extern SharedTuplestoreAccessor *sts_attach(SharedTuplestore *sts,
 											int my_participant_number,
 											SharedFileSet *fileset);
 
+extern SharedTuplestoreAccessor *sts_attach_read_only(SharedTuplestore *sts,
+													  SharedFileSet *fileset);
+
 extern void sts_end_write(SharedTuplestoreAccessor *accessor);
 
 extern void sts_reinitialize(SharedTuplestoreAccessor *accessor);
@@ -58,4 +61,13 @@ extern void sts_puttuple(SharedTuplestoreAccessor *accessor,
 extern MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor,
 										   void *meta_data);
 
+extern void sts_begin_scan(SharedTuplestoreAccessor *accessor);
+
+extern void sts_end_scan(SharedTuplestoreAccessor *accessor);
+
+extern MinimalTuple sts_scan_next(SharedTuplestoreAccessor *accessor,
+								  void *meta_data);
+
+extern void sts_detach(SharedTuplestoreAccessor *accessor);
+
 #endif							/* SHAREDTUPLESTORE_H */
-- 
2.16.3

