From 54ba12269794d7c714c7396da6a41fb47a442b06 Mon Sep 17 00:00:00 2001
From: bucoo <bucoo@sohu.com>
Date: Wed, 28 Oct 2020 16:29:06 +0800
Subject: [PATCH 4/4] Support parallel DISTINCT, UNION, aggregation and grouping
 sets using batch hash aggregation

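Add a new aggregate strategy, AGG_BATCH_HASH, shown in EXPLAIN as
"BatchHashAggregate".  Parallel workers first write their input tuples into
shared batch stores, partitioned by the grouping hash value so that all
tuples of a group land in the same batch; after a barrier, each worker then
hash-aggregates whole batches on its own.  This lets parallel DISTINCT,
UNION, grouped aggregation and grouping sets produce final results without
a Finalize step.

The planner only considers these paths when the new GUC enable_batch_hashagg
is on (default off).

Example, taken from the added select_distinct regression test:

    SET enable_batch_hashagg = ON;
    EXPLAIN (costs off)
    SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
                      QUERY PLAN
    ----------------------------------------------
     Aggregate
       ->  Gather
             Workers Planned: 2
             ->  Parallel BatchHashAggregate
                   Group Key: tenk1.unique2
                   ->  Parallel Seq Scan on tenk1
    (6 rows)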
---
 src/backend/commands/explain.c                    |   4 +
 src/backend/executor/nodeAgg.c                    | 543 ++++++++++++++++++----
 src/backend/nodes/copyfuncs.c                     |   1 +
 src/backend/nodes/outfuncs.c                      |   1 +
 src/backend/nodes/readfuncs.c                     |   1 +
 src/backend/optimizer/path/costsize.c             |  31 +-
 src/backend/optimizer/plan/createplan.c           |  12 +-
 src/backend/optimizer/plan/planner.c              | 131 ++++++
 src/backend/optimizer/prep/prepunion.c            |  27 ++
 src/backend/postmaster/pgstat.c                   |   3 +
 src/backend/utils/misc/guc.c                      |   9 +
 src/include/executor/nodeAgg.h                    |   2 +
 src/include/nodes/execnodes.h                     |   3 +
 src/include/nodes/nodes.h                         |   3 +-
 src/include/nodes/plannodes.h                     |   1 +
 src/include/optimizer/cost.h                      |   1 +
 src/include/optimizer/pathnode.h                  |   2 +
 src/include/pgstat.h                              |   3 +-
 src/test/regress/expected/groupingsets.out        |  65 +++
 src/test/regress/expected/partition_aggregate.out |  64 +++
 src/test/regress/expected/select_distinct.out     |  35 ++
 src/test/regress/expected/sysviews.out            |   3 +-
 src/test/regress/expected/union.out               |  30 ++
 src/test/regress/sql/groupingsets.sql             |  10 +
 src/test/regress/sql/partition_aggregate.sql      |  20 +
 src/test/regress/sql/select_distinct.sql          |   9 +
 src/test/regress/sql/union.sql                    |  13 +
 27 files changed, 941 insertions(+), 86 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 16a1fb035d..d4b336aa82 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1302,6 +1302,10 @@ ExplainNode(PlanState *planstate, List *ancestors,
 						pname = "MixedAggregate";
 						strategy = "Mixed";
 						break;
+					case AGG_BATCH_HASH:
+						pname = "BatchHashAggregate";
+						strategy = "BatchHashed";
+						break;
 					default:
 						pname = "Aggregate ???";
 						strategy = "???";
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 75e5bbf209..08024532a3 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -256,7 +256,10 @@
 #include "optimizer/optimizer.h"
 #include "parser/parse_agg.h"
 #include "parser/parse_coerce.h"
+#include "pgstat.h"
+#include "storage/barrier.h"
 #include "utils/acl.h"
+#include "utils/batchstore.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
 #include "utils/dynahash.h"
@@ -311,6 +314,11 @@
  */
 #define CHUNKHDRSZ 16
 
+#define SHARED_AGG_MAGIC		UINT64CONST(0x4141bbcd61518e52)
+#define SHARED_AGG_KEY_INFO		UINT64CONST(0xD000000000000001)
+#define SHARED_AGG_KEY_BARRIER	UINT64CONST(0xD000000000000002)
+#define SHARED_AGG_KEY_FILE_SET	UINT64CONST(0xD000000000000003)
+
 /*
  * Track all tapes needed for a HashAgg that spills. We don't know the maximum
  * number of tapes needed at the start of the algorithm (because it can
@@ -446,7 +454,7 @@ static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
 									   int input_tapenum, int setno,
 									   int64 input_tuples, double input_card,
 									   int used_bits);
-static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
+static bool hashagg_batch_read(void *userdata, TupleTableSlot *slot, uint32 *hashp);
 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
 							   int used_bits, double input_groups,
 							   double hashentrysize);
@@ -473,6 +481,13 @@ static int	find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
 									 Oid aggserialfn, Oid aggdeserialfn,
 									 Datum initValue, bool initValueIsNull,
 									 List *transnos);
+static TupleTableSlot *ExecBatchHashAggPrepare(PlanState *pstate);
+static TupleTableSlot *ExecBatchHashAgg(PlanState *pstate);
+static bool ExecBatchHashAggNextBatch(AggState *node);
+#if 0
+static void agg_batch_fill_hash_table(AggState *aggstate);
+static void ClearBatchAgg(AggState *node);
+#endif
 
 
 /*
@@ -1338,7 +1353,8 @@ finalize_aggregates(AggState *aggstate,
 		if (pertrans->numSortCols > 0)
 		{
 			Assert(aggstate->aggstrategy != AGG_HASHED &&
-				   aggstate->aggstrategy != AGG_MIXED);
+				   aggstate->aggstrategy != AGG_MIXED &&
+				   aggstate->aggstrategy != AGG_BATCH_HASH);
 
 			if (pertrans->numInputs == 1)
 				process_ordered_aggregate_single(aggstate,
@@ -1510,8 +1526,10 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets)
 	MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
 	MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
 	Size		additionalsize;
+	bool		use_hash_iv;
 
 	Assert(aggstate->aggstrategy == AGG_HASHED ||
+		   aggstate->aggstrategy == AGG_BATCH_HASH ||
 		   aggstate->aggstrategy == AGG_MIXED);
 
 	/*
@@ -1521,6 +1539,10 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets)
 	 * tuple of each group.
 	 */
 	additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
+	if (aggstate->aggstrategy == AGG_BATCH_HASH)
+		use_hash_iv = false;
+	else
+		use_hash_iv = DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit);
 
 	perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
 												perhash->hashslot->tts_tupleDescriptor,
@@ -1534,7 +1556,7 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets)
 												metacxt,
 												hashcxt,
 												tmpcxt,
-												DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
+												use_hash_iv);
 }
 
 /*
@@ -1633,10 +1655,15 @@ find_hash_columns(AggState *aggstate)
 			palloc(maxCols * sizeof(AttrNumber));
 		perhash->hashGrpColIdxHash =
 			palloc(perhash->numCols * sizeof(AttrNumber));
+		perhash->colnos_needed = bms_copy(aggregated_colnos);
 
 		/* Add all the grouping columns to colnos */
 		for (i = 0; i < perhash->numCols; i++)
+		{
 			colnos = bms_add_member(colnos, grpColIdx[i]);
+			perhash->colnos_needed = bms_add_member(perhash->colnos_needed,
+													grpColIdx[i]);
+		}
 
 		/*
 		 * First build mapping for columns directly hashed. These are the
@@ -1746,9 +1773,11 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
 	int			j = nullcheck ? 1 : 0;
 
 	Assert(aggstate->aggstrategy == AGG_HASHED ||
+		   aggstate->aggstrategy == AGG_BATCH_HASH ||
 		   aggstate->aggstrategy == AGG_MIXED);
 
-	if (aggstate->aggstrategy == AGG_HASHED)
+	if (aggstate->aggstrategy == AGG_HASHED ||
+		aggstate->aggstrategy == AGG_BATCH_HASH)
 		phase = &aggstate->phases[0];
 	else						/* AGG_MIXED */
 		phase = &aggstate->phases[1];
@@ -2170,6 +2199,9 @@ ExecAgg(PlanState *pstate)
 			case AGG_SORTED:
 				result = agg_retrieve_direct(node);
 				break;
+			case AGG_BATCH_HASH:
+				elog(ERROR, "AGG_BATCH_HASH should not be executed through ExecAgg");
+				break;
 		}
 
 		if (!TupIsNull(result))
@@ -2570,35 +2602,20 @@ agg_fill_hash_table(AggState *aggstate)
 						   &aggstate->perhash[0].hashiter);
 }
 
-/*
- * If any data was spilled during hash aggregation, reset the hash table and
- * reprocess one batch of spilled data. After reprocessing a batch, the hash
- * table will again contain data, ready to be consumed by
- * agg_retrieve_hash_table_in_memory().
- *
- * Should only be called after all in memory hash table entries have been
- * finalized and emitted.
- *
- * Return false when input is exhausted and there's no more work to be done;
- * otherwise return true.
- */
-static bool
-agg_refill_hash_table(AggState *aggstate)
+static void
+agg_refill_hash_table_ex(AggState *aggstate,
+						 bool (*read_tup)(void *userdata, TupleTableSlot *slot, uint32 *hash),
+						 void *userdata,
+						 int used_bits,
+						 double input_groups,
+						 int setno)
 {
-	HashAggBatch *batch;
 	AggStatePerHash perhash;
 	HashAggSpill spill;
-	HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
 	bool		spill_initialized = false;
 
-	if (aggstate->hash_batches == NIL)
-		return false;
-
-	batch = linitial(aggstate->hash_batches);
-	aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
-
-	hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
-						batch->used_bits, &aggstate->hash_mem_limit,
+	hash_agg_set_limits(aggstate->hashentrysize, input_groups,
+						used_bits, &aggstate->hash_mem_limit,
 						&aggstate->hash_ngroups_limit, NULL);
 
 	/* there could be residual pergroup pointers; clear them */
@@ -2626,7 +2643,7 @@ agg_refill_hash_table(AggState *aggstate)
 		aggstate->phase = &aggstate->phases[aggstate->current_phase];
 	}
 
-	select_current_set(aggstate, batch->setno, true);
+	select_current_set(aggstate, setno, true);
 
 	perhash = &aggstate->perhash[aggstate->current_set];
 
@@ -2644,31 +2661,27 @@ agg_refill_hash_table(AggState *aggstate)
 		TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
 		TupleTableSlot *hashslot = perhash->hashslot;
 		TupleHashEntry entry;
-		MinimalTuple tuple;
 		uint32		hash;
 		bool		isnew = false;
 		bool	   *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
 
 		CHECK_FOR_INTERRUPTS();
 
-		tuple = hashagg_batch_read(batch, &hash);
-		if (tuple == NULL)
+		if ((*read_tup)(userdata, spillslot, &hash) == false)
 			break;
 
-		ExecStoreMinimalTuple(tuple, spillslot, true);
 		aggstate->tmpcontext->ecxt_outertuple = spillslot;
 
 		prepare_hash_slot(perhash,
-						  aggstate->tmpcontext->ecxt_outertuple,
+						  spillslot,
 						  hashslot);
-		entry = LookupTupleHashEntryHash(
-										 perhash->hashtable, hashslot, p_isnew, hash);
+		entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, p_isnew, hash);
 
 		if (entry != NULL)
 		{
 			if (isnew)
 				initialize_hash_entry(aggstate, perhash->hashtable, entry);
-			aggstate->hash_pergroup[batch->setno] = entry->additional;
+			aggstate->hash_pergroup[setno] = entry->additional;
 			advance_aggregates(aggstate);
 		}
 		else
@@ -2680,13 +2693,13 @@ agg_refill_hash_table(AggState *aggstate)
 				 * that we don't assign tapes that will never be used.
 				 */
 				spill_initialized = true;
-				hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
-								   batch->input_card, aggstate->hashentrysize);
+				hashagg_spill_init(&spill, aggstate->hash_tapeinfo, used_bits,
+								   input_groups, aggstate->hashentrysize);
 			}
 			/* no memory for a new group, spill */
 			hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
 
-			aggstate->hash_pergroup[batch->setno] = NULL;
+			aggstate->hash_pergroup[setno] = NULL;
 		}
 
 		/*
@@ -2696,15 +2709,13 @@ agg_refill_hash_table(AggState *aggstate)
 		ResetExprContext(aggstate->tmpcontext);
 	}
 
-	hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
-
 	/* change back to phase 0 */
 	aggstate->current_phase = 0;
 	aggstate->phase = &aggstate->phases[aggstate->current_phase];
 
 	if (spill_initialized)
 	{
-		hashagg_spill_finish(aggstate, &spill, batch->setno);
+		hashagg_spill_finish(aggstate, &spill, setno);
 		hash_agg_update_metrics(aggstate, true, spill.npartitions);
 	}
 	else
@@ -2713,9 +2724,43 @@ agg_refill_hash_table(AggState *aggstate)
 	aggstate->hash_spill_mode = false;
 
 	/* prepare to walk the first hash table */
-	select_current_set(aggstate, batch->setno, true);
-	ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
-						   &aggstate->perhash[batch->setno].hashiter);
+	select_current_set(aggstate, setno, true);
+	ResetTupleHashIterator(aggstate->perhash[setno].hashtable,
+						   &aggstate->perhash[setno].hashiter);
+}
+
+/*
+ * If any data was spilled during hash aggregation, reset the hash table and
+ * reprocess one batch of spilled data. After reprocessing a batch, the hash
+ * table will again contain data, ready to be consumed by
+ * agg_retrieve_hash_table_in_memory().
+ *
+ * Should only be called after all in memory hash table entries have been
+ * finalized and emitted.
+ *
+ * Return false when input is exhausted and there's no more work to be done;
+ * otherwise return true.
+ */
+static bool
+agg_refill_hash_table(AggState *aggstate)
+{
+	HashAggBatch *batch;
+
+	if (aggstate->hash_batches == NIL)
+		return false;
+
+	batch = linitial(aggstate->hash_batches);
+	aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
+
+	agg_refill_hash_table_ex(aggstate,
+							 hashagg_batch_read,
+							 batch,
+							 batch->used_bits,
+							 batch->input_card,
+							 batch->setno);
+
+	hashagg_tapeinfo_release(aggstate->hash_tapeinfo,
+							 batch->input_tapenum);
 
 	pfree(batch);
 
@@ -3056,9 +3101,10 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
  * read_spilled_tuple
- * 		read the next tuple from a batch's tape.  Return NULL if no more.
+ * 		read the next tuple from a batch's tape.  Return false if no more.
  */
-static MinimalTuple
-hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
+static bool
+hashagg_batch_read(void *userdata, TupleTableSlot *slot, uint32 *hashp)
 {
+	HashAggBatch *batch = userdata;
 	LogicalTapeSet *tapeset = batch->tapeset;
 	int			tapenum = batch->input_tapenum;
 	MinimalTuple tuple;
@@ -3068,7 +3114,7 @@ hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
 
 	nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
 	if (nread == 0)
-		return NULL;
+		return false;
 	if (nread != sizeof(uint32))
 		ereport(ERROR,
 				(errcode_for_file_access(),
@@ -3096,7 +3142,8 @@ hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
 				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
 						tapenum, t_len - sizeof(uint32), nread)));
 
-	return tuple;
+	ExecStoreMinimalTuple(tuple, slot, true);
+	return true;
 }
 
 /*
@@ -3257,6 +3304,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	int			i = 0;
 	int			j = 0;
 	bool		use_hashing = (node->aggstrategy == AGG_HASHED ||
+							   node->aggstrategy == AGG_BATCH_HASH ||
 							   node->aggstrategy == AGG_MIXED);
 
 	/* check for unsupported flags */
@@ -3268,7 +3316,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	aggstate = makeNode(AggState);
 	aggstate->ss.ps.plan = (Plan *) node;
 	aggstate->ss.ps.state = estate;
-	aggstate->ss.ps.ExecProcNode = ExecAgg;
+	if (node->aggstrategy == AGG_BATCH_HASH)
+		aggstate->ss.ps.ExecProcNode = ExecBatchHashAggPrepare;
+	else
+		aggstate->ss.ps.ExecProcNode = ExecAgg;
 
 	aggstate->aggs = NIL;
 	aggstate->numaggs = 0;
@@ -3315,7 +3366,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 			 * additional AGG_HASHED aggs become part of phase 0, but all
 			 * others add an extra phase.
 			 */
-			if (agg->aggstrategy != AGG_HASHED)
+			if (agg->aggstrategy != AGG_HASHED &&
+				agg->aggstrategy != AGG_BATCH_HASH)
 				++numPhases;
 			else
 				++numHashes;
@@ -3362,7 +3414,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	 * If we are doing a hashed aggregation then the child plan does not need
 	 * to handle REWIND efficiently; see ExecReScanAgg.
 	 */
-	if (node->aggstrategy == AGG_HASHED)
+	if (node->aggstrategy == AGG_HASHED ||
+		node->aggstrategy == AGG_BATCH_HASH)
 		eflags &= ~EXEC_FLAG_REWIND;
 	outerPlan = outerPlan(node);
 	outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
@@ -3370,9 +3423,16 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	/*
 	 * initialize source tuple type.
 	 */
-	aggstate->ss.ps.outerops =
-		ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
-							 &aggstate->ss.ps.outeropsfixed);
+	if (node->aggstrategy == AGG_BATCH_HASH)
+	{
+		aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
+		aggstate->ss.ps.outeropsfixed = true;
+	}else
+	{
+		aggstate->ss.ps.outerops =
+			ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
+								 &aggstate->ss.ps.outeropsfixed);
+	}
 	aggstate->ss.ps.outeropsset = true;
 
 	ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
@@ -3470,6 +3530,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 		Assert(phase <= 1 || sortnode);
 
 		if (aggnode->aggstrategy == AGG_HASHED
+			|| aggnode->aggstrategy == AGG_BATCH_HASH
 			|| aggnode->aggstrategy == AGG_MIXED)
 		{
 			AggStatePerPhase phasedata = &aggstate->phases[0];
@@ -3678,7 +3739,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	 * hashing is being done too, then phase 0 is processed last); but if only
 	 * hashing is being done, then phase 0 is all there is.
 	 */
-	if (node->aggstrategy == AGG_HASHED)
+	if (node->aggstrategy == AGG_HASHED ||
+		node->aggstrategy == AGG_BATCH_HASH)
 	{
 		aggstate->current_phase = 0;
 		initialize_phase(aggstate, 0);
@@ -4066,7 +4128,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 			dohash = false;
 			dosort = true;
 		}
-		else if (phase->aggstrategy == AGG_HASHED)
+		else if (phase->aggstrategy == AGG_HASHED ||
+				 phase->aggstrategy == AGG_BATCH_HASH)
 		{
 			dohash = true;
 			dosort = false;
@@ -4666,6 +4729,14 @@ ExecReScanAgg(AggState *node)
 			return;
 		}
 	}
+	else if (node->aggstrategy == AGG_BATCH_HASH)
+	{
+		if (!node->batch_filled)
+			return;
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("batch hash aggregate does not support rescan")));
+	}
 
 	/* Make sure we have closed any open tuplesorts */
 	for (transno = 0; transno < node->numtrans; transno++)
@@ -4958,6 +5029,54 @@ aggregate_dummy(PG_FUNCTION_ARGS)
  * ----------------------------------------------------------------
  */
 
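+/*
+ * Estimate the space needed for this node's private shm_toc: room for
+ * instrumentation and, for batch hash aggregation, the shared barrier,
+ * fileset and per-grouping-set batch store state.
+ */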
+static Size ExecAggEstimateToc(AggState *node, ParallelContext *pcxt)
+{
+	Size				size;
+	shm_toc_estimator	estimator;
+	ListCell		   *lc;
+
+	/* don't need this if no workers */
+	if (pcxt->nworkers == 0)
+		return 0;
+	/* don't need this if not instrumenting and not batch hash agg */
+	if (!node->ss.ps.instrument &&
+		node->aggstrategy != AGG_BATCH_HASH)
+		return 0;
+
+	shm_toc_initialize_estimator(&estimator);
+	if (node->ss.ps.instrument)
+	{
+		size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
+		size = add_size(size, offsetof(SharedAggInfo, sinstrument));
+		shm_toc_estimate_chunk(&estimator, size);
+		shm_toc_estimate_keys(&estimator, 1);
+	}
+
+	if (node->aggstrategy == AGG_BATCH_HASH)
+	{
+		int nparticipants = pcxt->nworkers + 1;
+		shm_toc_estimate_chunk(&estimator, sizeof(Barrier));
+		shm_toc_estimate_chunk(&estimator, sizeof(SharedFileSet));
+		shm_toc_estimate_keys(&estimator, 2);
+
+		size = bs_parallel_hash_estimate(castNode(Agg, node->ss.ps.plan)->numBatches,
+										 nparticipants);
+		shm_toc_estimate_chunk(&estimator, size);
+		shm_toc_estimate_keys(&estimator, 1);
+
+		foreach (lc, castNode(Agg, node->ss.ps.plan)->chain)
+		{
+			Agg *agg = lfirst_node(Agg, lc);
+			Assert(agg->aggstrategy == AGG_BATCH_HASH);
+			size = bs_parallel_hash_estimate(agg->numBatches, nparticipants);
+			shm_toc_estimate_chunk(&estimator, size);
+			shm_toc_estimate_keys(&estimator, 1);
+		}
+	}
+
+	return shm_toc_estimate(&estimator);
+}
+
  /* ----------------------------------------------------------------
   *		ExecAggEstimate
   *
@@ -4967,14 +5086,7 @@ aggregate_dummy(PG_FUNCTION_ARGS)
 void
 ExecAggEstimate(AggState *node, ParallelContext *pcxt)
 {
-	Size		size;
-
-	/* don't need this if not instrumenting or no workers */
-	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
-		return;
-
-	size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
-	size = add_size(size, offsetof(SharedAggInfo, sinstrument));
+	Size size = ExecAggEstimateToc(node, pcxt);
 	shm_toc_estimate_chunk(&pcxt->estimator, size);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 }
@@ -4989,19 +5101,77 @@ void
 ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
 {
 	Size		size;
+	shm_toc	   *toc;
+	void	   *addr;
 
-	/* don't need this if not instrumenting or no workers */
-	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+	size = ExecAggEstimateToc(node, pcxt);
+	if (size == 0)
 		return;
 
-	size = offsetof(SharedAggInfo, sinstrument)
-		+ pcxt->nworkers * sizeof(AggregateInstrumentation);
-	node->shared_info = shm_toc_allocate(pcxt->toc, size);
-	/* ensure any unfilled slots will contain zeroes */
-	memset(node->shared_info, 0, size);
-	node->shared_info->num_workers = pcxt->nworkers;
-	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
-				   node->shared_info);
+	addr = shm_toc_allocate(pcxt->toc, size);
+	toc = shm_toc_create(SHARED_AGG_MAGIC, addr, size);
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, addr);
+
+	if (node->ss.ps.instrument)
+	{
+		size = offsetof(SharedAggInfo, sinstrument)
+			+ pcxt->nworkers * sizeof(AggregateInstrumentation);
+		node->shared_info = shm_toc_allocate(toc, size);
+		/* ensure any unfilled slots will contain zeroes */
+		memset(node->shared_info, 0, size);
+		node->shared_info->num_workers = pcxt->nworkers;
+		shm_toc_insert(toc, SHARED_AGG_KEY_INFO, node->shared_info);
+	}
+
+	if (node->aggstrategy == AGG_BATCH_HASH)
+	{
+		int				nparticipants = pcxt->nworkers + 1;
+		int				i = 0;
+		ListCell	   *lc;
+		Agg			   *agg;
+		SharedFileSet  *fset = shm_toc_allocate(toc, sizeof(SharedFileSet));
+		SharedFileSetInit(fset, pcxt->seg);
+		shm_toc_insert(toc, SHARED_AGG_KEY_FILE_SET, fset);
+
+		node->batch_barrier = shm_toc_allocate(toc, sizeof(Barrier));
+		BarrierInit(node->batch_barrier, 0);
+		shm_toc_insert(toc, SHARED_AGG_KEY_BARRIER, node->batch_barrier);
+
+		agg = castNode(Agg, node->ss.ps.plan);
+		Assert(agg->numBatches > 0);
+		size = bs_parallel_hash_estimate(agg->numBatches, nparticipants);
+		addr = shm_toc_allocate(toc, size);
+		shm_toc_insert(toc, 0, addr);
+		node->perhash[0].batch_store = bs_init_parallel_hash(agg->numBatches,
+															 nparticipants,
+															 0,
+															 addr,
+															 pcxt->seg,
+															 fset,
+															 "BatchHashAgg");
+
+		i = 1;
+		foreach (lc, agg->chain)
+		{
+			Agg	   *subagg = lfirst_node(Agg, lc);
+			char	name[30];
+			Assert(subagg->aggstrategy == AGG_BATCH_HASH &&
+				   subagg->numBatches > 0);
+			Assert(i < node->num_hashes);
+			size = bs_parallel_hash_estimate(subagg->numBatches, nparticipants);
+			addr = shm_toc_allocate(toc, size);
+			shm_toc_insert(toc, i, addr);
+			sprintf(name, "BatchHashAgg%d", i);
+			node->perhash[i].batch_store = bs_init_parallel_hash(subagg->numBatches,
+																 nparticipants,
+																 0,
+																 addr,
+																 pcxt->seg,
+																 fset,
+																 name);
+			++i;
+		}
+	}
 }
 
 /* ----------------------------------------------------------------
@@ -5013,8 +5183,43 @@ ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
 void
 ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
 {
-	node->shared_info =
-		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+	shm_toc	   *toc;
+	void	   *addr = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+	if (addr == NULL)
+	{
+		Assert(node->aggstrategy != AGG_BATCH_HASH);
+		return;
+	}
+	toc = shm_toc_attach(SHARED_AGG_MAGIC, addr);
+	node->shared_info = shm_toc_lookup(toc, SHARED_AGG_KEY_INFO, true);
+
+	if (node->aggstrategy == AGG_BATCH_HASH)
+	{
+		int				i;
+		ListCell	   *lc;
+		Agg			   *agg = castNode(Agg, node->ss.ps.plan);
+		SharedFileSet  *fset = shm_toc_lookup(toc, SHARED_AGG_KEY_FILE_SET, false);
+
+		node->batch_barrier = shm_toc_lookup(toc, SHARED_AGG_KEY_BARRIER, false);
+		node->perhash[0].batch_store =
+			bs_attach_parallel_hash(shm_toc_lookup(toc, 0, false),
+									pwcxt->seg,
+									fset,
+									ParallelWorkerNumber+1);
+
+		i = 1;
+		foreach (lc, agg->chain)
+		{
+			Assert(lfirst_node(Agg, lc)->aggstrategy == AGG_BATCH_HASH);
+			Assert(i < node->num_hashes);
+			node->perhash[i].batch_store =
+				bs_attach_parallel_hash(shm_toc_lookup(toc, i, false),
+										pwcxt->seg,
+										fset,
+										ParallelWorkerNumber+1);
+			++i;
+		}
+	}
 }
 
 /* ----------------------------------------------------------------
@@ -5038,3 +5243,185 @@ ExecAggRetrieveInstrumentation(AggState *node)
 	memcpy(si, node->shared_info, size);
 	node->shared_info = si;
 }
+
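+/*
+ * ExecBatchHashAggPrepare
+ *		First call for an AGG_BATCH_HASH node.  Read the outer plan and
+ *		write every input tuple into the per-grouping-set batch stores,
+ *		partitioned by grouping hash value (participants synchronize on a
+ *		barrier in parallel mode), then switch ExecProcNode to
+ *		ExecBatchHashAgg to aggregate and return the stored batches.
+ */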
+static TupleTableSlot *ExecBatchHashAggPrepare(PlanState *pstate)
+{
+	int				i,x,max_colno_needed;
+	MinimalTuple	mtup;
+	TupleTableSlot *inputslot;
+	PlanState	   *outer = outerPlanState(pstate);
+	AggState	   *node = castNode(AggState, pstate);
+	ExprContext	   *tmpcontext = node->tmpcontext;
+	bool		   *isnull;
+	Bitmapset	   *colnos_needed;
+	Bitmapset	  **colnos_neededs;
+	Assert(node->aggstrategy == AGG_BATCH_HASH);
+	Assert(node->perhash[0].batch_store == NULL ||
+		   node->batch_barrier != NULL);
+
+	if (node->agg_done)
+		return NULL;
+
+	/* not running in parallel: create private batch stores */
+	if (node->perhash[0].batch_store == NULL)
+	{
+		MemoryContext	oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(pstate));
+		Agg			   *agg = castNode(Agg, pstate->plan);
+		ListCell	   *lc;
+
+		node->perhash[0].batch_store = bs_begin_hash(agg->numBatches);
+
+		i = 1;
+		foreach (lc, agg->chain)
+		{
+			Agg *subagg = lfirst_node(Agg, lc);
+			Assert(subagg->aggstrategy == AGG_BATCH_HASH);
+			Assert(i < node->num_hashes);
+			node->perhash[i].batch_store = bs_begin_hash(subagg->numBatches);
+			++i;
+		}
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	if (node->batch_barrier &&
+		BarrierAttach(node->batch_barrier) > 0)
+	{
+		BarrierDetach(node->batch_barrier);
+		goto batches_already_done_;
+	}
+
+	/* set up the null-flags workspace used when forming minimal tuples */
+	isnull = palloc(sizeof(isnull[0]) * node->hash_spill_wslot->tts_tupleDescriptor->natts);
+	memset(isnull, true, sizeof(isnull[0]) * node->hash_spill_wslot->tts_tupleDescriptor->natts);
+	max_colno_needed = node->max_colno_needed;
+
+	/* convert attribute numbers to 0-based column indexes */
+	colnos_neededs = palloc(sizeof(colnos_neededs[0]) * node->num_hashes);
+	for (i = 0; i < node->num_hashes; ++i)
+	{
+		AggStatePerHash	perhash = &node->perhash[i];
+		colnos_needed = NULL;
+		x = -1;
+		while ((x=bms_next_member(perhash->colnos_needed, x)) >= 0)
+		{
+			Assert(x > 0);
+			colnos_needed = bms_add_member(colnos_needed, x-1);
+		}
+		colnos_neededs[i] = colnos_needed;
+	}
+
+	for (;;)
+	{
+		CHECK_FOR_INTERRUPTS();
+		inputslot = ExecProcNode(outer);
+		if (TupIsNull(inputslot))
+			break;
+
+		tmpcontext->ecxt_outertuple = inputslot;
+		slot_getsomeattrs(inputslot, max_colno_needed);
+
+		for (i = 0; i < node->num_hashes; ++i)
+		{
+			AggStatePerHash	perhash = &node->perhash[i];
+			TupleTableSlot *hashslot = perhash->hashslot;
+
+			CHECK_FOR_INTERRUPTS();
+
+			/* mark unneeded columns as null */
+			memset(isnull, true, sizeof(isnull[0]) * max_colno_needed);
+			colnos_needed = colnos_neededs[i];
+			x = -1;
+			while ((x = bms_next_member(colnos_needed, x)) >= 0)
+				isnull[x] = inputslot->tts_isnull[x];
+			/* form a minimal tuple containing only the columns this set needs */
+			mtup = heap_form_minimal_tuple(inputslot->tts_tupleDescriptor,
+										   inputslot->tts_values,
+										   isnull);
+
+			prepare_hash_slot(perhash, inputslot, hashslot);
+
+			bs_write_hash(perhash->batch_store,
+						  mtup,
+						  TupleHashTableHash(perhash->hashtable, hashslot));
+			pfree(mtup);
+			ResetExprContext(tmpcontext);
+		}
+	}
+
+	for (i = 0; i < node->num_hashes; ++i)
+		bs_end_write(node->perhash[i].batch_store);
+	if (node->batch_barrier)
+	{
+		BarrierArriveAndWait(node->batch_barrier, WAIT_EVENT_BATCH_HASH_BUILD);
+		BarrierDetach(node->batch_barrier);
+	}
+
+	/* free temporary working memory */
+	for (i = 0; i < node->num_hashes; ++i)
+		bms_free(colnos_neededs[i]);
+	pfree(colnos_neededs);
+	pfree(isnull);
+
+batches_already_done_:
+	node->batch_filled = true;
+	node->current_batch = 0;
+	if (ExecBatchHashAggNextBatch(node) == false)
+		return NULL;
+
+	ExecSetExecProcNode(pstate, ExecBatchHashAgg);
+	return ExecBatchHashAgg(pstate);
+}
+
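+/*
+ * ExecBatchHashAgg
+ *		Return the next finalized group from the in-memory hash table,
+ *		refilling it from spilled data or from the next batch once it is
+ *		exhausted.
+ */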
+static TupleTableSlot *ExecBatchHashAgg(PlanState *pstate)
+{
+	AggState	   *node = castNode(AggState, pstate);
+	TupleTableSlot *result;
+
+reloop:
+	result = agg_retrieve_hash_table_in_memory(node);
+	if (unlikely(result == NULL))
+	{
+		if (!agg_refill_hash_table(node) &&
+			!ExecBatchHashAggNextBatch(node))
+			return NULL;
+		goto reloop;
+	}
+
+	return result;
+}
+
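+/*
+ * Tuple read callback for agg_refill_hash_table_ex: fetch the next tuple
+ * and its hash value from a batch store, returning false at the end of
+ * the current batch.
+ */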
+static bool
+batchstore_read(void *userdata, TupleTableSlot *slot, uint32 *hashp)
+{
+	MinimalTuple mtup = bs_read_hash(userdata, hashp);
+	if (unlikely(mtup == NULL))
+		return false;
+	ExecStoreMinimalTuple(mtup, slot, false);
+	return true;
+}
+
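+/*
+ * Advance to the next batch, moving to the next grouping set's batch store
+ * once the current one is exhausted, and load it into the hash table.
+ * Return false when every batch has been consumed.
+ */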
+static bool ExecBatchHashAggNextBatch(AggState *node)
+{
+	while (bs_next_batch(node->perhash[node->current_batch].batch_store, false) == false)
+	{
+		++node->current_batch;
+		if (node->current_batch >= node->num_hashes)
+		{
+			node->agg_done = true;
+			return false;
+		}
+	}
+
+	agg_refill_hash_table_ex(node,
+							 batchstore_read,
+							 node->perhash[node->current_batch].batch_store,
+							 0,
+							 node->perhash[node->current_batch].aggnode->numGroups,
+							 node->current_batch);
+	return true;
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 958964f1fa..8649e7d610 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -1042,6 +1042,7 @@ _copyAgg(const Agg *from)
 	COPY_BITMAPSET_FIELD(aggParams);
 	COPY_NODE_FIELD(groupingSets);
 	COPY_NODE_FIELD(chain);
+	COPY_SCALAR_FIELD(numBatches);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index a8dd7ef23f..8893bfab29 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -787,6 +787,7 @@ _outAgg(StringInfo str, const Agg *node)
 	WRITE_BITMAPSET_FIELD(aggParams);
 	WRITE_NODE_FIELD(groupingSets);
 	WRITE_NODE_FIELD(chain);
+	WRITE_UINT_FIELD(numBatches);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 2c6eb4362c..03ef6bc5da 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2249,6 +2249,7 @@ _readAgg(void)
 	READ_BITMAPSET_FIELD(aggParams);
 	READ_NODE_FIELD(groupingSets);
 	READ_NODE_FIELD(chain);
+	READ_UINT_FIELD(numBatches);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 32d0dc8ce5..4143b69178 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -141,6 +141,7 @@ bool		enable_parallel_append = true;
 bool		enable_parallel_hash = true;
 bool		enable_partition_pruning = true;
 bool		enable_batch_sort = true;
+bool		enable_batch_hashagg = false;
 
 typedef struct
 {
@@ -2404,7 +2405,7 @@ cost_agg(Path *path, PlannerInfo *root,
 	/* Use all-zero per-aggregate costs if NULL is passed */
 	if (aggcosts == NULL)
 	{
-		Assert(aggstrategy == AGG_HASHED);
+		Assert(aggstrategy == AGG_HASHED || aggstrategy == AGG_BATCH_HASH);
 		MemSet(&dummy_aggcosts, 0, sizeof(AggClauseCosts));
 		aggcosts = &dummy_aggcosts;
 	}
@@ -2463,10 +2464,13 @@ cost_agg(Path *path, PlannerInfo *root,
 	}
 	else
 	{
-		/* must be AGG_HASHED */
+		/* must be AGG_HASHED or AGG_BATCH_HASH */
 		startup_cost = input_total_cost;
-		if (!enable_hashagg)
+		if ((aggstrategy == AGG_HASHED && !enable_hashagg) ||
+			(aggstrategy == AGG_BATCH_HASH && !enable_batch_hashagg))
+		{
 			startup_cost += disable_cost;
+		}
 		startup_cost += aggcosts->transCost.startup;
 		startup_cost += aggcosts->transCost.per_tuple * input_tuples;
 		/* cost of computing hash value */
@@ -2478,6 +2482,15 @@ cost_agg(Path *path, PlannerInfo *root,
 		/* cost of retrieving from hash table */
 		total_cost += cpu_tuple_cost * numGroups;
 		output_tuples = numGroups;
+
+		if (aggstrategy == AGG_BATCH_HASH)
+		{
+			double	nbytes = relation_byte_size(input_tuples, input_width);
+			double	npages = ceil(nbytes / BLCKSZ);
+			double	material_cost = (seq_page_cost * npages);
+			startup_cost += material_cost;
+			total_cost += material_cost;
+		}
 	}
 
 	/*
@@ -2493,7 +2506,9 @@ cost_agg(Path *path, PlannerInfo *root,
 	 * Accrue writes (spilled tuples) to startup_cost and to total_cost;
 	 * accrue reads only to total_cost.
 	 */
-	if (aggstrategy == AGG_HASHED || aggstrategy == AGG_MIXED)
+	if (aggstrategy == AGG_HASHED ||
+		aggstrategy == AGG_BATCH_HASH ||
+		aggstrategy == AGG_MIXED)
 	{
 		double		pages;
 		double		pages_written = 0.0;
@@ -2506,6 +2521,14 @@ cost_agg(Path *path, PlannerInfo *root,
 		int			num_partitions;
 		int			depth;
 
+		if (aggstrategy == AGG_BATCH_HASH &&
+			numGroups > BATCH_STORE_MAX_BATCH)
+		{
+			numGroups /= BATCH_STORE_MAX_BATCH;
+			if (numGroups < 1.0)
+				numGroups = 1.0;
+		}
+
 		/*
 		 * Estimate number of batches based on the computed limits. If less
 		 * than or equal to one, all groups are expected to fit in memory;
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 85969388c2..a87dd633dc 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -30,6 +30,7 @@
 #include "optimizer/cost.h"
 #include "optimizer/optimizer.h"
 #include "optimizer/paramassign.h"
+#include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/placeholder.h"
 #include "optimizer/plancat.h"
@@ -2327,7 +2328,9 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path)
 			if (!rollup->is_hashed)
 				is_first_sort = false;
 
-			if (rollup->is_hashed)
+			if (best_path->aggstrategy == AGG_BATCH_HASH)
+				strat = AGG_BATCH_HASH;
+			else if (rollup->is_hashed)
 				strat = AGG_HASHED;
 			else if (list_length(linitial(rollup->gsets)) == 0)
 				strat = AGG_PLAIN;
@@ -6417,6 +6420,13 @@ make_agg(List *tlist, List *qual,
 	plan->lefttree = lefttree;
 	plan->righttree = NULL;
 
+	if (aggstrategy == AGG_BATCH_HASH)
+	{
+		node->numBatches = (int32)numGroups;
+		if (node->numBatches > BATCH_STORE_MAX_BATCH)
+			node->numBatches = BATCH_STORE_MAX_BATCH;
+	}
+
 	return node;
 }
 
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 27680dbeb3..5d854b26b5 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -176,6 +176,12 @@ static void consider_groupingsets_paths(PlannerInfo *root,
 										grouping_sets_data *gd,
 										const AggClauseCosts *agg_costs,
 										double dNumGroups);
+static void consider_parallel_hash_groupingsets_paths(PlannerInfo *root,
+													  RelOptInfo *grouped_rel,
+													  Path *path,
+													  grouping_sets_data *gd,
+													  const AggClauseCosts *agg_costs,
+													  double dNumGroups);
 static RelOptInfo *create_window_paths(PlannerInfo *root,
 									   RelOptInfo *input_rel,
 									   PathTarget *input_target,
@@ -4538,6 +4544,77 @@ consider_groupingsets_paths(PlannerInfo *root,
 										  dNumGroups));
 }
 
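+/*
+ * consider_parallel_hash_groupingsets_paths
+ *		Try to add a partial grouping sets path that hashes every grouping
+ *		set using AGG_BATCH_HASH.  Give up if any set is not hashable or if
+ *		an estimated per-batch hash table would exceed hash_mem.
+ */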
+static void
+consider_parallel_hash_groupingsets_paths(PlannerInfo *root,
+										  RelOptInfo *grouped_rel,
+										  Path *path,
+										  grouping_sets_data *gd,
+										  const AggClauseCosts *agg_costs,
+										  double dNumGroups)
+{
+	int			hash_mem = get_hash_mem();
+	List	   *new_rollups = NIL;
+	List	   *sets_data;
+	ListCell   *lc;
+	RollupData *rollup;
+	GroupingSetData *gs;
+	double		hashsize;
+	double		numGroups;
+
+	sets_data = list_copy(gd->unsortable_sets);
+	foreach (lc, gd->rollups)
+	{
+		rollup = lfirst_node(RollupData, lc);
+		if (rollup->hashable == false)
+		{
+			list_free(sets_data);
+			return;
+		}
+		sets_data = list_concat(sets_data, rollup->gsets_data);
+	}
+	foreach (lc, sets_data)
+	{
+		gs = lfirst_node(GroupingSetData, lc);
+		numGroups = gs->numGroups / BATCH_STORE_MAX_BATCH;
+		if (numGroups < 1.0)
+			numGroups = 1.0;
+		hashsize = estimate_hashagg_tablesize(path,
+											  agg_costs,
+											  numGroups);
+		if (hashsize > hash_mem * 1024L)
+		{
+			list_free(sets_data);
+			list_free_deep(new_rollups);
+			return;
+		}
+
+		rollup = makeNode(RollupData);
+		rollup->groupClause = preprocess_groupclause(root, gs->set);
+		rollup->gsets_data = list_make1(gs);
+		rollup->gsets = remap_to_groupclause_idx(rollup->groupClause,
+												 rollup->gsets_data,
+												 gd->tleref_to_colnum_map);
+		rollup->numGroups = gs->numGroups;
+		rollup->hashable = true;
+		rollup->is_hashed = true;
+		new_rollups = lappend(new_rollups, rollup);
+	}
+
+	numGroups = dNumGroups / path->parallel_workers;
+	if (numGroups < list_length(new_rollups))
+		numGroups = list_length(new_rollups);
+	path = (Path*)create_groupingsets_path(root,
+										   grouped_rel,
+										   path,
+										   (List*) root->parse->havingQual,
+										   AGG_BATCH_HASH,
+										   new_rollups,
+										   agg_costs,
+										   numGroups);
+	path->parallel_aware = true;
+	add_partial_path(grouped_rel, path);
+}
+
 /*
  * create_window_paths
  *
@@ -4952,6 +5029,30 @@ create_distinct_paths(PlannerInfo *root,
 								 NIL,
 								 NULL,
 								 numDistinctRows));
+#if 1
+		/* Generate parallel batch hashed aggregate path */
+		if (distinct_rel->consider_parallel &&
+			input_rel->partial_pathlist != NIL &&
+			numDistinctRows > 1.0)
+		{
+			Path *path = linitial(input_rel->partial_pathlist);
+			double numRows = numDistinctRows / path->parallel_workers;
+			if (numRows < 1.0)
+				numRows = 1.0;
+			path = (Path *)create_agg_path(root,
+										   distinct_rel,
+										   path,
+										   path->pathtarget,
+										   AGG_BATCH_HASH,
+										   AGGSPLIT_SIMPLE,
+										   parse->distinctClause,
+										   NIL,
+										   NULL,
+										   numRows);
+			path->parallel_aware = true;
+			add_partial_path(distinct_rel, path);
+		}
+#endif
 	}
 
 	generate_useful_gather_paths(root, distinct_rel, false);
@@ -6874,6 +6975,14 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
 			consider_groupingsets_paths(root, grouped_rel,
 										cheapest_path, false, true,
 										gd, agg_costs, dNumGroups);
+			if (grouped_rel->consider_parallel &&
+				input_rel->partial_pathlist != NIL)
+				consider_parallel_hash_groupingsets_paths(root,
+														  grouped_rel,
+														  linitial(input_rel->partial_pathlist),
+														  gd,
+														  agg_costs,
+														  dNumGroups);
 		}
 		else
 		{
@@ -6891,6 +7000,28 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
 									 havingQual,
 									 agg_costs,
 									 dNumGroups));
+
+			if (grouped_rel->consider_parallel &&
+				input_rel->partial_pathlist != NIL &&
+				dNumGroups >= 2.0)
+			{
+				Path   *path = linitial(input_rel->partial_pathlist);
+				double	numGroups = dNumGroups / path->parallel_workers;
+				if (numGroups < 1.0)
+					numGroups = 1.0;
+				path = (Path*)create_agg_path(root,
+											  grouped_rel,
+											  path,
+											  grouped_rel->reltarget,
+											  AGG_BATCH_HASH,
+											  AGGSPLIT_SIMPLE,
+											  parse->groupClause,
+											  havingQual,
+											  agg_costs,
+											  numGroups);
+				path->parallel_aware = true;
+				add_partial_path(grouped_rel, path);
+			}
 		}
 
 		/*
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index fa1053f077..efe438524c 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -714,6 +714,33 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root,
 															partial_path->rows);
 			add_partial_path(result_rel, partial_path);
 		}
+
+		/* create parallel batch hashed union */
+		if (!op->all &&
+			ppath->rows > 1.0 &&
+			grouping_is_hashable(groupList))
+		{
+			Path   *partial_path;
+			double	dNumGroups = ppath->rows / ppath->parallel_workers;
+			if (dNumGroups < 1.0)
+				dNumGroups = 1.0;
+			if (ppath->pathtarget->width * dNumGroups <= get_hash_mem() * 1024L)
+			{
+				partial_path = (Path*)create_agg_path(root,
+													  result_rel,
+													  ppath,
+													  create_pathtarget(root, tlist),
+													  AGG_BATCH_HASH,
+													  AGGSPLIT_SIMPLE,
+													  groupList,
+													  NIL,
+													  NULL,
+													  dNumGroups);
+				partial_path->parallel_aware = true;
+				add_partial_path(result_rel, partial_path);
+			}
+		}
+
 		ppath = (Path *)
 			create_gather_path(root, result_rel, ppath,
 							   result_rel->reltarget, NULL, NULL);
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index cacb7d13e6..04113bae7d 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4024,6 +4024,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_BATCH_SORT_BUILD:
 			event_name = "Batch/Sort/Building";
 			break;
+		case WAIT_EVENT_BATCH_HASH_BUILD:
+			event_name = "Batch/Hash/Building";
+			break;
 			/* no default case, so that compiler will warn */
 	}
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 43a4e36d78..ed2a4369eb 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -996,6 +996,15 @@ static struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_batch_hashagg", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of batch hash aggregation plans."),
+			NULL
+		},
+		&enable_batch_hashagg,
+		false,
+		NULL, NULL, NULL
+	},
 	{
 		{"enable_incremental_sort", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of incremental sort steps."),
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index b955169538..c6968c8301 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -310,6 +310,8 @@ typedef struct AggStatePerHashData
 	int			largestGrpColIdx;	/* largest col required for hashing */
 	AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */
 	AttrNumber *hashGrpColIdxHash;	/* indices in hash table tuples */
+	Bitmapset  *colnos_needed;	/* all columns needed from the outer plan */
+	struct BatchStoreData *batch_store;	/* batch store for this grouping set */
 	Agg		   *aggnode;		/* original Agg node, for numGroups etc. */
 }			AggStatePerHashData;
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 14dde9fca3..ac53d0723e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2218,6 +2218,9 @@ typedef struct AggState
 										 * ->hash_pergroup */
 	ProjectionInfo *combinedproj;	/* projection machinery */
 	SharedAggInfo *shared_info; /* one entry per worker */
+	struct Barrier *batch_barrier;	/* synchronizes parallel batch store fill */
+	int			current_batch;	/* index of the perhash/batch store being read */
+	bool		batch_filled;	/* have the batch stores been filled? */
 } AggState;
 
 /* ----------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index ace4c98939..1b3365c241 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -762,7 +762,8 @@ typedef enum AggStrategy
 	AGG_PLAIN,					/* simple agg across all input rows */
 	AGG_SORTED,					/* grouped agg, input must be sorted */
 	AGG_HASHED,					/* grouped agg, use internal hashtable */
-	AGG_MIXED					/* grouped agg, hash and sort both used */
+	AGG_MIXED,					/* grouped agg, hash and sort both used */
+	AGG_BATCH_HASH				/* grouped agg, use batch hash */
 } AggStrategy;
 
 /*
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index f7ad7881dc..941eb3a23b 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -840,6 +840,7 @@ typedef struct Agg
 	/* Note: planner provides numGroups & aggParams only in HASHED/MIXED case */
 	List	   *groupingSets;	/* grouping sets to use */
 	List	   *chain;			/* chained Agg/Sort nodes */
+	uint32		numBatches;	/* number of batches; only used for AGG_BATCH_HASH */
 } Agg;
 
 /* ----------------
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 37e6a12a6f..dc0cd825f0 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -54,6 +54,7 @@ extern PGDLLIMPORT bool enable_bitmapscan;
 extern PGDLLIMPORT bool enable_tidscan;
 extern PGDLLIMPORT bool enable_sort;
 extern PGDLLIMPORT bool enable_batch_sort;
+extern PGDLLIMPORT bool enable_batch_hashagg;
 extern PGDLLIMPORT bool enable_incremental_sort;
 extern PGDLLIMPORT bool enable_hashagg;
 extern PGDLLIMPORT bool enable_nestloop;
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 816fc37739..0e8eb26111 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -19,6 +19,8 @@
 
 #define BATCH_SORT_MIN_BATCHES		2
 #define BATCH_SORT_MAX_BATCHES		512
+#define BATCH_STORE_MIN_BATCH		2
+#define BATCH_STORE_MAX_BATCH		1024
 
 /*
  * prototypes for pathnode.c
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f0b6dae97b..2826a0b38c 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -953,7 +953,8 @@ typedef enum
 	WAIT_EVENT_SAFE_SNAPSHOT,
 	WAIT_EVENT_SYNC_REP,
 	WAIT_EVENT_XACT_GROUP_UPDATE,
-	WAIT_EVENT_BATCH_SORT_BUILD
+	WAIT_EVENT_BATCH_SORT_BUILD,
+	WAIT_EVENT_BATCH_HASH_BUILD
 } WaitEventIPC;
 
 /* ----------
diff --git a/src/test/regress/expected/groupingsets.out b/src/test/regress/expected/groupingsets.out
index 701d52b465..bcac70894f 100644
--- a/src/test/regress/expected/groupingsets.out
+++ b/src/test/regress/expected/groupingsets.out
@@ -1739,4 +1739,69 @@ set work_mem to default;
 
 drop table gs_group_1;
 drop table gs_hash_1;
+-- parallel grouping sets
+BEGIN;
+set enable_batch_hashagg = on;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 10;
+explain (costs off)
+select sum(unique1),count(unique1),two,four,ten,twenty from tenk1 group by grouping sets(two,four,ten,(two,twenty),()) order by 3,4,5,6;
+                  QUERY PLAN                  
+----------------------------------------------
+ Gather Merge
+   Workers Planned: 2
+   ->  Sort
+         Sort Key: two, four, ten, twenty
+         ->  Parallel BatchHashAggregate
+               Group Key: two, twenty
+               Group Key: two
+               Group Key: ()
+               Group Key: four
+               Group Key: ten
+               ->  Parallel Seq Scan on tenk1
+(11 rows)
+
+select sum(unique1),count(unique1),two,four,ten,twenty from tenk1 group by grouping sets(two,four,ten,(two,twenty),()) order by 3,4,5,6;
+   sum    | count | two | four | ten | twenty 
+----------+-------+-----+------+-----+--------
+  2495000 |   500 |   0 |      |     |      0
+  2496000 |   500 |   0 |      |     |      2
+  2497000 |   500 |   0 |      |     |      4
+  2498000 |   500 |   0 |      |     |      6
+  2499000 |   500 |   0 |      |     |      8
+  2500000 |   500 |   0 |      |     |     10
+  2501000 |   500 |   0 |      |     |     12
+  2502000 |   500 |   0 |      |     |     14
+  2503000 |   500 |   0 |      |     |     16
+  2504000 |   500 |   0 |      |     |     18
+ 24995000 |  5000 |   0 |      |     |       
+  2495500 |   500 |   1 |      |     |      1
+  2496500 |   500 |   1 |      |     |      3
+  2497500 |   500 |   1 |      |     |      5
+  2498500 |   500 |   1 |      |     |      7
+  2499500 |   500 |   1 |      |     |      9
+  2500500 |   500 |   1 |      |     |     11
+  2501500 |   500 |   1 |      |     |     13
+  2502500 |   500 |   1 |      |     |     15
+  2503500 |   500 |   1 |      |     |     17
+  2504500 |   500 |   1 |      |     |     19
+ 25000000 |  5000 |   1 |      |     |       
+ 12495000 |  2500 |     |    0 |     |       
+ 12497500 |  2500 |     |    1 |     |       
+ 12500000 |  2500 |     |    2 |     |       
+ 12502500 |  2500 |     |    3 |     |       
+  4995000 |  1000 |     |      |   0 |       
+  4996000 |  1000 |     |      |   1 |       
+  4997000 |  1000 |     |      |   2 |       
+  4998000 |  1000 |     |      |   3 |       
+  4999000 |  1000 |     |      |   4 |       
+  5000000 |  1000 |     |      |   5 |       
+  5001000 |  1000 |     |      |   6 |       
+  5002000 |  1000 |     |      |   7 |       
+  5003000 |  1000 |     |      |   8 |       
+  5004000 |  1000 |     |      |   9 |       
+ 49995000 | 10000 |     |      |     |       
+(37 rows)
+
+ABORT;
 -- end
diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out
index b187c1080b..ba327d7583 100644
--- a/src/test/regress/expected/partition_aggregate.out
+++ b/src/test/regress/expected/partition_aggregate.out
@@ -1523,6 +1523,7 @@ SET min_parallel_table_scan_size = 0;
 SET parallel_tuple_cost = 0;
 SET parallel_setup_cost = 0;
 SET enable_indexonlyscan = OFF;
+-- using batch sort
 EXPLAIN (COSTS OFF)
 SELECT unique2,count(*) FROM tenk1 GROUP BY 1;
                   QUERY PLAN                  
@@ -1588,4 +1589,67 @@ SELECT count(*) FROM
  10000
 (1 row)
 
+-- using batch hash
+SET enable_batch_sort = OFF;
+SET enable_batch_hashagg = ON;
+EXPLAIN (COSTS OFF)
+SELECT unique2,count(*) FROM tenk1 GROUP BY 1;
+               QUERY PLAN               
+----------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Parallel BatchHashAggregate
+         Group Key: unique2
+         ->  Parallel Seq Scan on tenk1
+(5 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo;
+                  QUERY PLAN                  
+----------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Parallel BatchHashAggregate
+               Group Key: tenk1.unique2
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo;
+ count 
+-------
+ 10000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM
+  (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1
+    INNER JOIN
+  (SELECT unique2  id FROM tenk1) t2
+  USING(id);
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: ((count(*)) = tenk1.unique2)
+                     ->  Parallel BatchHashAggregate
+                           Group Key: tenk1_1.unique2
+                           ->  Parallel Seq Scan on tenk1 tenk1_1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on tenk1
+(11 rows)
+
+SELECT count(*) FROM
+  (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1
+    INNER JOIN
+  (SELECT unique2  id FROM tenk1) t2
+  USING(id);
+ count 
+-------
+ 10000
+(1 row)
+
 ABORT;
diff --git a/src/test/regress/expected/select_distinct.out b/src/test/regress/expected/select_distinct.out
index c200e38d12..8c7f3381be 100644
--- a/src/test/regress/expected/select_distinct.out
+++ b/src/test/regress/expected/select_distinct.out
@@ -313,6 +313,7 @@ SET min_parallel_table_scan_size =0;
 SET parallel_tuple_cost = 0;
 SET parallel_setup_cost = 0;
 SET enable_indexonlyscan = OFF;
+-- using batch sort
 EXPLAIN (costs off)
 SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
                      QUERY PLAN                     
@@ -347,4 +348,38 @@ SELECT DISTINCT * FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
                            ->  Parallel Seq Scan on tenk1
 (9 rows)
 
+-- using batch hash
+SET enable_batch_sort = OFF;
+SET enable_batch_hashagg = ON;
+EXPLAIN (costs off)
+SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+                  QUERY PLAN                  
+----------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Parallel BatchHashAggregate
+               Group Key: tenk1.unique2
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+ count 
+-------
+ 10000
+(1 row)
+
+explain (costs off)
+SELECT DISTINCT * FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+                  QUERY PLAN                  
+----------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Parallel BatchHashAggregate
+         Group Key: tenk1.unique2
+         ->  Parallel BatchHashAggregate
+               Group Key: tenk1.unique2
+               ->  Parallel Seq Scan on tenk1
+(7 rows)
+
 ABORT;
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 8ed047e520..a7219644a8 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -88,6 +88,7 @@ select count(*) = 1 as ok from pg_stat_wal;
 select name, setting from pg_settings where name like 'enable%';
               name              | setting 
 --------------------------------+---------
+ enable_batch_hashagg           | off
  enable_batch_sort              | off
  enable_bitmapscan              | on
  enable_gathermerge             | on
@@ -107,7 +108,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(19 rows)
+(20 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
diff --git a/src/test/regress/expected/union.out b/src/test/regress/expected/union.out
index 5a2be9aec9..519a50bb20 100644
--- a/src/test/regress/expected/union.out
+++ b/src/test/regress/expected/union.out
@@ -1059,6 +1059,7 @@ SET min_parallel_table_scan_size =0;
 SET parallel_tuple_cost = 0;
 SET parallel_setup_cost = 0;
 SET enable_indexonlyscan = OFF;
+-- using batch sort
 EXPLAIN (costs off)
 SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo;
                             QUERY PLAN                            
@@ -1106,4 +1107,33 @@ SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1)
  10000
 (1 row)
 
+-- using batch hash
+SET enable_batch_sort = OFF;
+SET enable_batch_hashagg = ON;
+EXPLAIN (costs off)
+SELECT count(*) from (
+  SELECT hundred from tenk1
+    union
+  SELECT hundred from tenk1) foo;
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Parallel BatchHashAggregate
+               Group Key: tenk1.hundred
+               ->  Parallel Append
+                     ->  Parallel Seq Scan on tenk1
+                     ->  Parallel Seq Scan on tenk1 tenk1_1
+(8 rows)
+
+SELECT count(*) from (
+  SELECT hundred from tenk1
+    union
+  SELECT hundred from tenk1) foo;
+ count 
+-------
+   100
+(1 row)
+
 ABORT;
diff --git a/src/test/regress/sql/groupingsets.sql b/src/test/regress/sql/groupingsets.sql
index d4e5628eba..2eff77af47 100644
--- a/src/test/regress/sql/groupingsets.sql
+++ b/src/test/regress/sql/groupingsets.sql
@@ -511,4 +511,14 @@ set work_mem to default;
 drop table gs_group_1;
 drop table gs_hash_1;
 
+-- parallel grouping sets
+BEGIN;
+set enable_batch_hashagg = on;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 10;
+explain (costs off)
+select sum(unique1),count(unique1),two,four,ten,twenty from tenk1 group by grouping sets(two,four,ten,(two,twenty),()) order by 3,4,5,6;
+select sum(unique1),count(unique1),two,four,ten,twenty from tenk1 group by grouping sets(two,four,ten,(two,twenty),()) order by 3,4,5,6;
+ABORT;
+
 -- end
diff --git a/src/test/regress/sql/partition_aggregate.sql b/src/test/regress/sql/partition_aggregate.sql
index 3e50a48d37..7ecbe3ed56 100644
--- a/src/test/regress/sql/partition_aggregate.sql
+++ b/src/test/regress/sql/partition_aggregate.sql
@@ -338,6 +338,26 @@ SET min_parallel_table_scan_size = 0;
 SET parallel_tuple_cost = 0;
 SET parallel_setup_cost = 0;
 SET enable_indexonlyscan = OFF;
+-- using batch sort
+EXPLAIN (COSTS OFF)
+SELECT unique2,count(*) FROM tenk1 GROUP BY 1;
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo;
+SELECT count(*) FROM (SELECT unique2,count(*) FROM tenk1 GROUP BY 1) foo;
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM
+  (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1
+    INNER JOIN
+  (SELECT unique2  id FROM tenk1) t2
+  USING(id);
+SELECT count(*) FROM
+  (SELECT unique2,count(*) id FROM tenk1 GROUP BY 1) t1
+    INNER JOIN
+  (SELECT unique2  id FROM tenk1) t2
+  USING(id);
+-- using batch hash
+SET enable_batch_sort = OFF;
+SET enable_batch_hashagg = ON;
 EXPLAIN (COSTS OFF)
 SELECT unique2,count(*) FROM tenk1 GROUP BY 1;
 EXPLAIN (COSTS OFF)
diff --git a/src/test/regress/sql/select_distinct.sql b/src/test/regress/sql/select_distinct.sql
index 3ff7acf64d..2a16d9b23d 100644
--- a/src/test/regress/sql/select_distinct.sql
+++ b/src/test/regress/sql/select_distinct.sql
@@ -143,6 +143,15 @@ SET min_parallel_table_scan_size =0;
 SET parallel_tuple_cost = 0;
 SET parallel_setup_cost = 0;
 SET enable_indexonlyscan = OFF;
+-- using batch sort
+EXPLAIN (costs off)
+SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+explain (costs off)
+SELECT DISTINCT * FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
+-- using batch hash
+SET enable_batch_sort = OFF;
+SET enable_batch_hashagg = ON;
 EXPLAIN (costs off)
 SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
 SELECT COUNT(*) FROM (SELECT DISTINCT unique2 FROM tenk1) foo;
diff --git a/src/test/regress/sql/union.sql b/src/test/regress/sql/union.sql
index a1cb1bb7ac..9fd50db549 100644
--- a/src/test/regress/sql/union.sql
+++ b/src/test/regress/sql/union.sql
@@ -448,10 +448,23 @@ SET min_parallel_table_scan_size =0;
 SET parallel_tuple_cost = 0;
 SET parallel_setup_cost = 0;
 SET enable_indexonlyscan = OFF;
+-- using batch sort
 EXPLAIN (costs off)
 SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo;
 SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) foo;
 EXPLAIN (costs off)
 SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2);
 SELECT count(*) FROM (SELECT unique2 FROM tenk1 UNION SELECT unique2 FROM tenk1) t1 INNER JOIN tenk1 USING(unique2);
+-- using batch hash
+SET enable_batch_sort = OFF;
+SET enable_batch_hashagg = ON;
+EXPLAIN (costs off)
+SELECT count(*) from (
+  SELECT hundred from tenk1
+    union
+  SELECT hundred from tenk1) foo;
+SELECT count(*) from (
+  SELECT hundred from tenk1
+    union
+  SELECT hundred from tenk1) foo;
 ABORT;
\ No newline at end of file
-- 
2.16.3

