Hello. Now that parallel scan works, I'd like to re-propose
'asynchronous execution', also called 'early execution'.

In the previous proposal, foreign scan was my only working
example. Now I can use parallel execution as the example instead,
which also makes it easier to tell this feature apart from parallel
execution itself.

I could have done more work on this before proposing it, but I'd
like to show it now so that we can judge whether it deserves
further work.


==== Overview of asynchronous execution

"Asynchronous execution" is a feature that starts the substantial
work of nodes before Exec* is called. This can reduce the total
startup time by overlapping the startup times of multiple execution
nodes. It is especially effective for joins or appends whose
multiple children take a long time to start up.

This patch does that by inserting another phase, "Start*", between
ExecInit* and Exec*. This phase launches parallel processing,
including bgworkers and FDWs, before the very first tuple of the
result is requested.
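The following toy cost model in C illustrates the overlapping of startup times described above. It is not executor code. ToyChild and the two latency functions are hypothetical. The model also deliberately ignores worker launch and tuple-transfer costs, which the third test result suggests are not negligible.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Toy cost model (hypothetical, not executor code): each child node needs
 * startup_ms before it can return its first tuple.  Without a Start* phase,
 * the parent pulls its children one after another, so their startup costs
 * add up.  If a Start* phase launches every child's work (e.g. in
 * bgworkers) before the first Exec* call, the startups overlap and the
 * parent waits only for the slowest child.
 */
typedef struct ToyChild
{
	double		startup_ms;		/* time until the first tuple is available */
} ToyChild;

/* First-tuple latency when children are started lazily, one at a time. */
static double
first_tuple_latency_sequential(const ToyChild *children, size_t n)
{
	double		total = 0.0;

	assert(children != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
		total += children[i].startup_ms;
	return total;
}

/* First-tuple latency when all children were launched in a Start* phase. */
static double
first_tuple_latency_async(const ToyChild *children, size_t n)
{
	double		longest = 0.0;

	assert(children != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
		if (children[i].startup_ms > longest)
			longest = children[i].startup_ms;
	return longest;
}
```

For two children that each need 2000 ms before their first tuple, the sequential model gives 4000 ms and the asynchronous model gives 2000 ms.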

==== About this patch

As a proof of concept, the first three patches add such a start
phase to the executor, along with a facility to track the node
status for almost all kinds of executor nodes (part of this may
turn out to be unnecessary, though). The last two patches then
implement an example use of the infrastructure.

The two new GUCs, enable_parasortmerge and enable_asyncexec,
respectively control whether to use Gather for sorts under merge
joins and whether to enable asynchronous execution.

For evaluation, I made merge join use bgworkers under certain
conditions, as an example. It is merely a mock implementation, but
it is enough to show the difference between parallel execution and
asynchronous execution (better names are welcome), and the
effectiveness of the latter. Thanks to Amit for his great work.

==== Performance test

Apply all the patches, then run the following in order. Of course,
this test is artificially constructed so that this patch wins :)

CREATE TABLE t1 (a int, b int);
CREATE TABLE t2 (a int, b int);
CREATE TABLE t3 (a int, b int);
INSERT INTO t1 (SELECT (a / 1000) + (a % 1000) * 1000, a
                FROM generate_series(0, 999999) a);
INSERT INTO t2 (SELECT (a / 1000) + (a % 1000) * 1000, a
                FROM generate_series(0, 999999) a);
INSERT INTO t3 (SELECT (a / 1000) + (a % 1000) * 1000, a
                FROM generate_series(0, 999999) a);
ANALYZE t1;
ANALYZE t2;
ANALYZE t3;
-- 1st: nested loop, hash join and material allowed; no new features
SET enable_nestloop TO true;
SET enable_hashjoin TO true;
SET enable_material TO true;
SET enable_parasortmerge TO false;
SET enable_asyncexec TO false;
EXPLAIN (COSTS off, ANALYZE)
  SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 ON (t1.a = t3.a)
  ORDER BY t1.a LIMIT 10;
-- 2nd: force merge joins, executed in a single backend
SET enable_nestloop TO false;
SET enable_hashjoin TO false;
SET enable_material TO false;
EXPLAIN (COSTS off, ANALYZE)
  SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 ON (t1.a = t3.a)
  ORDER BY t1.a LIMIT 10;
-- 3rd: use Gather for the sorts under the merge joins
SET enable_parasortmerge TO true;
EXPLAIN (COSTS off, ANALYZE)
  SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 ON (t1.a = t3.a)
  ORDER BY t1.a LIMIT 10;
-- 4th: additionally start execution asynchronously
SET enable_asyncexec TO true;
EXPLAIN (COSTS off, ANALYZE)
  SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 ON (t1.a = t3.a)
  ORDER BY t1.a LIMIT 10;

==== Test results

In my environment, I got the following results.

- First run: the planner chooses a hash join plan, which takes about 3.3s.

- Second run: the merge joins are done in a single backend, taking about 5.1s.

- Third run: plain parallel execution of the merge joins takes about 5.8s.

- Fourth run: starting the merge joins asynchronously takes about 3.0s.

So, at least in this case, asynchronous execution speeds up
parallel execution, making it even faster than what is (probably)
the fastest plan currently available.


==== TODO and random thoughts, not limited to this patch

- This patch doesn't contain a planner part. The planner must be
  aware of async execution for this to be effective.

- Some means of controlling execution on bgworkers would be
  needed. At the very least, merge join requires position
  mark/restore functions.

- Currently, the more tuples are transferred, the less effective
  parallel execution becomes. Some method of transferring tuples in
  larger units would be needed. Or would it be good to have shared
  work memory?

- The term "asynchronous execution" is easy to confuse with
  parallel execution. "Early execution" or "early start" might
  work, but I'm not very confident about either.
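The third point above, about transferring tuples in larger units, can be sketched as simple batching on the sending side. This is a hypothetical C illustration and not the existing tuple queue code. TupleBatcher and its functions are made up. The sketch only counts queue sends and assumes that each send carries a fixed overhead.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical sender-side batching: tuples are buffered and sent in
 * groups of batch_size, so n tuples need ceil(n / batch_size) sends
 * instead of n.  Only the number of sends is tracked here; real code
 * would also copy the tuple data into the batch buffer.
 */
typedef struct TupleBatcher
{
	size_t		batch_size;		/* tuples per send */
	size_t		pending;		/* tuples buffered but not yet sent */
	size_t		sends;			/* queue sends performed so far */
} TupleBatcher;

static void
batcher_init(TupleBatcher *b, size_t batch_size)
{
	assert(batch_size > 0);
	b->batch_size = batch_size;
	b->pending = 0;
	b->sends = 0;
}

/* Send whatever is buffered; called when a batch fills and at end of scan. */
static void
batcher_flush(TupleBatcher *b)
{
	if (b->pending > 0)
	{
		b->sends++;
		b->pending = 0;
	}
}

/* Buffer one tuple, sending the batch as soon as it is full. */
static void
batcher_put(TupleBatcher *b)
{
	b->pending++;
	if (b->pending == b->batch_size)
		batcher_flush(b);
}
```

With a batch size of 100, one of the 1,000,000-row test tables would need 10,000 sends instead of 1,000,000 in this model.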


Any suggestions or thoughts?

I must apologize for the incomplete proposal and cluttered thoughts.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 415d4d0784e45c066b727a0a18716dd449aea044 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 8 Jul 2015 11:48:12 +0900
Subject: [PATCH 1/5] Add infrastructure for executor node run state.

This infrastructure generalizes the run-state tracking that
ResultNode did into a general form with four states.

The states are Inited, Started, Running and Done. Running and Done
correspond to the two values of ResultNode's rs_done. Inited
indicates that the node has been initialized but not yet executed.
Started indicates that execution of the node has begun but the first
tuple has not been received yet. Running indicates that the node is
returning tuples, and Done indicates that the node has no more
tuples to return.

The Group, ModifyTable, SetOp and WindowAgg nodes had their own
run-state management, so this patch moves them onto this
infrastructure.
---
 src/backend/commands/explain.c             |  2 +-
 src/backend/executor/nodeAgg.c             |  1 +
 src/backend/executor/nodeAppend.c          |  1 +
 src/backend/executor/nodeBitmapAnd.c       |  1 +
 src/backend/executor/nodeBitmapHeapscan.c  |  1 +
 src/backend/executor/nodeBitmapIndexscan.c |  1 +
 src/backend/executor/nodeBitmapOr.c        |  1 +
 src/backend/executor/nodeCtescan.c         |  1 +
 src/backend/executor/nodeCustom.c          |  1 +
 src/backend/executor/nodeForeignscan.c     |  1 +
 src/backend/executor/nodeFunctionscan.c    |  1 +
 src/backend/executor/nodeGather.c          | 10 ++++++----
 src/backend/executor/nodeGroup.c           | 14 +++++++++-----
 src/backend/executor/nodeHash.c            |  1 +
 src/backend/executor/nodeHashjoin.c        |  1 +
 src/backend/executor/nodeIndexonlyscan.c   |  1 +
 src/backend/executor/nodeIndexscan.c       |  1 +
 src/backend/executor/nodeLimit.c           |  1 +
 src/backend/executor/nodeLockRows.c        |  1 +
 src/backend/executor/nodeMaterial.c        |  1 +
 src/backend/executor/nodeMergeAppend.c     |  1 +
 src/backend/executor/nodeMergejoin.c       |  1 +
 src/backend/executor/nodeModifyTable.c     |  9 ++++++---
 src/backend/executor/nodeNestloop.c        |  1 +
 src/backend/executor/nodeRecursiveunion.c  |  1 +
 src/backend/executor/nodeResult.c          |  1 +
 src/backend/executor/nodeSamplescan.c      |  1 +
 src/backend/executor/nodeSeqscan.c         |  1 +
 src/backend/executor/nodeSetOp.c           | 20 ++++++++++++--------
 src/backend/executor/nodeSort.c            |  1 +
 src/backend/executor/nodeSubqueryscan.c    |  1 +
 src/backend/executor/nodeTidscan.c         |  1 +
 src/backend/executor/nodeUnique.c          |  1 +
 src/backend/executor/nodeValuesscan.c      |  1 +
 src/backend/executor/nodeWindowAgg.c       | 12 ++++++++----
 src/backend/executor/nodeWorktablescan.c   |  1 +
 src/include/nodes/execnodes.h              | 27 ++++++++++++++++++++++-----
 37 files changed, 94 insertions(+), 30 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 183d3d9..fb07213 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2107,7 +2107,7 @@ static void
 show_sort_info(SortState *sortstate, ExplainState *es)
 {
 	Assert(IsA(sortstate, SortState));
-	if (es->analyze && sortstate->sort_Done &&
+	if (es->analyze && ExecNode_is_done(sortstate) &&
 		sortstate->tuplesortstate != NULL)
 	{
 		Tuplesortstate *state = (Tuplesortstate *) sortstate->tuplesortstate;
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 2e36855..2ef3bdf 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -2039,6 +2039,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	aggstate = makeNode(AggState);
 	aggstate->ss.ps.plan = (Plan *) node;
 	aggstate->ss.ps.state = estate;
+	SetNodeRunState(aggstate, Inited);
 
 	aggstate->aggs = NIL;
 	aggstate->numaggs = 0;
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 2cffef8..4718c0f 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -140,6 +140,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	 */
 	appendstate->ps.plan = (Plan *) node;
 	appendstate->ps.state = estate;
+	SetNodeRunState(appendstate, Inited);
 	appendstate->appendplans = appendplanstates;
 	appendstate->as_nplans = nplans;
 
diff --git a/src/backend/executor/nodeBitmapAnd.c b/src/backend/executor/nodeBitmapAnd.c
index 205980e..8bc5bbe 100644
--- a/src/backend/executor/nodeBitmapAnd.c
+++ b/src/backend/executor/nodeBitmapAnd.c
@@ -63,6 +63,7 @@ ExecInitBitmapAnd(BitmapAnd *node, EState *estate, int eflags)
 	 */
 	bitmapandstate->ps.plan = (Plan *) node;
 	bitmapandstate->ps.state = estate;
+	SetNodeRunState(bitmapandstate, Inited);
 	bitmapandstate->bitmapplans = bitmapplanstates;
 	bitmapandstate->nplans = nplans;
 
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index c784b9e..04ce35a 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -556,6 +556,7 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
 	scanstate = makeNode(BitmapHeapScanState);
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
+	SetNodeRunState(scanstate, Inited);
 
 	scanstate->tbm = NULL;
 	scanstate->tbmiterator = NULL;
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index 77fc1e5..613054f 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -206,6 +206,7 @@ ExecInitBitmapIndexScan(BitmapIndexScan *node, EState *estate, int eflags)
 	indexstate = makeNode(BitmapIndexScanState);
 	indexstate->ss.ps.plan = (Plan *) node;
 	indexstate->ss.ps.state = estate;
+	SetNodeRunState(indexstate, Inited);
 
 	/* normally we don't make the result bitmap till runtime */
 	indexstate->biss_result = NULL;
diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c
index 353a5b6..fcdaeaf 100644
--- a/src/backend/executor/nodeBitmapOr.c
+++ b/src/backend/executor/nodeBitmapOr.c
@@ -64,6 +64,7 @@ ExecInitBitmapOr(BitmapOr *node, EState *estate, int eflags)
 	 */
 	bitmaporstate->ps.plan = (Plan *) node;
 	bitmaporstate->ps.state = estate;
+	SetNodeRunState(bitmaporstate, Inited);
 	bitmaporstate->bitmapplans = bitmapplanstates;
 	bitmaporstate->nplans = nplans;
 
diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c
index 75c1ab3..666ef91 100644
--- a/src/backend/executor/nodeCtescan.c
+++ b/src/backend/executor/nodeCtescan.c
@@ -191,6 +191,7 @@ ExecInitCteScan(CteScan *node, EState *estate, int eflags)
 	scanstate = makeNode(CteScanState);
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
+	SetNodeRunState(scanstate, Inited);
 	scanstate->eflags = eflags;
 	scanstate->cte_table = NULL;
 	scanstate->eof_cte = false;
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index 0a022df..e7e3b17 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -43,6 +43,7 @@ ExecInitCustomScan(CustomScan *cscan, EState *estate, int eflags)
 	/* fill up fields of ScanState */
 	css->ss.ps.plan = &cscan->scan.plan;
 	css->ss.ps.state = estate;
+	SetNodeRunState(css, Inited);
 
 	/* create expression context for node */
 	ExecAssignExprContext(estate, &css->ss.ps);
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 6165e4a..90483e4 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -128,6 +128,7 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
 	scanstate = makeNode(ForeignScanState);
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
+	SetNodeRunState(scanstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c
index f5fa2b3..849b54f 100644
--- a/src/backend/executor/nodeFunctionscan.c
+++ b/src/backend/executor/nodeFunctionscan.c
@@ -299,6 +299,7 @@ ExecInitFunctionScan(FunctionScan *node, EState *estate, int eflags)
 	scanstate = makeNode(FunctionScanState);
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
+	SetNodeRunState(scanstate, Inited);
 	scanstate->eflags = eflags;
 
 	/*
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index f8c1ba6..0e71dfc 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -67,6 +67,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate = makeNode(GatherState);
 	gatherstate->ps.plan = (Plan *) node;
 	gatherstate->ps.state = estate;
+	SetNodeRunState(gatherstate, Inited);
 	gatherstate->need_to_scan_locally = !node->single_copy;
 
 	/*
@@ -140,7 +141,7 @@ ExecGather(GatherState *node)
 	 * needs to allocate large dynamic segement, so it is better to do if it
 	 * is really needed.
 	 */
-	if (!node->initialized)
+	if (!ExecNode_is_running(node))
 	{
 		EState	   *estate = node->ps.state;
 		Gather	   *gather = (Gather *) node->ps.plan;
@@ -196,7 +197,8 @@ ExecGather(GatherState *node)
 		/* Run plan locally if no workers or not single-copy. */
 		node->need_to_scan_locally = (node->reader == NULL)
 			|| !gather->single_copy;
-		node->initialized = true;
+
+		SetNodeRunState(node, Running);
 	}
 
 	/*
@@ -455,10 +457,10 @@ ExecReScanGather(GatherState *node)
 	 */
 	ExecShutdownGatherWorkers(node);
 
-	node->initialized = false;
-
 	if (node->pei)
 		ExecParallelReinitialize(node->pei);
 
+	SetNodeRunState(node, Inited);
+
 	ExecReScan(node->ps.lefttree);
 }
diff --git a/src/backend/executor/nodeGroup.c b/src/backend/executor/nodeGroup.c
index 5e47854..1a8f669 100644
--- a/src/backend/executor/nodeGroup.c
+++ b/src/backend/executor/nodeGroup.c
@@ -40,10 +40,13 @@ ExecGroup(GroupState *node)
 	TupleTableSlot *firsttupleslot;
 	TupleTableSlot *outerslot;
 
+	/* Advance the state to running if just after initialized */
+	AdvanceNodeRunStateTo(node, Running);
+
 	/*
 	 * get state info from node
 	 */
-	if (node->grp_done)
+	if (ExecNode_is_done(node))
 		return NULL;
 	econtext = node->ss.ps.ps_ExprContext;
 	numCols = ((Group *) node->ss.ps.plan)->numCols;
@@ -86,7 +89,7 @@ ExecGroup(GroupState *node)
 		if (TupIsNull(outerslot))
 		{
 			/* empty input, so return nothing */
-			node->grp_done = TRUE;
+			SetNodeRunState(node, Done);
 			return NULL;
 		}
 		/* Copy tuple into firsttupleslot */
@@ -138,7 +141,7 @@ ExecGroup(GroupState *node)
 			if (TupIsNull(outerslot))
 			{
 				/* no more groups, so we're done */
-				node->grp_done = TRUE;
+				SetNodeRunState(node, Done);
 				return NULL;
 			}
 
@@ -207,7 +210,7 @@ ExecInitGroup(Group *node, EState *estate, int eflags)
 	grpstate = makeNode(GroupState);
 	grpstate->ss.ps.plan = (Plan *) node;
 	grpstate->ss.ps.state = estate;
-	grpstate->grp_done = FALSE;
+	SetNodeRunState(grpstate, Inited);
 
 	/*
 	 * create expression context
@@ -282,7 +285,6 @@ ExecReScanGroup(GroupState *node)
 {
 	PlanState  *outerPlan = outerPlanState(node);
 
-	node->grp_done = FALSE;
 	node->ss.ps.ps_TupFromTlist = false;
 	/* must clear first tuple */
 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
@@ -293,4 +295,6 @@ ExecReScanGroup(GroupState *node)
 	 */
 	if (outerPlan->chgParam == NULL)
 		ExecReScan(outerPlan);
+
+	SetNodeRunState(node, Inited);
 }
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 5e05ec3..fcbc44e 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -172,6 +172,7 @@ ExecInitHash(Hash *node, EState *estate, int eflags)
 	hashstate = makeNode(HashState);
 	hashstate->ps.plan = (Plan *) node;
 	hashstate->ps.state = estate;
+	SetNodeRunState(hashstate, Inited);
 	hashstate->hashtable = NULL;
 	hashstate->hashkeys = NIL;	/* will be set by parent HashJoin */
 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 1d78cdf..064421e 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -451,6 +451,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
 	hjstate = makeNode(HashJoinState);
 	hjstate->js.ps.plan = (Plan *) node;
 	hjstate->js.ps.state = estate;
+	SetNodeRunState(hjstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 9f54c46..0e84314 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -403,6 +403,7 @@ ExecInitIndexOnlyScan(IndexOnlyScan *node, EState *estate, int eflags)
 	indexstate = makeNode(IndexOnlyScanState);
 	indexstate->ss.ps.plan = (Plan *) node;
 	indexstate->ss.ps.state = estate;
+	SetNodeRunState(indexstate, Inited);
 	indexstate->ioss_HeapFetches = 0;
 
 	/*
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index c0f14db..534d2f4 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -828,6 +828,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
 	indexstate = makeNode(IndexScanState);
 	indexstate->ss.ps.plan = (Plan *) node;
 	indexstate->ss.ps.state = estate;
+	SetNodeRunState(indexstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index 40ac0d7..1b675d4 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -384,6 +384,7 @@ ExecInitLimit(Limit *node, EState *estate, int eflags)
 	limitstate = makeNode(LimitState);
 	limitstate->ps.plan = (Plan *) node;
 	limitstate->ps.state = estate;
+	SetNodeRunState(limitstate, Inited);
 
 	limitstate->lstate = LIMIT_INITIAL;
 
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index b9b0f06..eeeca0b 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -365,6 +365,7 @@ ExecInitLockRows(LockRows *node, EState *estate, int eflags)
 	lrstate = makeNode(LockRowsState);
 	lrstate->ps.plan = (Plan *) node;
 	lrstate->ps.state = estate;
+	SetNodeRunState(lrstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index b2b5aa7..d9a67f4 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -171,6 +171,7 @@ ExecInitMaterial(Material *node, EState *estate, int eflags)
 	matstate = makeNode(MaterialState);
 	matstate->ss.ps.plan = (Plan *) node;
 	matstate->ss.ps.state = estate;
+	SetNodeRunState(matstate, Inited);
 
 	/*
 	 * We must have a tuplestore buffering the subplan output to do backward
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index bdf7680..3901255 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -83,6 +83,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	 */
 	mergestate->ps.plan = (Plan *) node;
 	mergestate->ps.state = estate;
+	SetNodeRunState(mergestate, Inited);
 	mergestate->mergeplans = mergeplanstates;
 	mergestate->ms_nplans = nplans;
 
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 34b6cf6..9970db1 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -1485,6 +1485,7 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
 	mergestate = makeNode(MergeJoinState);
 	mergestate->js.ps.plan = (Plan *) node;
 	mergestate->js.ps.state = estate;
+	SetNodeRunState(mergestate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index dabaea9..b59c94e 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1268,6 +1268,9 @@ ExecModifyTable(ModifyTableState *node)
 	HeapTupleData oldtupdata;
 	HeapTuple	oldtuple;
 
+	/* Advance the state to running if just after initialized */
+	AdvanceNodeRunStateTo(node, Running);
+
 	/*
 	 * This should NOT get called during EvalPlanQual; we should have passed a
 	 * subplan tree to EvalPlanQual, instead.  Use a runtime test not just
@@ -1286,7 +1289,7 @@ ExecModifyTable(ModifyTableState *node)
 	 * our subplan's nodes aren't necessarily robust against being called
 	 * extra times.
 	 */
-	if (node->mt_done)
+	if (ExecNode_is_done(node))
 		return NULL;
 
 	/*
@@ -1463,7 +1466,7 @@ ExecModifyTable(ModifyTableState *node)
 	 */
 	fireASTriggers(node);
 
-	node->mt_done = true;
+	SetNodeRunState(node, Done);
 
 	return NULL;
 }
@@ -1494,11 +1497,11 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 	mtstate = makeNode(ModifyTableState);
 	mtstate->ps.plan = (Plan *) node;
 	mtstate->ps.state = estate;
+	SetNodeRunState(mtstate, Inited);
 	mtstate->ps.targetlist = NIL;		/* not actually used */
 
 	mtstate->operation = operation;
 	mtstate->canSetTag = node->canSetTag;
-	mtstate->mt_done = false;
 
 	mtstate->mt_plans = (PlanState **) palloc0(sizeof(PlanState *) * nplans);
 	mtstate->resultRelInfo = estate->es_result_relations + node->resultRelIndex;
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index e66bcda..b4c2f26 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -309,6 +309,7 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
 	nlstate = makeNode(NestLoopState);
 	nlstate->js.ps.plan = (Plan *) node;
 	nlstate->js.ps.state = estate;
+	SetNodeRunState(nlstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeRecursiveunion.c b/src/backend/executor/nodeRecursiveunion.c
index 8df1639..118496e 100644
--- a/src/backend/executor/nodeRecursiveunion.c
+++ b/src/backend/executor/nodeRecursiveunion.c
@@ -176,6 +176,7 @@ ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
 	rustate = makeNode(RecursiveUnionState);
 	rustate->ps.plan = (Plan *) node;
 	rustate->ps.state = estate;
+	SetNodeRunState(rustate, Inited);
 
 	rustate->eqfunctions = NULL;
 	rustate->hashfunctions = NULL;
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index 8d3dde0..b4ee402 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -217,6 +217,7 @@ ExecInitResult(Result *node, EState *estate, int eflags)
 	resstate = makeNode(ResultState);
 	resstate->ps.plan = (Plan *) node;
 	resstate->ps.state = estate;
+	SetNodeRunState(resstate, Inited);
 
 	resstate->rs_done = false;
 	resstate->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
diff --git a/src/backend/executor/nodeSamplescan.c b/src/backend/executor/nodeSamplescan.c
index dbe84b0..0f75110 100644
--- a/src/backend/executor/nodeSamplescan.c
+++ b/src/backend/executor/nodeSamplescan.c
@@ -152,6 +152,7 @@ ExecInitSampleScan(SampleScan *node, EState *estate, int eflags)
 	scanstate = makeNode(SampleScanState);
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
+	SetNodeRunState(scanstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index b858f2f..0ee33ed 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -177,6 +177,7 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 	scanstate = makeNode(SeqScanState);
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
+	SetNodeRunState(scanstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 7d00cc5..123d051 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -197,6 +197,9 @@ ExecSetOp(SetOpState *node)
 	SetOp	   *plannode = (SetOp *) node->ps.plan;
 	TupleTableSlot *resultTupleSlot = node->ps.ps_ResultTupleSlot;
 
+	/* Advance the state to running if just after initialized */
+	AdvanceNodeRunStateTo(node, Running);
+
 	/*
 	 * If the previously-returned tuple needs to be returned more than once,
 	 * keep returning it.
@@ -208,7 +211,7 @@ ExecSetOp(SetOpState *node)
 	}
 
 	/* Otherwise, we're done if we are out of groups */
-	if (node->setop_done)
+	if (ExecNode_is_done(node))
 		return NULL;
 
 	/* Fetch the next tuple group according to the correct strategy */
@@ -244,7 +247,7 @@ setop_retrieve_direct(SetOpState *setopstate)
 	/*
 	 * We loop retrieving groups until we find one we should return
 	 */
-	while (!setopstate->setop_done)
+	while (ExecNode_is_running(setopstate))
 	{
 		/*
 		 * If we don't already have the first tuple of the new group, fetch it
@@ -261,7 +264,7 @@ setop_retrieve_direct(SetOpState *setopstate)
 			else
 			{
 				/* outer plan produced no tuples at all */
-				setopstate->setop_done = true;
+				SetNodeRunState(setopstate, Done);
 				return NULL;
 			}
 		}
@@ -293,7 +296,7 @@ setop_retrieve_direct(SetOpState *setopstate)
 			if (TupIsNull(outerslot))
 			{
 				/* no more outer-plan tuples available */
-				setopstate->setop_done = true;
+				SetNodeRunState(setopstate, Done);
 				break;
 			}
 
@@ -433,7 +436,7 @@ setop_retrieve_hash_table(SetOpState *setopstate)
 	/*
 	 * We loop retrieving groups until we find one we should return
 	 */
-	while (!setopstate->setop_done)
+	while (ExecNode_is_running(setopstate))
 	{
 		/*
 		 * Find the next entry in the hash table
@@ -442,7 +445,7 @@ setop_retrieve_hash_table(SetOpState *setopstate)
 		if (entry == NULL)
 		{
 			/* No more entries in hashtable, so done */
-			setopstate->setop_done = true;
+			SetNodeRunState(setopstate, Done);
 			return NULL;
 		}
 
@@ -490,7 +493,7 @@ ExecInitSetOp(SetOp *node, EState *estate, int eflags)
 
 	setopstate->eqfunctions = NULL;
 	setopstate->hashfunctions = NULL;
-	setopstate->setop_done = false;
+	SetNodeRunState(setopstate, Inited);
 	setopstate->numOutput = 0;
 	setopstate->pergroup = NULL;
 	setopstate->grp_firstTuple = NULL;
@@ -601,7 +604,6 @@ void
 ExecReScanSetOp(SetOpState *node)
 {
 	ExecClearTuple(node->ps.ps_ResultTupleSlot);
-	node->setop_done = false;
 	node->numOutput = 0;
 
 	if (((SetOp *) node->ps.plan)->strategy == SETOP_HASHED)
@@ -651,4 +653,6 @@ ExecReScanSetOp(SetOpState *node)
 	 */
 	if (node->ps.lefttree->chgParam == NULL)
 		ExecReScan(node->ps.lefttree);
+
+	SetNodeRunState(node, Inited);
 }
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index af1dccf..3ae5b89 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -162,6 +162,7 @@ ExecInitSort(Sort *node, EState *estate, int eflags)
 	sortstate = makeNode(SortState);
 	sortstate->ss.ps.plan = (Plan *) node;
 	sortstate->ss.ps.state = estate;
+	SetNodeRunState(sortstate, Inited);
 
 	/*
 	 * We must have random access to the sort output to do backward scan or
diff --git a/src/backend/executor/nodeSubqueryscan.c b/src/backend/executor/nodeSubqueryscan.c
index e5d1e54..497d6df 100644
--- a/src/backend/executor/nodeSubqueryscan.c
+++ b/src/backend/executor/nodeSubqueryscan.c
@@ -117,6 +117,7 @@ ExecInitSubqueryScan(SubqueryScan *node, EState *estate, int eflags)
 	subquerystate = makeNode(SubqueryScanState);
 	subquerystate->ss.ps.plan = (Plan *) node;
 	subquerystate->ss.ps.state = estate;
+	SetNodeRunState(subquerystate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeTidscan.c b/src/backend/executor/nodeTidscan.c
index 203f1ac..f19e735 100644
--- a/src/backend/executor/nodeTidscan.c
+++ b/src/backend/executor/nodeTidscan.c
@@ -461,6 +461,7 @@ ExecInitTidScan(TidScan *node, EState *estate, int eflags)
 	tidstate = makeNode(TidScanState);
 	tidstate->ss.ps.plan = (Plan *) node;
 	tidstate->ss.ps.state = estate;
+	SetNodeRunState(tidstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeUnique.c b/src/backend/executor/nodeUnique.c
index 1cb4a8a..f259f32 100644
--- a/src/backend/executor/nodeUnique.c
+++ b/src/backend/executor/nodeUnique.c
@@ -122,6 +122,7 @@ ExecInitUnique(Unique *node, EState *estate, int eflags)
 	uniquestate = makeNode(UniqueState);
 	uniquestate->ps.plan = (Plan *) node;
 	uniquestate->ps.state = estate;
+	SetNodeRunState(uniquestate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeValuesscan.c b/src/backend/executor/nodeValuesscan.c
index a39695a..c56199c 100644
--- a/src/backend/executor/nodeValuesscan.c
+++ b/src/backend/executor/nodeValuesscan.c
@@ -205,6 +205,7 @@ ExecInitValuesScan(ValuesScan *node, EState *estate, int eflags)
 	scanstate = makeNode(ValuesScanState);
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
+	SetNodeRunState(scanstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index c371d4d..8734e8e 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -1564,7 +1564,10 @@ ExecWindowAgg(WindowAggState *winstate)
 	int			i;
 	int			numfuncs;
 
-	if (winstate->all_done)
+	/* Advance the state to running if just after initialized */
+	AdvanceNodeRunStateTo(winstate, Running);
+
+	if (ExecNode_is_done(winstate))
 		return NULL;
 
 	/*
@@ -1686,7 +1689,7 @@ restart:
 		}
 		else
 		{
-			winstate->all_done = true;
+			SetNodeRunState(winstate, Done);
 			return NULL;
 		}
 	}
@@ -1787,6 +1790,7 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
 	winstate = makeNode(WindowAggState);
 	winstate->ss.ps.plan = (Plan *) node;
 	winstate->ss.ps.state = estate;
+	SetNodeRunState(winstate, Inited);
 
 	/*
 	 * Create expression contexts.  We need two, one for per-input-tuple
@@ -2060,8 +2064,6 @@ ExecReScanWindowAgg(WindowAggState *node)
 	PlanState  *outerPlan = outerPlanState(node);
 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
 
-	node->all_done = false;
-
 	node->ss.ps.ps_TupFromTlist = false;
 	node->all_first = true;
 
@@ -2085,6 +2087,8 @@ ExecReScanWindowAgg(WindowAggState *node)
 	 */
 	if (outerPlan->chgParam == NULL)
 		ExecReScan(outerPlan);
+
+	SetNodeRunState(node, Inited);
 }
 
 /*
diff --git a/src/backend/executor/nodeWorktablescan.c b/src/backend/executor/nodeWorktablescan.c
index 618508e..799e96b 100644
--- a/src/backend/executor/nodeWorktablescan.c
+++ b/src/backend/executor/nodeWorktablescan.c
@@ -144,6 +144,7 @@ ExecInitWorkTableScan(WorkTableScan *node, EState *estate, int eflags)
 	scanstate = makeNode(WorkTableScanState);
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
+	SetNodeRunState(scanstate, Inited);
 	scanstate->rustate = NULL;	/* we'll set this later */
 
 	/*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index eb3591a..4527a98 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -344,6 +344,27 @@ typedef struct ResultRelInfo
 } ResultRelInfo;
 
 /* ----------------
+ *  Enumeration and macros for executor node running state.
+ */
+typedef enum ExecNodeRunState
+{
+	ERunState_Inited,		/* Just after initialized */
+	ERunState_Started,		/* Execution started but needs one more call
+							 * for the first tuple */
+	ERunState_Running,		/* Returning the next tuple */
+	ERunState_Done			/* No tuple to return  */
+} ExecNodeRunState;
+
+#define SetNodeRunState(nd,st) (((PlanState*)nd)->runstate = (ERunState_##st))
+#define AdvanceNodeRunStateTo(nd,st) \
+	do {\
+		if (((PlanState*)nd)->runstate < (ERunState_##st))\
+			((PlanState*)nd)->runstate = (ERunState_##st);\
+	} while(0)
+#define ExecNode_is_running(nd)	(((PlanState*)nd)->runstate == ERunState_Running)
+#define ExecNode_is_done(nd)	(((PlanState*)nd)->runstate == ERunState_Done)
+
+/* ----------------
  *	  EState information
  *
  * Master working state for an Executor invocation
@@ -1056,6 +1077,7 @@ typedef struct PlanState
 	ProjectionInfo *ps_ProjInfo;	/* info for doing tuple projection */
 	bool		ps_TupFromTlist;/* state flag for processing set-valued
 								 * functions in targetlist */
+	ExecNodeRunState runstate;	/* Execution state of this node */
 } PlanState;
 
 /* ----------------
@@ -1117,7 +1139,6 @@ typedef struct ModifyTableState
 	PlanState	ps;				/* its first field is NodeTag */
 	CmdType		operation;		/* INSERT, UPDATE, or DELETE */
 	bool		canSetTag;		/* do we set the command tag/es_processed? */
-	bool		mt_done;		/* are we done? */
 	PlanState **mt_plans;		/* subplans (one per target rel) */
 	int			mt_nplans;		/* number of plans in the array */
 	int			mt_whichplan;	/* which one is being executed (0..n-1) */
@@ -1812,7 +1833,6 @@ typedef struct GroupState
 {
 	ScanState	ss;				/* its first field is NodeTag */
 	FmgrInfo   *eqfunctions;	/* per-field lookup data for equality fns */
-	bool		grp_done;		/* indicates completion of Group scan */
 } GroupState;
 
 /* ---------------------
@@ -1917,7 +1937,6 @@ typedef struct WindowAggState
 	ExprContext *tmpcontext;	/* short-term evaluation context */
 
 	bool		all_first;		/* true if the scan is starting */
-	bool		all_done;		/* true if the scan is finished */
 	bool		partition_spooled;		/* true if all tuples in current
 										 * partition have been spooled into
 										 * tuplestore */
@@ -1965,7 +1984,6 @@ typedef struct UniqueState
 typedef struct GatherState
 {
 	PlanState	ps;				/* its first field is NodeTag */
-	bool		initialized;
 	struct ParallelExecutorInfo *pei;
 	int			nreaders;
 	int			nextreader;
@@ -2003,7 +2021,6 @@ typedef struct SetOpState
 	PlanState	ps;				/* its first field is NodeTag */
 	FmgrInfo   *eqfunctions;	/* per-grouping-field equality fns */
 	FmgrInfo   *hashfunctions;	/* per-grouping-field hash fns */
-	bool		setop_done;		/* indicates completion of output scan */
 	long		numOutput;		/* number of dups left to output */
 	MemoryContext tempContext;	/* short-term context for comparisons */
 	/* these fields are used in SETOP_SORTED mode: */
-- 
1.8.3.1

From e3f764239f6494616137c2c23f8246bec9c52cd4 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 8 Jul 2015 17:39:47 +0900
Subject: [PATCH 2/5] Change all tuple-returning execution nodes to maintain
 run-state appropriately.

This doesn't change any behavior; it only maintains the run state so
that it stays consistent with whether the tuple returned by
ExecProcNode is NULL.
---
 src/backend/executor/nodeAgg.c             |  6 ++++++
 src/backend/executor/nodeAppend.c          |  7 +++++++
 src/backend/executor/nodeBitmapAnd.c       |  6 ++++++
 src/backend/executor/nodeBitmapHeapscan.c  | 13 ++++++++++++-
 src/backend/executor/nodeBitmapIndexscan.c |  6 ++++++
 src/backend/executor/nodeBitmapOr.c        |  6 ++++++
 src/backend/executor/nodeCtescan.c         | 13 ++++++++++++-
 src/backend/executor/nodeCustom.c          | 15 ++++++++++++++-
 src/backend/executor/nodeForeignscan.c     | 12 +++++++++++-
 src/backend/executor/nodeFunctionscan.c    | 13 ++++++++++++-
 src/backend/executor/nodeGather.c          |  9 +++++++--
 src/backend/executor/nodeGroup.c           |  4 ++--
 src/backend/executor/nodeHash.c            |  6 ++++++
 src/backend/executor/nodeHashjoin.c        | 11 +++++++++++
 src/backend/executor/nodeIndexonlyscan.c   | 16 +++++++++++++++-
 src/backend/executor/nodeIndexscan.c       | 18 ++++++++++++++++--
 src/backend/executor/nodeLimit.c           | 22 ++++++++++++++++++++++
 src/backend/executor/nodeLockRows.c        |  7 +++++++
 src/backend/executor/nodeMaterial.c        |  9 +++++++++
 src/backend/executor/nodeMergeAppend.c     |  5 +++++
 src/backend/executor/nodeMergejoin.c       | 12 +++++++++++-
 src/backend/executor/nodeNestloop.c        |  5 +++++
 src/backend/executor/nodeRecursiveunion.c  |  5 +++++
 src/backend/executor/nodeResult.c          | 12 ++++++++++++
 src/backend/executor/nodeSamplescan.c      | 10 +++++++++-
 src/backend/executor/nodeSeqscan.c         | 11 ++++++++++-
 src/backend/executor/nodeSetOp.c           |  3 +--
 src/backend/executor/nodeSort.c            | 11 +++++++++++
 src/backend/executor/nodeSubqueryscan.c    | 11 ++++++++++-
 src/backend/executor/nodeTidscan.c         | 12 +++++++++++-
 src/backend/executor/nodeUnique.c          |  5 +++++
 src/backend/executor/nodeValuesscan.c      | 12 +++++++++++-
 src/backend/executor/nodeWindowAgg.c       |  3 +--
 src/backend/executor/nodeWorktablescan.c   | 12 +++++++++++-
 src/include/nodes/execnodes.h              | 12 +++++++-----
 35 files changed, 312 insertions(+), 28 deletions(-)
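
A minimal standalone sketch, using only the C standard library, of the pattern this patch applies to the scan-type nodes (ExecBitmapHeapScan, ExecCteScan, ExecForeignScan and the like): set Running on entry, set Done when the underlying scan returns an empty slot, and reset to Inited on rescan. MockScanState, mock_scan_next(), ExecMockScan() and ExecReScanMockScan() are hypothetical names, and a NULL pointer stands in for an empty TupleTableSlot.

```c
#include <assert.h>
#include <stddef.h>

typedef enum ExecNodeRunState
{
	ERunState_Inited,
	ERunState_Started,
	ERunState_Running,
	ERunState_Done
} ExecNodeRunState;

/* Hypothetical miniature scan node that yields the integers in vals[]. */
typedef struct MockScanState
{
	ExecNodeRunState runstate;
	const int  *vals;
	int			nvals;
	int			pos;
} MockScanState;

/* Stand-in for ExecScan(): next value, or NULL at end of scan. */
static const int *
mock_scan_next(MockScanState *node)
{
	assert(node->pos <= node->nvals);
	if (node->pos >= node->nvals)
		return NULL;
	return &node->vals[node->pos++];
}

/* Same shape as the patched Exec*Scan(): Running on entry, Done on NULL. */
static const int *
ExecMockScan(MockScanState *node)
{
	const int  *slot;

	node->runstate = ERunState_Running;
	slot = mock_scan_next(node);
	if (slot == NULL)
		node->runstate = ERunState_Done;
	return slot;
}

/* Same shape as the patched ExecReScan*(): back to Inited. */
static void
ExecReScanMockScan(MockScanState *node)
{
	node->runstate = ERunState_Inited;
	node->pos = 0;
}
```

Since the entry assignment is unconditional, a node polled again after reaching Done is briefly marked Running before falling back to Done; using AdvanceNodeRunStateTo() on entry instead, as ExecWindowAgg does, would keep it at Done.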

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 2ef3bdf..ed29e3a 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1522,6 +1522,8 @@ ExecAgg(AggState *node)
 {
 	TupleTableSlot *result;
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * Check to see if we're still projecting out tuples from a previous agg
 	 * tuple (because there is a function-returning-set in the projection
@@ -1562,6 +1564,7 @@ ExecAgg(AggState *node)
 			return result;
 	}
 
+	SetNodeRunState(node, Done);
 	return NULL;
 }
 
@@ -2961,6 +2964,9 @@ ExecReScanAgg(AggState *node)
 	int			numGroupingSets = Max(node->maxsets, 1);
 	int			setno;
 
+
+	SetNodeRunState(node, Inited);
+
 	node->agg_done = false;
 
 	node->ss.ps.ps_TupFromTlist = false;
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 4718c0f..03b3b66 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -194,6 +194,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
+	SetNodeRunState(node, Running);
+
 	for (;;)
 	{
 		PlanState  *subnode;
@@ -229,7 +231,10 @@ ExecAppend(AppendState *node)
 		else
 			node->as_whichplan--;
 		if (!exec_append_initialize_next(node))
+		{
+			SetNodeRunState(node, Done);
 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		}
 
 		/* Else loop back and try to get a tuple from the new subplan */
 	}
@@ -268,6 +273,8 @@ ExecReScanAppend(AppendState *node)
 {
 	int			i;
 
+	SetNodeRunState(node, Inited);
+
 	for (i = 0; i < node->as_nplans; i++)
 	{
 		PlanState  *subnode = node->appendplans[i];
diff --git a/src/backend/executor/nodeBitmapAnd.c b/src/backend/executor/nodeBitmapAnd.c
index 8bc5bbe..64f202e 100644
--- a/src/backend/executor/nodeBitmapAnd.c
+++ b/src/backend/executor/nodeBitmapAnd.c
@@ -105,6 +105,8 @@ MultiExecBitmapAnd(BitmapAndState *node)
 	if (node->ps.instrument)
 		InstrStartNode(node->ps.instrument);
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * get information from the node
 	 */
@@ -146,6 +148,8 @@ MultiExecBitmapAnd(BitmapAndState *node)
 	if (result == NULL)
 		elog(ERROR, "BitmapAnd doesn't support zero inputs");
 
+	SetNodeRunState(node, Done);
+
 	/* must provide our own instrumentation support */
 	if (node->ps.instrument)
 		InstrStopNode(node->ps.instrument, 0 /* XXX */ );
@@ -189,6 +193,8 @@ ExecReScanBitmapAnd(BitmapAndState *node)
 {
 	int			i;
 
+	SetNodeRunState(node, Inited);
+
 	for (i = 0; i < node->nplans; i++)
 	{
 		PlanState  *subnode = node->bitmapplans[i];
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 04ce35a..0dc61ba 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -437,9 +437,18 @@ BitmapHeapRecheck(BitmapHeapScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecBitmapHeapScan(BitmapHeapScanState *node)
 {
-	return ExecScan(&node->ss,
+	TupleTableSlot *slot;
+
+	SetNodeRunState(node, Running);
+
+	slot = ExecScan(&node->ss,
 					(ExecScanAccessMtd) BitmapHeapNext,
 					(ExecScanRecheckMtd) BitmapHeapRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 /* ----------------------------------------------------------------
@@ -451,6 +460,8 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
 {
 	PlanState  *outerPlan = outerPlanState(node);
 
+	SetNodeRunState(node, Inited);
+
 	/* rescan to release any page pin */
 	heap_rescan(node->ss.ss_currentScanDesc, NULL);
 
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index 613054f..acfce3d 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -44,6 +44,8 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
 	if (node->ss.ps.instrument)
 		InstrStartNode(node->ss.ps.instrument);
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * extract necessary information from index scan node
 	 */
@@ -98,6 +100,8 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
 						 NULL, 0);
 	}
 
+	SetNodeRunState(node, Done);
+
 	/* must provide our own instrumentation support */
 	if (node->ss.ps.instrument)
 		InstrStopNode(node->ss.ps.instrument, nTuples);
@@ -117,6 +121,8 @@ ExecReScanBitmapIndexScan(BitmapIndexScanState *node)
 {
 	ExprContext *econtext = node->biss_RuntimeContext;
 
+	SetNodeRunState(node, Inited);
+
 	/*
 	 * Reset the runtime-key context so we don't leak memory as each outer
 	 * tuple is scanned.  Note this assumes that we will recalculate *all*
diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c
index fcdaeaf..7a5bcf5 100644
--- a/src/backend/executor/nodeBitmapOr.c
+++ b/src/backend/executor/nodeBitmapOr.c
@@ -106,6 +106,8 @@ MultiExecBitmapOr(BitmapOrState *node)
 	if (node->ps.instrument)
 		InstrStartNode(node->ps.instrument);
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * get information from the node
 	 */
@@ -162,6 +164,8 @@ MultiExecBitmapOr(BitmapOrState *node)
 	if (result == NULL)
 		elog(ERROR, "BitmapOr doesn't support zero inputs");
 
+	SetNodeRunState(node, Done);
+
 	/* must provide our own instrumentation support */
 	if (node->ps.instrument)
 		InstrStopNode(node->ps.instrument, 0 /* XXX */ );
@@ -205,6 +209,8 @@ ExecReScanBitmapOr(BitmapOrState *node)
 {
 	int			i;
 
+	SetNodeRunState(node, Inited);
+
 	for (i = 0; i < node->nplans; i++)
 	{
 		PlanState  *subnode = node->bitmapplans[i];
diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c
index 666ef91..d237370 100644
--- a/src/backend/executor/nodeCtescan.c
+++ b/src/backend/executor/nodeCtescan.c
@@ -152,9 +152,18 @@ CteScanRecheck(CteScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecCteScan(CteScanState *node)
 {
-	return ExecScan(&node->ss,
+	TupleTableSlot *slot;
+
+	SetNodeRunState(node, Running);
+
+	slot = ExecScan(&node->ss,
 					(ExecScanAccessMtd) CteScanNext,
 					(ExecScanRecheckMtd) CteScanRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 
@@ -312,6 +321,8 @@ ExecReScanCteScan(CteScanState *node)
 {
 	Tuplestorestate *tuplestorestate = node->leader->cte_table;
 
+	SetNodeRunState(node, Inited);
+
 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
 
 	ExecScanReScan(&node->ss);
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index e7e3b17..9f85fd7 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -110,8 +110,16 @@ ExecInitCustomScan(CustomScan *cscan, EState *estate, int eflags)
 TupleTableSlot *
 ExecCustomScan(CustomScanState *node)
 {
+	TupleTableSlot *slot;
+
 	Assert(node->methods->ExecCustomScan != NULL);
-	return node->methods->ExecCustomScan(node);
+	SetNodeRunState(node, Running);
+	slot = node->methods->ExecCustomScan(node);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 void
@@ -136,6 +144,7 @@ void
 ExecReScanCustomScan(CustomScanState *node)
 {
 	Assert(node->methods->ReScanCustomScan != NULL);
+	SetNodeRunState(node, Inited);
 	node->methods->ReScanCustomScan(node);
 }
 
@@ -158,5 +167,9 @@ ExecCustomRestrPos(CustomScanState *node)
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("custom-scan \"%s\" does not support MarkPos",
 						node->methods->CustomName)));
+
 	node->methods->RestrPosCustomScan(node);
+
+	/* Restoring position in turn restores run state */
+	SetNodeRunState(node, Running);
 }
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 90483e4..895af86 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -100,9 +100,17 @@ ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecForeignScan(ForeignScanState *node)
 {
-	return ExecScan((ScanState *) node,
+	TupleTableSlot *slot;
+
+	SetNodeRunState(node, Running);
+	slot = ExecScan((ScanState *) node,
 					(ExecScanAccessMtd) ForeignNext,
 					(ExecScanRecheckMtd) ForeignRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 
@@ -247,6 +255,8 @@ ExecEndForeignScan(ForeignScanState *node)
 void
 ExecReScanForeignScan(ForeignScanState *node)
 {
+	SetNodeRunState(node, Inited);
+
 	node->fdwroutine->ReScanForeignScan(node);
 
 	ExecScanReScan(&node->ss);
diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c
index 849b54f..08f9bbf 100644
--- a/src/backend/executor/nodeFunctionscan.c
+++ b/src/backend/executor/nodeFunctionscan.c
@@ -265,9 +265,18 @@ FunctionRecheck(FunctionScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecFunctionScan(FunctionScanState *node)
 {
-	return ExecScan(&node->ss,
+	TupleTableSlot *slot;
+
+	SetNodeRunState(node, Running);
+
+	slot = ExecScan(&node->ss,
 					(ExecScanAccessMtd) FunctionNext,
 					(ExecScanRecheckMtd) FunctionRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 /* ----------------------------------------------------------------
@@ -569,6 +578,8 @@ ExecReScanFunctionScan(FunctionScanState *node)
 	int			i;
 	Bitmapset  *chgparam = node->ss.ps.chgParam;
 
+	SetNodeRunState(node, Inited);
+
 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
 	for (i = 0; i < node->nfuncs; i++)
 	{
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 0e71dfc..aceb358 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -135,13 +135,15 @@ ExecGather(GatherState *node)
 	ExprDoneCond isDone;
 	ExprContext *econtext;
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * Initialize the parallel context and workers on first execution. We do
 	 * this on first execution rather than during node initialization, as it
 	 * needs to allocate large dynamic segement, so it is better to do if it
 	 * is really needed.
 	 */
-	if (!ExecNode_is_running(node))
+	if (ExecNode_is_inited(node))
 	{
 		EState	   *estate = node->ps.state;
 		Gather	   *gather = (Gather *) node->ps.plan;
@@ -232,7 +234,10 @@ ExecGather(GatherState *node)
 		 */
 		slot = gather_getnext(node);
 		if (TupIsNull(slot))
+		{
+			SetNodeRunState(node, Done);
 			return NULL;
+		}
 
 		/*
 		 * form the result tuple using ExecProject(), and return it --- unless
@@ -461,6 +466,6 @@ ExecReScanGather(GatherState *node)
 		ExecParallelReinitialize(node->pei);
 
 	SetNodeRunState(node, Inited);
-	
+
 	ExecReScan(node->ps.lefttree);
 }
diff --git a/src/backend/executor/nodeGroup.c b/src/backend/executor/nodeGroup.c
index 1a8f669..a593d9f 100644
--- a/src/backend/executor/nodeGroup.c
+++ b/src/backend/executor/nodeGroup.c
@@ -285,6 +285,8 @@ ExecReScanGroup(GroupState *node)
 {
 	PlanState  *outerPlan = outerPlanState(node);
 
+	SetNodeRunState(node, Inited);
+
 	node->ss.ps.ps_TupFromTlist = false;
 	/* must clear first tuple */
 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
@@ -295,6 +297,4 @@ ExecReScanGroup(GroupState *node)
 	 */
 	if (outerPlan->chgParam == NULL)
 		ExecReScan(outerPlan);
-
-	SetNodeRunState(node, Inited);
 }
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index fcbc44e..5a71fe3 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -84,6 +84,8 @@ MultiExecHash(HashState *node)
 	if (node->ps.instrument)
 		InstrStartNode(node->ps.instrument);
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * get state info from node
 	 */
@@ -138,6 +140,8 @@ MultiExecHash(HashState *node)
 	if (hashtable->spaceUsed > hashtable->spacePeak)
 		hashtable->spacePeak = hashtable->spaceUsed;
 
+	SetNodeRunState(node, Done);
+
 	/* must provide our own instrumentation support */
 	if (node->ps.instrument)
 		InstrStopNode(node->ps.instrument, hashtable->totalTuples);
@@ -1270,6 +1274,8 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable)
 void
 ExecReScanHash(HashState *node)
 {
+	SetNodeRunState(node, Inited);
+
 	/*
 	 * if chgParam of subnode is not null then plan will be re-scanned by
 	 * first ExecProcNode.
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 064421e..dbaabc4 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -72,6 +72,8 @@ ExecHashJoin(HashJoinState *node)
 	uint32		hashvalue;
 	int			batchno;
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * get information from HashJoin node
 	 */
@@ -155,6 +157,7 @@ ExecHashJoin(HashJoinState *node)
 					if (TupIsNull(node->hj_FirstOuterTupleSlot))
 					{
 						node->hj_OuterNotEmpty = false;
+						SetNodeRunState(node, Done);
 						return NULL;
 					}
 					else
@@ -183,7 +186,10 @@ ExecHashJoin(HashJoinState *node)
 				 * outer relation.
 				 */
 				if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
+				{
+					SetNodeRunState(node, Done);
 					return NULL;
+				}
 
 				/*
 				 * need to remember whether nbatch has increased since we
@@ -414,7 +420,10 @@ ExecHashJoin(HashJoinState *node)
 				 * Try to advance to next batch.  Done if there are no more.
 				 */
 				if (!ExecHashJoinNewBatch(node))
+				{
+					SetNodeRunState(node, Done);
 					return NULL;	/* end of join */
+				}
 				node->hj_JoinState = HJ_NEED_NEW_OUTER;
 				break;
 
@@ -944,6 +953,8 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
 void
 ExecReScanHashJoin(HashJoinState *node)
 {
+	SetNodeRunState(node, Inited);
+
 	/*
 	 * In a multi-batch join, we currently have to do rescans the hard way,
 	 * primarily because batch temp files may have already been released. But
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 0e84314..b3676c9 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -252,15 +252,24 @@ IndexOnlyRecheck(IndexOnlyScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecIndexOnlyScan(IndexOnlyScanState *node)
 {
+	TupleTableSlot *slot;
+
 	/*
 	 * If we have runtime keys and they've not already been set up, do it now.
 	 */
 	if (node->ioss_NumRuntimeKeys != 0 && !node->ioss_RuntimeKeysReady)
 		ExecReScan((PlanState *) node);
 
-	return ExecScan(&node->ss,
+	SetNodeRunState(node, Running);
+
+	slot = ExecScan(&node->ss,
 					(ExecScanAccessMtd) IndexOnlyNext,
 					(ExecScanRecheckMtd) IndexOnlyRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 /* ----------------------------------------------------------------
@@ -277,6 +286,8 @@ ExecIndexOnlyScan(IndexOnlyScanState *node)
 void
 ExecReScanIndexOnlyScan(IndexOnlyScanState *node)
 {
+	SetNodeRunState(node, Inited);
+
 	/*
 	 * If we are doing runtime key calculations (ie, any of the index key
 	 * values weren't simple Consts), compute the new key values.  But first,
@@ -376,6 +387,9 @@ void
 ExecIndexOnlyRestrPos(IndexOnlyScanState *node)
 {
 	index_restrpos(node->ioss_ScanDesc);
+
+	/* Restoring position in turn restores run state */
+	SetNodeRunState(node, Running);
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 534d2f4..a343c5c 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -484,20 +484,29 @@ reorderqueue_pop(IndexScanState *node)
 TupleTableSlot *
 ExecIndexScan(IndexScanState *node)
 {
+	TupleTableSlot *slot;
+
 	/*
 	 * If we have runtime keys and they've not already been set up, do it now.
 	 */
 	if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady)
 		ExecReScan((PlanState *) node);
 
+	SetNodeRunState(node, Running);
+
 	if (node->iss_NumOrderByKeys > 0)
-		return ExecScan(&node->ss,
+		slot = ExecScan(&node->ss,
 						(ExecScanAccessMtd) IndexNextWithReorder,
 						(ExecScanRecheckMtd) IndexRecheck);
 	else
-		return ExecScan(&node->ss,
+		slot = ExecScan(&node->ss,
 						(ExecScanAccessMtd) IndexNext,
 						(ExecScanRecheckMtd) IndexRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 /* ----------------------------------------------------------------
@@ -514,6 +523,8 @@ ExecIndexScan(IndexScanState *node)
 void
 ExecReScanIndexScan(IndexScanState *node)
 {
+	SetNodeRunState(node, Inited);
+
 	/*
 	 * If we are doing runtime key calculations (ie, any of the index key
 	 * values weren't simple Consts), compute the new key values.  But first,
@@ -802,6 +813,9 @@ void
 ExecIndexRestrPos(IndexScanState *node)
 {
 	index_restrpos(node->iss_ScanDesc);
+
+	/* Restoring position in turn restores run state */
+	SetNodeRunState(node, Running);
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index 1b675d4..e59d71f 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -43,6 +43,8 @@ ExecLimit(LimitState *node)
 	TupleTableSlot *slot;
 	PlanState  *outerPlan;
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * get information from the node
 	 */
@@ -72,7 +74,10 @@ ExecLimit(LimitState *node)
 			 * If backwards scan, just return NULL without changing state.
 			 */
 			if (!ScanDirectionIsForward(direction))
+			{
+				SetNodeRunState(node, Done);
 				return NULL;
+			}
 
 			/*
 			 * Check for empty window; if so, treat like empty subplan.
@@ -80,6 +85,7 @@ ExecLimit(LimitState *node)
 			if (node->count <= 0 && !node->noCount)
 			{
 				node->lstate = LIMIT_EMPTY;
+				SetNodeRunState(node, Done);
 				return NULL;
 			}
 
@@ -96,6 +102,7 @@ ExecLimit(LimitState *node)
 					 * any output at all.
 					 */
 					node->lstate = LIMIT_EMPTY;
+					SetNodeRunState(node, Done);
 					return NULL;
 				}
 				node->subSlot = slot;
@@ -115,6 +122,7 @@ ExecLimit(LimitState *node)
 			 * The subplan is known to return no tuples (or not more than
 			 * OFFSET tuples, in general).  So we return no tuples.
 			 */
+			SetNodeRunState(node, Done);
 			return NULL;
 
 		case LIMIT_INWINDOW:
@@ -130,6 +138,7 @@ ExecLimit(LimitState *node)
 					node->position - node->offset >= node->count)
 				{
 					node->lstate = LIMIT_WINDOWEND;
+					SetNodeRunState(node, Done);
 					return NULL;
 				}
 
@@ -140,6 +149,7 @@ ExecLimit(LimitState *node)
 				if (TupIsNull(slot))
 				{
 					node->lstate = LIMIT_SUBPLANEOF;
+					SetNodeRunState(node, Done);
 					return NULL;
 				}
 				node->subSlot = slot;
@@ -154,6 +164,7 @@ ExecLimit(LimitState *node)
 				if (node->position <= node->offset + 1)
 				{
 					node->lstate = LIMIT_WINDOWSTART;
+					SetNodeRunState(node, Done);
 					return NULL;
 				}
 
@@ -170,7 +181,10 @@ ExecLimit(LimitState *node)
 
 		case LIMIT_SUBPLANEOF:
 			if (ScanDirectionIsForward(direction))
+			{
+				SetNodeRunState(node, Done);
 				return NULL;
+			}
 
 			/*
 			 * Backing up from subplan EOF, so re-fetch previous tuple; there
@@ -186,7 +200,10 @@ ExecLimit(LimitState *node)
 
 		case LIMIT_WINDOWEND:
 			if (ScanDirectionIsForward(direction))
+			{
+				SetNodeRunState(node, Done);
 				return NULL;
+			}
 
 			/*
 			 * Backing up from window end: simply re-return the last tuple
@@ -199,7 +216,10 @@ ExecLimit(LimitState *node)
 
 		case LIMIT_WINDOWSTART:
 			if (!ScanDirectionIsForward(direction))
+			{
+				SetNodeRunState(node, Done);
 				return NULL;
+			}
 
 			/*
 			 * Advancing after having backed off window start: simply
@@ -443,6 +463,8 @@ ExecEndLimit(LimitState *node)
 void
 ExecReScanLimit(LimitState *node)
 {
+	SetNodeRunState(node, Inited);
+
 	/*
 	 * Recompute limit/offset in case parameters changed, and reset the state
 	 * machine.  We must do this before rescanning our child node, in case
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index eeeca0b..2ccf05d 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -44,6 +44,8 @@ ExecLockRows(LockRowsState *node)
 	bool		epq_needed;
 	ListCell   *lc;
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * get information from the node
 	 */
@@ -57,7 +59,10 @@ lnext:
 	slot = ExecProcNode(outerPlan);
 
 	if (TupIsNull(slot))
+	{
+		SetNodeRunState(node, Done);
 		return NULL;
+	}
 
 	/* We don't need EvalPlanQual unless we get updated tuple version(s) */
 	epq_needed = false;
@@ -460,6 +465,8 @@ ExecEndLockRows(LockRowsState *node)
 void
 ExecReScanLockRows(LockRowsState *node)
 {
+	SetNodeRunState(node, Inited);
+
 	/*
 	 * if chgParam of subnode is not null then plan will be re-scanned by
 	 * first ExecProcNode.
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index d9a67f4..981398a 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -45,6 +45,8 @@ ExecMaterial(MaterialState *node)
 	bool		eof_tuplestore;
 	TupleTableSlot *slot;
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * get state info from node
 	 */
@@ -132,6 +134,7 @@ ExecMaterial(MaterialState *node)
 		if (TupIsNull(outerslot))
 		{
 			node->eof_underlying = true;
+			SetNodeRunState(node, Done);
 			return NULL;
 		}
 
@@ -152,6 +155,7 @@ ExecMaterial(MaterialState *node)
 	/*
 	 * Nothing left ...
 	 */
+	SetNodeRunState(node, Done);
 	return ExecClearTuple(slot);
 }
 
@@ -307,6 +311,9 @@ ExecMaterialRestrPos(MaterialState *node)
 	 * copy the mark to the active read pointer.
 	 */
 	tuplestore_copy_read_pointer(node->tuplestorestate, 1, 0);
+
+	/* Restoring position in turn restores run state */
+	SetNodeRunState(node, Running);
 }
 
 /* ----------------------------------------------------------------
@@ -320,6 +327,8 @@ ExecReScanMaterial(MaterialState *node)
 {
 	PlanState  *outerPlan = outerPlanState(node);
 
+	SetNodeRunState(node, Inited);
+
 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
 
 	if (node->eflags != 0)
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 3901255..4678d7c 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -170,6 +170,8 @@ ExecMergeAppend(MergeAppendState *node)
 	TupleTableSlot *result;
 	SlotNumber	i;
 
+	SetNodeRunState(node, Running);
+
 	if (!node->ms_initialized)
 	{
 		/*
@@ -207,6 +209,7 @@ ExecMergeAppend(MergeAppendState *node)
 	{
 		/* All the subplans are exhausted, and so is the heap */
 		result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		SetNodeRunState(node, Done);
 	}
 	else
 	{
@@ -289,6 +292,8 @@ ExecReScanMergeAppend(MergeAppendState *node)
 {
 	int			i;
 
+	SetNodeRunState(node, Inited);
+
 	for (i = 0; i < node->ms_nplans; i++)
 	{
 		PlanState  *subnode = node->mergeplans[i];
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 9970db1..74ceaa2 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -630,6 +630,8 @@ ExecMergeJoin(MergeJoinState *node)
 	bool		doFillOuter;
 	bool		doFillInner;
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * get information from node
 	 */
@@ -728,6 +730,7 @@ ExecMergeJoin(MergeJoinState *node)
 							break;
 						}
 						/* Otherwise we're done. */
+						SetNodeRunState(node, Done);
 						return NULL;
 				}
 				break;
@@ -785,6 +788,7 @@ ExecMergeJoin(MergeJoinState *node)
 							break;
 						}
 						/* Otherwise we're done. */
+						SetNodeRunState(node, Done);
 						return NULL;
 				}
 				break;
@@ -1039,6 +1043,7 @@ ExecMergeJoin(MergeJoinState *node)
 							break;
 						}
 						/* Otherwise we're done. */
+						SetNodeRunState(node, Done);
 						return NULL;
 				}
 				break;
@@ -1174,6 +1179,7 @@ ExecMergeJoin(MergeJoinState *node)
 								break;
 							}
 							/* Otherwise we're done. */
+							SetNodeRunState(node, Done);
 							return NULL;
 					}
 				}
@@ -1292,6 +1298,7 @@ ExecMergeJoin(MergeJoinState *node)
 							break;
 						}
 						/* Otherwise we're done. */
+						SetNodeRunState(node, Done);
 						return NULL;
 				}
 				break;
@@ -1362,6 +1369,7 @@ ExecMergeJoin(MergeJoinState *node)
 							break;
 						}
 						/* Otherwise we're done. */
+						SetNodeRunState(node, Done);
 						return NULL;
 				}
 				break;
@@ -1406,6 +1414,7 @@ ExecMergeJoin(MergeJoinState *node)
 				if (TupIsNull(innerTupleSlot))
 				{
 					MJ_printf("ExecMergeJoin: end of inner subplan\n");
+					SetNodeRunState(node, Done);
 					return NULL;
 				}
 
@@ -1448,6 +1457,7 @@ ExecMergeJoin(MergeJoinState *node)
 				if (TupIsNull(outerTupleSlot))
 				{
 					MJ_printf("ExecMergeJoin: end of outer subplan\n");
+					SetNodeRunState(node, Done);
 					return NULL;
 				}
 
@@ -1682,6 +1692,7 @@ ExecEndMergeJoin(MergeJoinState *node)
 void
 ExecReScanMergeJoin(MergeJoinState *node)
 {
+	SetNodeRunState(node, Inited);
 	ExecClearTuple(node->mj_MarkedTupleSlot);
 
 	node->mj_JoinState = EXEC_MJ_INITIALIZE_OUTER;
@@ -1699,5 +1710,4 @@ ExecReScanMergeJoin(MergeJoinState *node)
 		ExecReScan(node->js.ps.lefttree);
 	if (node->js.ps.righttree->chgParam == NULL)
 		ExecReScan(node->js.ps.righttree);
-
 }
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index b4c2f26..ae69176 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -69,6 +69,8 @@ ExecNestLoop(NestLoopState *node)
 	ExprContext *econtext;
 	ListCell   *lc;
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * get information from the node
 	 */
@@ -128,6 +130,7 @@ ExecNestLoop(NestLoopState *node)
 			if (TupIsNull(outerTupleSlot))
 			{
 				ENL1_printf("no outer tuple, ending join");
+				SetNodeRunState(node, Done);
 				return NULL;
 			}
 
@@ -429,6 +432,8 @@ ExecReScanNestLoop(NestLoopState *node)
 {
 	PlanState  *outerPlan = outerPlanState(node);
 
+	SetNodeRunState(node, Inited);
+
 	/*
 	 * If outerPlan->chgParam is not null then plan will be automatically
 	 * re-scanned by first ExecProcNode.
diff --git a/src/backend/executor/nodeRecursiveunion.c b/src/backend/executor/nodeRecursiveunion.c
index 118496e..27a86d3 100644
--- a/src/backend/executor/nodeRecursiveunion.c
+++ b/src/backend/executor/nodeRecursiveunion.c
@@ -81,6 +81,8 @@ ExecRecursiveUnion(RecursiveUnionState *node)
 	TupleTableSlot *slot;
 	bool		isnew;
 
+	SetNodeRunState(node, Running);
+
 	/* 1. Evaluate non-recursive term */
 	if (!node->recursing)
 	{
@@ -154,6 +156,7 @@ ExecRecursiveUnion(RecursiveUnionState *node)
 		return slot;
 	}
 
+	SetNodeRunState(node, Done);
 	return NULL;
 }
 
@@ -309,6 +312,8 @@ ExecReScanRecursiveUnion(RecursiveUnionState *node)
 	PlanState  *innerPlan = innerPlanState(node);
 	RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
 
+	SetNodeRunState(node, Inited);
+
 	/*
 	 * Set recursive term's chgParam to tell it that we'll modify the working
 	 * table and therefore it has to rescan.
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index b4ee402..ec81eda 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -72,6 +72,7 @@ ExecResult(ResultState *node)
 	ExprContext *econtext;
 	ExprDoneCond isDone;
 
+	SetNodeRunState(node, Running);
 	econtext = node->ps.ps_ExprContext;
 
 	/*
@@ -87,6 +88,7 @@ ExecResult(ResultState *node)
 		if (!qualResult)
 		{
 			node->rs_done = true;
+			SetNodeRunState(node, Done);
 			return NULL;
 		}
 	}
@@ -130,7 +132,10 @@ ExecResult(ResultState *node)
 			outerTupleSlot = ExecProcNode(outerPlan);
 
 			if (TupIsNull(outerTupleSlot))
+			{
+				SetNodeRunState(node, Done);
 				return NULL;
+			}
 
 			/*
 			 * prepare to compute projection expressions, which will expect to
@@ -161,6 +166,7 @@ ExecResult(ResultState *node)
 		}
 	}
 
+	SetNodeRunState(node, Done);
 	return NULL;
 }
 
@@ -189,7 +195,12 @@ ExecResultRestrPos(ResultState *node)
 	PlanState  *outerPlan = outerPlanState(node);
 
 	if (outerPlan != NULL)
+	{
 		ExecRestrPos(outerPlan);
+
+		/* Restoring position in turn restores run state */
+		SetNodeRunState(node, Running);
+	}
 	else
 		elog(ERROR, "Result nodes do not support mark/restore");
 }
@@ -295,6 +306,7 @@ ExecEndResult(ResultState *node)
 void
 ExecReScanResult(ResultState *node)
 {
+	SetNodeRunState(node, Inited);
 	node->rs_done = false;
 	node->ps.ps_TupFromTlist = false;
 	node->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
diff --git a/src/backend/executor/nodeSamplescan.c b/src/backend/executor/nodeSamplescan.c
index 0f75110..f46569e 100644
--- a/src/backend/executor/nodeSamplescan.c
+++ b/src/backend/executor/nodeSamplescan.c
@@ -98,9 +98,17 @@ SampleRecheck(SampleScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecSampleScan(SampleScanState *node)
 {
-	return ExecScan((ScanState *) node,
+	TupleTableSlot *slot;
+
+	SetNodeRunState(node, Running);
+	slot = ExecScan((ScanState *) node,
 					(ExecScanAccessMtd) SampleNext,
 					(ExecScanRecheckMtd) SampleRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 0ee33ed..d443c09 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -124,9 +124,17 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecSeqScan(SeqScanState *node)
 {
-	return ExecScan((ScanState *) node,
+	TupleTableSlot *slot;
+
+	SetNodeRunState(node, Running);
+	slot = ExecScan((ScanState *) node,
 					(ExecScanAccessMtd) SeqNext,
 					(ExecScanRecheckMtd) SeqRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 /* ----------------------------------------------------------------
@@ -275,6 +283,7 @@ ExecReScanSeqScan(SeqScanState *node)
 {
 	HeapScanDesc scan;
 
+	SetNodeRunState(node, Inited);
 	scan = node->ss.ss_currentScanDesc;
 
 	if (scan != NULL)
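The scan wrappers in this patch share one pattern. Each marks the node Running on entry, marks it Done when ExecScan returns an empty slot, and resets it to Inited on rescan. Below is a minimal sketch of that lifecycle. It assumes an array-backed scan in place of a heap scan. ToyScan, toy_next, toy_exec_scan and toy_rescan are hypothetical names, and NULL stands in for an empty TupleTableSlot.

```c
#include <assert.h>
#include <stddef.h>

typedef enum { RS_Inited, RS_Started, RS_Running, RS_Done } RunState;

/* Hypothetical array-backed scan standing in for a SeqScanState. */
typedef struct ToyScan
{
	RunState	runstate;
	const int  *rows;
	size_t		nrows;
	size_t		pos;			/* index of the next row to return */
} ToyScan;

/* Access method: NULL at end of data, like an empty slot from ExecScan. */
static const int *
toy_next(ToyScan *scan)
{
	assert(scan != NULL);
	if (scan->pos >= scan->nrows)
		return NULL;
	return &scan->rows[scan->pos++];
}

/* Same shape as ExecSeqScan in the patch: Running on entry, Done at end. */
static const int *
toy_exec_scan(ToyScan *scan)
{
	const int  *tuple;

	scan->runstate = RS_Running;
	tuple = toy_next(scan);
	if (tuple == NULL)
		scan->runstate = RS_Done;
	return tuple;
}

/* Same shape as ExecReScanSeqScan: back to Inited, then rewind. */
static void
toy_rescan(ToyScan *scan)
{
	scan->runstate = RS_Inited;
	scan->pos = 0;
}
```

ExecSeqScan in the patch sets Running unconditionally. The toy does the same, so calling it again after end of data sets Running and then Done again. The state is therefore only meaningful between calls.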
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 123d051..c248ff3 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -603,6 +603,7 @@ ExecEndSetOp(SetOpState *node)
 void
 ExecReScanSetOp(SetOpState *node)
 {
+	SetNodeRunState(node, Inited);
 	ExecClearTuple(node->ps.ps_ResultTupleSlot);
 	node->numOutput = 0;
 
@@ -653,6 +654,4 @@ ExecReScanSetOp(SetOpState *node)
 	 */
 	if (node->ps.lefttree->chgParam == NULL)
 		ExecReScan(node->ps.lefttree);
-
-	SetNodeRunState(node, Inited);
 }
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 3ae5b89..a2abec7 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -49,6 +49,8 @@ ExecSort(SortState *node)
 	SO1_printf("ExecSort: %s\n",
 			   "entering routine");
 
+	SetNodeRunState(node, Running);
+
 	estate = node->ss.ps.state;
 	dir = estate->es_direction;
 	tuplesortstate = (Tuplesortstate *) node->tuplesortstate;
@@ -138,6 +140,10 @@ ExecSort(SortState *node)
 	(void) tuplesort_gettupleslot(tuplesortstate,
 								  ScanDirectionIsForward(dir),
 								  slot);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
 	return slot;
 }
 
@@ -282,6 +288,9 @@ ExecSortRestrPos(SortState *node)
 	if (!node->sort_Done)
 		return;
 
+	/* Restoring position in turn restores run state */
+	SetNodeRunState(node, Running);
+
 	/*
 	 * restore the scan to the previously marked position
 	 */
@@ -293,6 +302,8 @@ ExecReScanSort(SortState *node)
 {
 	PlanState  *outerPlan = outerPlanState(node);
 
+	SetNodeRunState(node, Inited);
+
 	/*
 	 * If we haven't sorted yet, just return. If outerplan's chgParam is not
 	 * NULL then it will be re-scanned by ExecProcNode, else no reason to
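The ExecSortRestrPos and ExecResultRestrPos hunks put the node back into Running. A node that has already reported Done can return tuples again once its position is restored. Below is a minimal model of that rule. It assumes the sorted output is already materialized in an array, and the mark here simply records the index of the next row. ToySort and its functions are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

typedef enum { RS_Inited, RS_Started, RS_Running, RS_Done } RunState;

/* Hypothetical sort node whose output is already materialized. */
typedef struct ToySort
{
	RunState	runstate;
	const int  *sorted;
	size_t		n;
	size_t		pos;			/* index of the next row to return */
	size_t		mark;			/* saved value of pos */
} ToySort;

/* Like ExecSort after sorting: Running on entry, Done when exhausted. */
static const int *
toy_sort_next(ToySort *s)
{
	s->runstate = RS_Running;
	if (s->pos >= s->n)
	{
		s->runstate = RS_Done;
		return NULL;
	}
	return &s->sorted[s->pos++];
}

static void
toy_sort_mark(ToySort *s)
{
	s->mark = s->pos;
}

/* Restoring position in turn restores run state, as in ExecSortRestrPos. */
static void
toy_sort_restore(ToySort *s)
{
	assert(s->mark <= s->n);
	s->pos = s->mark;
	s->runstate = RS_Running;
}
```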
diff --git a/src/backend/executor/nodeSubqueryscan.c b/src/backend/executor/nodeSubqueryscan.c
index 497d6df..d8799d1 100644
--- a/src/backend/executor/nodeSubqueryscan.c
+++ b/src/backend/executor/nodeSubqueryscan.c
@@ -90,9 +90,17 @@ SubqueryRecheck(SubqueryScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecSubqueryScan(SubqueryScanState *node)
 {
-	return ExecScan(&node->ss,
+	TupleTableSlot *slot;
+
+	SetNodeRunState(node, Running);
+	slot = ExecScan(&node->ss,
 					(ExecScanAccessMtd) SubqueryNext,
 					(ExecScanRecheckMtd) SubqueryRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 /* ----------------------------------------------------------------
@@ -199,6 +207,7 @@ ExecEndSubqueryScan(SubqueryScanState *node)
 void
 ExecReScanSubqueryScan(SubqueryScanState *node)
 {
+	SetNodeRunState(node, Inited);
 	ExecScanReScan(&node->ss);
 
 	/*
diff --git a/src/backend/executor/nodeTidscan.c b/src/backend/executor/nodeTidscan.c
index f19e735..724016d 100644
--- a/src/backend/executor/nodeTidscan.c
+++ b/src/backend/executor/nodeTidscan.c
@@ -390,9 +390,17 @@ TidRecheck(TidScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecTidScan(TidScanState *node)
 {
-	return ExecScan(&node->ss,
+	TupleTableSlot *slot;
+
+	SetNodeRunState(node, Running);
+	slot = ExecScan(&node->ss,
 					(ExecScanAccessMtd) TidNext,
 					(ExecScanRecheckMtd) TidRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 /* ----------------------------------------------------------------
@@ -402,6 +410,8 @@ ExecTidScan(TidScanState *node)
 void
 ExecReScanTidScan(TidScanState *node)
 {
+	SetNodeRunState(node, Inited);
+
 	if (node->tss_TidList)
 		pfree(node->tss_TidList);
 	node->tss_TidList = NULL;
diff --git a/src/backend/executor/nodeUnique.c b/src/backend/executor/nodeUnique.c
index f259f32..1f7ca10 100644
--- a/src/backend/executor/nodeUnique.c
+++ b/src/backend/executor/nodeUnique.c
@@ -50,6 +50,8 @@ ExecUnique(UniqueState *node)
 	TupleTableSlot *slot;
 	PlanState  *outerPlan;
 
+	SetNodeRunState(node, Running);
+
 	/*
 	 * get information from the node
 	 */
@@ -71,6 +73,7 @@ ExecUnique(UniqueState *node)
 		{
 			/* end of subplan, so we're done */
 			ExecClearTuple(resultTupleSlot);
+			SetNodeRunState(node, Done);
 			return NULL;
 		}
 
@@ -187,6 +190,8 @@ ExecEndUnique(UniqueState *node)
 void
 ExecReScanUnique(UniqueState *node)
 {
+	SetNodeRunState(node, Inited);
+
 	/* must clear result tuple so first input tuple is returned */
 	ExecClearTuple(node->ps.ps_ResultTupleSlot);
 
diff --git a/src/backend/executor/nodeValuesscan.c b/src/backend/executor/nodeValuesscan.c
index c56199c..48d1ad8 100644
--- a/src/backend/executor/nodeValuesscan.c
+++ b/src/backend/executor/nodeValuesscan.c
@@ -175,9 +175,18 @@ ValuesRecheck(ValuesScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecValuesScan(ValuesScanState *node)
 {
-	return ExecScan(&node->ss,
+	TupleTableSlot *slot;
+
+	/* Advance the run state to Running */
+	SetNodeRunState(node, Running);
+	slot = ExecScan(&node->ss,
 					(ExecScanAccessMtd) ValuesNext,
 					(ExecScanRecheckMtd) ValuesRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 /* ----------------------------------------------------------------
@@ -302,6 +311,7 @@ ExecEndValuesScan(ValuesScanState *node)
 void
 ExecReScanValuesScan(ValuesScanState *node)
 {
+	SetNodeRunState(node, Inited);
 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
 
 	ExecScanReScan(&node->ss);
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 8734e8e..0d127b4 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -2064,6 +2064,7 @@ ExecReScanWindowAgg(WindowAggState *node)
 	PlanState  *outerPlan = outerPlanState(node);
 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
 
+	SetNodeRunState(node, Inited);
 	node->ss.ps.ps_TupFromTlist = false;
 	node->all_first = true;
 
@@ -2087,8 +2088,6 @@ ExecReScanWindowAgg(WindowAggState *node)
 	 */
 	if (outerPlan->chgParam == NULL)
 		ExecReScan(outerPlan);
-
-	SetNodeRunState(node, Inited);
 }
 
 /*
diff --git a/src/backend/executor/nodeWorktablescan.c b/src/backend/executor/nodeWorktablescan.c
index 799e96b..46350ab 100644
--- a/src/backend/executor/nodeWorktablescan.c
+++ b/src/backend/executor/nodeWorktablescan.c
@@ -80,6 +80,10 @@ WorkTableScanRecheck(WorkTableScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecWorkTableScan(WorkTableScanState *node)
 {
+	TupleTableSlot *slot;
+
+	SetNodeRunState(node, Running);
+
 	/*
 	 * On the first call, find the ancestor RecursiveUnion's state via the
 	 * Param slot reserved for it.  (We can't do this during node init because
@@ -114,9 +118,14 @@ ExecWorkTableScan(WorkTableScanState *node)
 		ExecAssignScanProjectionInfo(&node->ss);
 	}
 
-	return ExecScan(&node->ss,
+	slot = ExecScan(&node->ss,
 					(ExecScanAccessMtd) WorkTableScanNext,
 					(ExecScanRecheckMtd) WorkTableScanRecheck);
+
+	if (TupIsNull(slot))
+		SetNodeRunState(node, Done);
+
+	return slot;
 }
 
 
@@ -210,6 +219,7 @@ ExecEndWorkTableScan(WorkTableScanState *node)
 void
 ExecReScanWorkTableScan(WorkTableScanState *node)
 {
+	SetNodeRunState(node, Inited);
 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
 
 	ExecScanReScan(&node->ss);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4527a98..ee706fc 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -355,14 +355,16 @@ typedef enum ExecNodeRunState
 	ERunState_Done			/* No tuple to return  */
 } ExecNodeRunState;
 
-#define SetNodeRunState(nd,st) (((PlanState*)nd)->runstate = (ERunState_##st))
+#define SetNodeRunState(nd,st) (((PlanState*)(nd))->runstate = (ERunState_##st))
+#define ExecNode_is(nd,st) (((PlanState*)(nd))->runstate == (ERunState_##st))
+#define ExecNode_is_inited(nd)	(ExecNode_is((nd),Inited))
+#define ExecNode_is_running(nd)	(ExecNode_is((nd),Running))
+#define ExecNode_is_done(nd) (ExecNode_is((nd),Done))
 #define AdvanceNodeRunStateTo(nd,st) \
 	do {\
-		if (((PlanState*)nd)->runstate < (ERunState_##st))\
-			((PlanState*)nd)->runstate = (ERunState_##st);\
+		if (((PlanState*)(nd))->runstate < (ERunState_##st))\
+			((PlanState*)(nd))->runstate = (ERunState_##st);\
 	} while(0);
-#define ExecNode_is_running(nd)	(((PlanState*)nd)->runstate == ERunState_Running)
-#define ExecNode_is_done(nd)	(((PlanState*)nd)->runstate == ERunState_Done)
 
 /* ----------------
  *	  EState information
-- 
1.8.3.1
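The ordering tests in AdvanceNodeRunStateTo and StartProcNode depend on the enum being declared in lifecycle order. Only ERunState_Done is visible in the hunk above, so the sketch below assumes Inited < Started < Running < Done. ToyPlanState and ToyScanState are hypothetical stand-ins for PlanState and for a node struct that embeds it as its first member.

The sketch shows why the macros now parenthesize `nd`. Without the parentheses, `SetNodeRunState(scans + 1, Running)` would cast `scans` first and then step by sizeof(ToyPlanState), landing inside scans[0].

```c
#include <assert.h>

/* Assumed lifecycle order; the real enum is declared in execnodes.h. */
typedef enum ExecNodeRunState
{
	ERunState_Inited,			/* initialized, nothing started yet */
	ERunState_Started,			/* underlying work started early */
	ERunState_Running,			/* tuples are being requested */
	ERunState_Done				/* no tuple to return */
} ExecNodeRunState;

/* Hypothetical stand-in for PlanState: just the field the macros use. */
typedef struct ToyPlanState
{
	int			id;
	ExecNodeRunState runstate;
} ToyPlanState;

/* Hypothetical node struct that embeds ToyPlanState first, as PG nodes do. */
typedef struct ToyScanState
{
	ToyPlanState ps;
	long		rows_read;
} ToyScanState;

/* Parenthesized (nd): the cast applies to the whole argument expression. */
#define SetNodeRunState(nd,st) (((ToyPlanState *)(nd))->runstate = (ERunState_##st))
#define ExecNode_is(nd,st) (((ToyPlanState *)(nd))->runstate == (ERunState_##st))
#define ExecNode_is_done(nd) (ExecNode_is((nd),Done))

/* Never moves the state backwards; no ';' after while (0). */
#define AdvanceNodeRunStateTo(nd,st) \
	do { \
		if (((ToyPlanState *)(nd))->runstate < (ERunState_##st)) \
			((ToyPlanState *)(nd))->runstate = (ERunState_##st); \
	} while (0)
```

The sketch drops the semicolon after `while (0)`. The patch's AdvanceNodeRunStateTo keeps the trailing `;`. With that version, `if (c) AdvanceNodeRunStateTo(n, Running); else ...` expands to an extra empty statement before `else` and fails to compile.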

From bee3f8a971a1c86a7c53929a17e358a763348193 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 9 Jul 2015 19:34:15 +0900
Subject: [PATCH 3/5] Add a feature to start nodes asynchronously

Add a feature to start child nodes asynchronously in join nodes and
Append/MergeAppend nodes.  At this point, no node can be started
asynchronously so the behavior is not changed.
---
 src/backend/executor/execProcnode.c     | 106 ++++++++++++++++++++++++
 src/backend/executor/nodeAgg.c          |  22 +++++
 src/backend/executor/nodeAppend.c       |  29 +++++++
 src/backend/executor/nodeCtescan.c      |  21 +++++
 src/backend/executor/nodeGather.c       | 141 +++++++++++++++++++-------------
 src/backend/executor/nodeGroup.c        |  22 +++++
 src/backend/executor/nodeHash.c         |  22 +++++
 src/backend/executor/nodeHashjoin.c     |  54 ++++++++++++
 src/backend/executor/nodeLimit.c        |  22 +++++
 src/backend/executor/nodeLockRows.c     |  22 +++++
 src/backend/executor/nodeMaterial.c     |  23 ++++++
 src/backend/executor/nodeMergeAppend.c  |  30 +++++++
 src/backend/executor/nodeMergejoin.c    |  29 +++++++
 src/backend/executor/nodeNestloop.c     |  34 ++++++++
 src/backend/executor/nodeResult.c       |  24 ++++++
 src/backend/executor/nodeSetOp.c        |  22 +++++
 src/backend/executor/nodeSort.c         |  22 +++++
 src/backend/executor/nodeSubqueryscan.c |  22 +++++
 src/backend/executor/nodeUnique.c       |  22 +++++
 src/backend/executor/nodeWindowAgg.c    |  22 +++++
 src/include/executor/executor.h         |   1 +
 src/include/executor/nodeAgg.h          |   1 +
 src/include/executor/nodeAppend.h       |   1 +
 src/include/executor/nodeCtescan.h      |   1 +
 src/include/executor/nodeGather.h       |   1 +
 src/include/executor/nodeGroup.h        |   1 +
 src/include/executor/nodeHash.h         |   1 +
 src/include/executor/nodeHashjoin.h     |   1 +
 src/include/executor/nodeLimit.h        |   1 +
 src/include/executor/nodeLockRows.h     |   1 +
 src/include/executor/nodeMaterial.h     |   1 +
 src/include/executor/nodeMergeAppend.h  |   1 +
 src/include/executor/nodeMergejoin.h    |   1 +
 src/include/executor/nodeNestloop.h     |   1 +
 src/include/executor/nodeResult.h       |   1 +
 src/include/executor/nodeSetOp.h        |   1 +
 src/include/executor/nodeSort.h         |   1 +
 src/include/executor/nodeSubqueryscan.h |   1 +
 src/include/executor/nodeUnique.h       |   1 +
 src/include/executor/nodeWindowAgg.h    |   1 +
 40 files changed, 672 insertions(+), 59 deletions(-)

diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 6f5c554..a9d973d 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -786,6 +786,112 @@ ExecEndNode(PlanState *node)
 }
 
 /*
+ * StartProcNode - start the nodes underneath asynchronously if possible
+ *
+ * Returns true if the node has been started asynchronously. A false result
+ * does not guarantee that no node underneath has been started.
+ */
+bool
+StartProcNode(PlanState *node)
+{
+	/*
+	 * Refuse a duplicate start. This happens, for example, for children
+	 * skipped on rescan of nodes such as MergeAppend.
+	 */
+	if (node->runstate > ERunState_Started)
+		return false;
+
+	switch (nodeTag(node))
+	{
+	case T_ResultState:
+		return StartResult((ResultState *)node);
+
+	case T_AppendState:
+		return StartAppend((AppendState *)node);
+
+	case T_MergeAppendState:
+		return StartMergeAppend((MergeAppendState *)node);
+
+	case T_SubqueryScanState:
+		return StartSubqueryScan((SubqueryScanState *)node);
+
+	case T_CteScanState:
+		return StartCteScan((CteScanState *)node);
+
+		/*
+		 * join nodes
+		 */
+	case T_NestLoopState:
+		return StartNestLoop((NestLoopState *)node);
+
+	case T_MergeJoinState:
+		return StartMergeJoin((MergeJoinState *)node);
+
+	case T_HashJoinState:
+		return StartHashJoin((HashJoinState *)node);
+
+		/*
+		 * materialization nodes
+		 */
+	case T_MaterialState:
+		return StartMaterial((MaterialState *)node);
+
+	case T_SortState:
+		return StartSort((SortState *)node);
+
+	case T_GroupState:
+		return StartGroup((GroupState *)node);
+
+	case T_AggState:
+		return StartAgg((AggState *)node);
+
+	case T_WindowAggState:
+		return StartWindowAgg((WindowAggState *)node);
+
+	case T_UniqueState:
+		return StartUnique((UniqueState *)node);
+
+	case T_HashState:
+		return StartHash((HashState *)node);
+
+	case T_SetOpState:
+		return StartSetOp((SetOpState *)node);
+
+	case T_LockRowsState:
+		return StartLockRows((LockRowsState *)node);
+
+	case T_LimitState:
+		return StartLimit((LimitState *)node);
+
+	case T_GatherState:
+		return StartGather((GatherState *)node);
+
+	/* These nodes cannot run asynchronously */
+	case T_ForeignScanState:
+	case T_WorkTableScanState:
+	case T_CustomScanState:
+	case T_FunctionScanState:
+	case T_ValuesScanState:
+	case T_SeqScanState:
+	case T_SampleScanState:
+	case T_IndexScanState:
+	case T_IndexOnlyScanState:
+	case T_BitmapIndexScanState:
+	case T_BitmapHeapScanState:
+	case T_TidScanState:
+	case T_ModifyTableState:
+	case T_RecursiveUnionState:
+	case T_BitmapAndState:
+	case T_BitmapOrState:
+		return false;
+
+	default:
+		elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
+		break;
+	}
+}
+
+/*
  * ExecShutdownNode
  *
  * Give execution nodes a chance to stop asynchronous resource consumption
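StartProcNode and the per-node Start functions form a recursive walk. Plain scans decline. Single-child nodes such as Sort, Agg and Limit report Started only if their child did. Gather starts its own workers.

Below is a minimal model of that contract with three hypothetical node kinds. It includes the guard that refuses to start a node that is already Running or Done. PostgreSQL dispatches on nodeTag(); here a switch on a hypothetical ToyKind plays that role.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum { RS_Inited, RS_Started, RS_Running, RS_Done } RunState;

/* Hypothetical stand-ins for e.g. T_SeqScanState, T_SortState, T_GatherState. */
typedef enum { TOY_LEAF, TOY_UNARY, TOY_ASYNC_SOURCE } ToyKind;

typedef struct ToyNode
{
	ToyKind		kind;
	RunState	runstate;
	struct ToyNode *child;		/* used only by TOY_UNARY */
} ToyNode;

static bool
toy_start_node(ToyNode *node)
{
	assert(node != NULL);

	/* Refuse duplicate start, like the runstate check in StartProcNode. */
	if (node->runstate > RS_Started)
		return false;

	switch (node->kind)
	{
		case TOY_LEAF:
			/* Plain scans cannot run asynchronously. */
			return false;

		case TOY_UNARY:
			/* Like StartSort/StartAgg: Started only if the child started. */
			if (node->runstate != RS_Inited)
				return false;
			if (toy_start_node(node->child))
			{
				node->runstate = RS_Started;
				return true;
			}
			return false;

		case TOY_ASYNC_SOURCE:
			/* Like StartGather: launches its own work and reports success. */
			if (node->runstate != RS_Inited)
				return false;
			node->runstate = RS_Started;
			return true;
	}
	return false;
}
```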
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index ed29e3a..a40cb48 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1569,6 +1569,28 @@ ExecAgg(AggState *node)
 }
 
 /*
+ * StartAgg - Try asynchronous execution of this node
+ *
+ * Try to start asynchronously.
+ * Returns true if any of the underlying nodes started asynchronously
+ */
+bool
+StartAgg(AggState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(outerPlanState(node)))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
+
+/*
  * ExecAgg for non-hashed case
  */
 static TupleTableSlot *
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 03b3b66..2b918b2 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -194,6 +194,10 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
+	/* start child nodes asynchronously if possible */
+	if (ExecNode_is_inited(node))
+		StartAppend(node);
+
 	SetNodeRunState(node, Running);
 
 	for (;;)
@@ -241,6 +245,31 @@ ExecAppend(AppendState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartAppend
+ *
+ * Try to start asynchronously.
+ * Returns true if any of the underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartAppend(AppendState *node)
+{
+	int i;
+	bool async = false;
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	for (i = 0; i < node->as_nplans; i++)
+		async |= StartProcNode(node->appendplans[i]);
+
+	if (async)
+		SetNodeRunState(node, Started);
+
+	return async;
+}
+
+/* ----------------------------------------------------------------
  *		ExecEndAppend
  *
  *		Shuts down the subscans of the append node.
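StartAppend combines the children's results with `|=` rather than `||`. The choice matters because `||` would short-circuit after the first child that starts asynchronously and leave the remaining children unstarted. The sketch below contrasts the two. It uses hypothetical ToyChild counters that record how often each child is asked to start.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical child: records whether it was asked to start. */
typedef struct ToyChild
{
	bool		can_run_async;
	int			start_calls;
} ToyChild;

static bool
toy_start_child(ToyChild *c)
{
	c->start_calls++;
	return c->can_run_async;
}

/* Pattern used by StartAppend/StartMergeAppend: visit every child. */
static bool
toy_start_all(ToyChild *children, int n)
{
	bool		async = false;
	int			i;

	assert(n >= 0);
	for (i = 0; i < n; i++)
		async |= toy_start_child(&children[i]);
	return async;
}

/* Short-circuit variant, for contrast only: stops at the first success. */
static bool
toy_start_first(ToyChild *children, int n)
{
	bool		async = false;
	int			i;

	for (i = 0; i < n; i++)
		async = async || toy_start_child(&children[i]);
	return async;
}
```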
diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c
index d237370..cae0aca 100644
--- a/src/backend/executor/nodeCtescan.c
+++ b/src/backend/executor/nodeCtescan.c
@@ -166,6 +166,27 @@ ExecCteScan(CteScanState *node)
 	return slot;
 }
 
+/* ----------------------------------------------------------------
+ *		StartCteScan
+ *
+ * Try to start asynchronously.
+ * Returns true if any of the underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartCteScan(CteScanState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(node->cteplanstate))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
 
 /* ----------------------------------------------------------------
  *		ExecInitCteScan
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index aceb358..5364acb 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -69,6 +69,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->ps.state = estate;
 	SetNodeRunState(gatherstate, Inited);
 	gatherstate->need_to_scan_locally = !node->single_copy;
+	SetNodeRunState(gatherstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
@@ -119,23 +120,24 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 }
 
 /* ----------------------------------------------------------------
- *		ExecGather(node)
+ *		StartGather
  *
- *		Scans the relation via multiple workers and returns
- *		the next qualifying tuple.
+ * Try to start asynchronously.
+ * Returns true if any of the underlying nodes started asynchronously
  * ----------------------------------------------------------------
  */
-TupleTableSlot *
-ExecGather(GatherState *node)
+bool
+StartGather(GatherState *node)
 {
+	EState	   *estate = node->ps.state;
+	Gather	   *gather = (Gather *) node->ps.plan;
 	TupleTableSlot *fslot = node->funnel_slot;
-	int			i;
-	TupleTableSlot *slot;
-	TupleTableSlot *resultSlot;
-	ExprDoneCond isDone;
-	ExprContext *econtext;
+	int i;
 
-	SetNodeRunState(node, Running);
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	SetNodeRunState(node, Started);
 
 	/*
 	 * Initialize the parallel context and workers on first execution. We do
@@ -143,66 +145,85 @@ ExecGather(GatherState *node)
 	 * needs to allocate large dynamic segement, so it is better to do if it
 	 * is really needed.
 	 */
-	if (ExecNode_is_inited(node))
+
+	/*
+	 * Sometimes we might have to run without parallelism; but if
+	 * parallel mode is active then we can try to fire up some workers.
+	 */
+	if (gather->num_workers > 0 && IsInParallelMode())
 	{
-		EState	   *estate = node->ps.state;
-		Gather	   *gather = (Gather *) node->ps.plan;
+		ParallelContext *pcxt;
+		bool	got_any_worker = false;
+
+		/* Initialize the workers required to execute Gather node. */
+		if (!node->pei)
+			node->pei = ExecInitParallelPlan(node->ps.lefttree,
+											 estate,
+											 gather->num_workers);
 
 		/*
-		 * Sometimes we might have to run without parallelism; but if
-		 * parallel mode is active then we can try to fire up some workers.
+		 * Register backend workers. We might not get as many as we
+		 * requested, or indeed any at all.
 		 */
-		if (gather->num_workers > 0 && IsInParallelMode())
-		{
-			ParallelContext *pcxt;
-			bool	got_any_worker = false;
+		pcxt = node->pei->pcxt;
+		LaunchParallelWorkers(pcxt);
 
-			/* Initialize the workers required to execute Gather node. */
-			if (!node->pei)
-				node->pei = ExecInitParallelPlan(node->ps.lefttree,
-												 estate,
-												 gather->num_workers);
-
-			/*
-			 * Register backend workers. We might not get as many as we
-			 * requested, or indeed any at all.
-			 */
-			pcxt = node->pei->pcxt;
-			LaunchParallelWorkers(pcxt);
+		/* Set up tuple queue readers to read the results. */
+		if (pcxt->nworkers > 0)
+		{
+			node->nreaders = 0;
+			node->reader =
+				palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
 
-			/* Set up tuple queue readers to read the results. */
-			if (pcxt->nworkers > 0)
+			for (i = 0; i < pcxt->nworkers; ++i)
 			{
-				node->nreaders = 0;
-				node->reader =
-					palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers; ++i)
-				{
-					if (pcxt->worker[i].bgwhandle == NULL)
-						continue;
-
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   fslot->tts_tupleDescriptor);
-					got_any_worker = true;
-				}
+				if (pcxt->worker[i].bgwhandle == NULL)
+					continue;
+
+				shm_mq_set_handle(node->pei->tqueue[i],
+								  pcxt->worker[i].bgwhandle);
+				node->reader[node->nreaders++] =
+					CreateTupleQueueReader(node->pei->tqueue[i],
+										   fslot->tts_tupleDescriptor);
+				got_any_worker = true;
 			}
-
-			/* No workers?  Then never mind. */
-			if (!got_any_worker)
-				ExecShutdownGatherWorkers(node);
 		}
 
-		/* Run plan locally if no workers or not single-copy. */
-		node->need_to_scan_locally = (node->reader == NULL)
-			|| !gather->single_copy;
-
-		SetNodeRunState(node, Running);
+		/* No workers?  Then never mind. */
+		if (!got_any_worker)
+			ExecShutdownGatherWorkers(node);
 	}
 
+	/* Run plan locally if no workers or not single-copy. */
+	node->need_to_scan_locally = (node->reader == NULL)
+		|| !gather->single_copy;
+
+	/* Plans on workers are always executed asynchronously */
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecGather(node)
+ *
+ *		Scans the relation via multiple workers and returns
+ *		the next qualifying tuple.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecGather(GatherState *node)
+{
+	TupleTableSlot *slot;
+	TupleTableSlot *resultSlot;
+	ExprDoneCond isDone;
+	ExprContext *econtext;
+
+	/* Start child nodes asynchronously if possible */
+	if (ExecNode_is_inited(node))
+		StartGather(node);
+
+	SetNodeRunState(node, Running);
+
 	/*
 	 * Check to see if we're still projecting out tuples from a previous scan
 	 * tuple (because there is a function-returning-set in the projection
@@ -462,6 +483,8 @@ ExecReScanGather(GatherState *node)
 	 */
 	ExecShutdownGatherWorkers(node);
 
+	SetNodeRunState(node, Inited);
+
 	if (node->pei)
 		ExecParallelReinitialize(node->pei);
 
diff --git a/src/backend/executor/nodeGroup.c b/src/backend/executor/nodeGroup.c
index a593d9f..ea947b9 100644
--- a/src/backend/executor/nodeGroup.c
+++ b/src/backend/executor/nodeGroup.c
@@ -190,6 +190,28 @@ ExecGroup(GroupState *node)
 }
 
 /* -----------------
+ * StartGroup
+ *
+ * Try to start asynchronously.
+ * Returns true if any of the underlying nodes started asynchronously
+ * -----------------
+ */
+bool
+StartGroup(GroupState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(outerPlanState(node)))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
+/* -----------------
  * ExecInitGroup
  *
  *	Creates the run-time information for the group node produced by the
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 5a71fe3..e04a78f 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -157,6 +157,28 @@ MultiExecHash(HashState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartHash
+ *
+ * Try to start asynchronously.
+ * Returns true if any of the underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartHash(HashState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(outerPlanState(node)))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitHash
  *
  *		Init routine for Hash node
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index dbaabc4..ada9290 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -72,6 +72,10 @@ ExecHashJoin(HashJoinState *node)
 	uint32		hashvalue;
 	int			batchno;
 
+	/* Try to start asynchronously */
+	if (ExecNode_is_inited(node))
+		StartHashJoin(node);
+
 	SetNodeRunState(node, Running);
 
 	/*
@@ -435,6 +439,56 @@ ExecHashJoin(HashJoinState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartHashJoin
+ *
+ * Unlike other Start functions, this mirrors ExecHashJoin: the inner Hash is
+ * started only when the hash table will be built before any outer fetch.
+ * Returns true if any of the underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartHashJoin(HashJoinState *node)
+{
+	PlanState  *outerNode = outerPlanState(node);
+	HashState  *hashNode = (HashState *) innerPlanState(node);
+	bool 		async;
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	async = StartProcNode(outerNode);
+
+	/*
+	 * This condition is the same as the one ExecHashJoin uses in its
+	 * HJ_BUILD_HASHTABLE state to check whether the inner hash is needed.
+	 */
+	if (!HJ_FILL_INNER(node) &&
+		(HJ_FILL_OUTER(node) ||
+		 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
+		  !node->hj_OuterNotEmpty)))
+	{
+		/*
+		 * The first outer tuple is needed here to judge whether the inner
+		 * hash is necessary, so don't start the inner plan.
+		 *
+		 * The condition for getting here depends on the costs of outer
+		 * startup and hash creation, which asynchronous execution skews, but
+		 * we continue to rely on this formula for now for lack of an
+		 * appropriate alternative.
+	}
+	else
+	{
+		/* Hash will be created. Start the inner node. */
+		async |= StartProcNode((PlanState *)hashNode);
+	}
+
+	if (async)
+		SetNodeRunState(node, Started);
+
+	return async;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitHashJoin
  *
  *		Init routine for HashJoin node.
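StartHashJoin always tries the outer side. It starts the inner Hash only when ExecHashJoin would build the hash table without first fetching an outer tuple. The function below restates the patch's condition as a standalone predicate. Its parameters are hypothetical flattenings of HJ_FILL_INNER, HJ_FILL_OUTER, the two plan costs, and hj_OuterNotEmpty.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * True when StartHashJoin would defer starting the inner Hash node, i.e.
 * when ExecHashJoin first fetches an outer tuple to decide whether the
 * hash table is needed at all.
 */
static bool
toy_defer_inner_start(bool fill_inner, bool fill_outer,
					  double outer_startup_cost, double hash_total_cost,
					  bool outer_not_empty)
{
	return !fill_inner &&
		(fill_outer ||
		 (outer_startup_cost < hash_total_cost && !outer_not_empty));
}

/* Which children StartHashJoin would try to start; outer is always tried. */
typedef struct ToyHJStart
{
	bool		start_outer;
	bool		start_inner;
} ToyHJStart;

static ToyHJStart
toy_hashjoin_start_plan(bool fill_inner, bool fill_outer,
						double outer_startup_cost, double hash_total_cost,
						bool outer_not_empty)
{
	ToyHJStart	plan;

	plan.start_outer = true;
	plan.start_inner = !toy_defer_inner_start(fill_inner, fill_outer,
											  outer_startup_cost,
											  hash_total_cost,
											  outer_not_empty);
	return plan;
}
```

HJ_FILL_INNER is set for right and full joins, so those always start the inner side. An inner join whose outer plan is cheaper to start than the hash is to build defers it.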
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index e59d71f..21b2c37 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -243,6 +243,28 @@ ExecLimit(LimitState *node)
 	return slot;
 }
 
+/* ----------------------------------------------------------------
+ *		StartLimit
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartLimit(LimitState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(outerPlanState(node)))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * Evaluate the limit/offset expressions --- done at startup or rescan.
  *
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index 2ccf05d..e741a93 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -347,6 +347,28 @@ lnext:
 }
 
 /* ----------------------------------------------------------------
+ *		StartLockRows
+ *
+ * Try to start asynchronously.
+ * Returns true if any of the underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartLockRows(LockRowsState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(outerPlanState(node)))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitLockRows
  *
  *		This initializes the LockRows node state structures and
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 981398a..4e41c1c 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -160,6 +160,28 @@ ExecMaterial(MaterialState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartMaterial
+ *
+ * Try to start asynchronously.
+ * Returns true if any of the underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartMaterial(MaterialState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(outerPlanState(node)))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitMaterial
  * ----------------------------------------------------------------
  */
@@ -330,6 +352,7 @@ ExecReScanMaterial(MaterialState *node)
 	SetNodeRunState(node, Inited);
 
 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+	SetNodeRunState(node, Inited);
 
 	if (node->eflags != 0)
 	{
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 4678d7c..ab6c304 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -170,6 +170,10 @@ ExecMergeAppend(MergeAppendState *node)
 	TupleTableSlot *result;
 	SlotNumber	i;
 
+	/* start child nodes asynchronously if possible */
+	if (ExecNode_is_inited(node))
+		StartMergeAppend(node);
+	
 	SetNodeRunState(node, Running);
 
 	if (!node->ms_initialized)
@@ -220,6 +224,32 @@ ExecMergeAppend(MergeAppendState *node)
 	return result;
 }
 
+/* ----------------------------------------------------------------
+ *		StartMergeAppend
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartMergeAppend(MergeAppendState *node)
+{
+	int i;
+	bool async = false;
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	for (i = 0 ; i < node->ms_nplans ; i++)
+		async |= StartProcNode(node->mergeplans[i]);
+
+	if (async)
+		SetNodeRunState(node, Started);
+
+	return async;
+}
+
+
 /*
  * Compare the tuples in the two given slots.
  */
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 74ceaa2..32bd8a5 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -630,6 +630,10 @@ ExecMergeJoin(MergeJoinState *node)
 	bool		doFillOuter;
 	bool		doFillInner;
 
+	/* Execute childs asynchronously if possible */
+	if (ExecNode_is_inited(node))
+		StartMergeJoin(node);
+
 	SetNodeRunState(node, Running);
 
 	/*
@@ -1475,6 +1479,31 @@ ExecMergeJoin(MergeJoinState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartMergeJoin
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartMergeJoin(MergeJoinState *node)
+{
+	bool async;
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* Merge join can unconditionally start child nodes asynchronously */
+	async  = StartProcNode(innerPlanState(node));
+	async |= StartProcNode(outerPlanState(node));
+
+	if (async)
+		SetNodeRunState(node, Started);
+
+	return async;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitMergeJoin
  * ----------------------------------------------------------------
  */
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index ae69176..cb95a3d 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -69,6 +69,10 @@ ExecNestLoop(NestLoopState *node)
 	ExprContext *econtext;
 	ListCell   *lc;
 
+	/* Execute childs asynchronously if possible */
+	if (ExecNode_is_inited(node))
+		StartNestLoop(node);
+
 	SetNodeRunState(node, Running);
 
 	/*
@@ -292,6 +296,36 @@ ExecNestLoop(NestLoopState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartNestLoop
+ *
+ * The inner plan of nest loop won't be executed asynchronously if it is
+ * parameterized.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartNestLoop(NestLoopState *node)
+{
+	NestLoop   *nl = (NestLoop *) node->js.ps.plan;
+	bool async;
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* Always try async execution of outer plan */
+	async = StartProcNode(outerPlanState(node));
+
+	/* This inner node cannot be asynchronous if it is parameterized */
+	if (nl->nestParams == NIL)
+		async |= StartProcNode(innerPlanState(node));
+
+	if (async)
+		SetNodeRunState(node, Started);
+
+	return async;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitNestLoop
  * ----------------------------------------------------------------
  */
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index ec81eda..c33443a 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -171,6 +171,30 @@ ExecResult(ResultState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartResult
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartResult(ResultState *node)
+{
+	PlanState *subnode = outerPlanState(node);
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (subnode && StartProcNode(subnode))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
  *		ExecResultMarkPos
  * ----------------------------------------------------------------
  */
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index c248ff3..a0dbec6 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -225,6 +225,28 @@ ExecSetOp(SetOpState *node)
 		return setop_retrieve_direct(node);
 }
 
+/* ----------------------------------------------------------------
+ *		StartSetOp
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartSetOp(SetOpState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(outerPlanState(node)))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * ExecSetOp for non-hashed case
  */
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index a2abec7..f0c9a63 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -148,6 +148,28 @@ ExecSort(SortState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartSort
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartSort(SortState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(outerPlanState(node)))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitSort
  *
  *		Creates the run-time state information for the sort node
diff --git a/src/backend/executor/nodeSubqueryscan.c b/src/backend/executor/nodeSubqueryscan.c
index d8799d1..4899d93 100644
--- a/src/backend/executor/nodeSubqueryscan.c
+++ b/src/backend/executor/nodeSubqueryscan.c
@@ -104,6 +104,28 @@ ExecSubqueryScan(SubqueryScanState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartSubqueryScan
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartSubqueryScan(SubqueryScanState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(node->subplan))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitSubqueryScan
  * ----------------------------------------------------------------
  */
diff --git a/src/backend/executor/nodeUnique.c b/src/backend/executor/nodeUnique.c
index 1f7ca10..53da967 100644
--- a/src/backend/executor/nodeUnique.c
+++ b/src/backend/executor/nodeUnique.c
@@ -105,6 +105,28 @@ ExecUnique(UniqueState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartUnique
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartUnique(UniqueState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(outerPlanState(node)))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitUnique
  *
  *		This initializes the unique node state structures and
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 0d127b4..6327d55 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -1760,6 +1760,28 @@ restart:
 }
 
 /* -----------------
+ * StartWindowAgg
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * -----------------
+ */
+bool
+StartWindowAgg(WindowAggState *node)
+{
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	if (StartProcNode(outerPlanState(node)))
+	{
+		SetNodeRunState(node, Started);
+		return true;
+	}
+
+	return false;
+}
+
+/* -----------------
  * ExecInitWindowAgg
  *
  *	Creates the run-time information for the WindowAgg node produced by the
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 4f77692..230c9af 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -223,6 +223,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
  */
 extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
+extern bool StartProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index fe3b81a..7fb0a6f 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -18,6 +18,7 @@
 
 extern AggState *ExecInitAgg(Agg *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecAgg(AggState *node);
+extern bool StartAgg(AggState *node);
 extern void ExecEndAgg(AggState *node);
 extern void ExecReScanAgg(AggState *node);
 
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index f2d920b..d77b70e 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -18,6 +18,7 @@
 
 extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecAppend(AppendState *node);
+extern bool StartAppend(AppendState *node);
 extern void ExecEndAppend(AppendState *node);
 extern void ExecReScanAppend(AppendState *node);
 
diff --git a/src/include/executor/nodeCtescan.h b/src/include/executor/nodeCtescan.h
index 369dafa..e418786 100644
--- a/src/include/executor/nodeCtescan.h
+++ b/src/include/executor/nodeCtescan.h
@@ -18,6 +18,7 @@
 
 extern CteScanState *ExecInitCteScan(CteScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecCteScan(CteScanState *node);
+extern bool StartCteScan(CteScanState *node);
 extern void ExecEndCteScan(CteScanState *node);
 extern void ExecReScanCteScan(CteScanState *node);
 
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index 9e5d8fc..e7cbe21 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -17,6 +17,7 @@
 #include "nodes/execnodes.h"
 
 extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
+extern bool StartGather(GatherState *node);
 extern TupleTableSlot *ExecGather(GatherState *node);
 extern void ExecEndGather(GatherState *node);
 extern void ExecShutdownGather(GatherState *node);
diff --git a/src/include/executor/nodeGroup.h b/src/include/executor/nodeGroup.h
index 3485fe8..bfc75cd 100644
--- a/src/include/executor/nodeGroup.h
+++ b/src/include/executor/nodeGroup.h
@@ -18,6 +18,7 @@
 
 extern GroupState *ExecInitGroup(Group *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecGroup(GroupState *node);
+extern bool StartGroup(GroupState *node);
 extern void ExecEndGroup(GroupState *node);
 extern void ExecReScanGroup(GroupState *node);
 
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index acc28438..b0855d3 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -19,6 +19,7 @@
 extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecHash(HashState *node);
 extern Node *MultiExecHash(HashState *node);
+extern bool StartHash(HashState *node);
 extern void ExecEndHash(HashState *node);
 extern void ExecReScanHash(HashState *node);
 
diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h
index c35a51c..826f639 100644
--- a/src/include/executor/nodeHashjoin.h
+++ b/src/include/executor/nodeHashjoin.h
@@ -19,6 +19,7 @@
 
 extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecHashJoin(HashJoinState *node);
+extern bool StartHashJoin(HashJoinState *node);
 extern void ExecEndHashJoin(HashJoinState *node);
 extern void ExecReScanHashJoin(HashJoinState *node);
 
diff --git a/src/include/executor/nodeLimit.h b/src/include/executor/nodeLimit.h
index 44f2936..5e8d2ea 100644
--- a/src/include/executor/nodeLimit.h
+++ b/src/include/executor/nodeLimit.h
@@ -18,6 +18,7 @@
 
 extern LimitState *ExecInitLimit(Limit *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecLimit(LimitState *node);
+extern bool StartLimit(LimitState *node);
 extern void ExecEndLimit(LimitState *node);
 extern void ExecReScanLimit(LimitState *node);
 
diff --git a/src/include/executor/nodeLockRows.h b/src/include/executor/nodeLockRows.h
index 41764a1..c450233 100644
--- a/src/include/executor/nodeLockRows.h
+++ b/src/include/executor/nodeLockRows.h
@@ -18,6 +18,7 @@
 
 extern LockRowsState *ExecInitLockRows(LockRows *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecLockRows(LockRowsState *node);
+extern bool StartLockRows(LockRowsState *node);
 extern void ExecEndLockRows(LockRowsState *node);
 extern void ExecReScanLockRows(LockRowsState *node);
 
diff --git a/src/include/executor/nodeMaterial.h b/src/include/executor/nodeMaterial.h
index cfb7a13..0392d29 100644
--- a/src/include/executor/nodeMaterial.h
+++ b/src/include/executor/nodeMaterial.h
@@ -18,6 +18,7 @@
 
 extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecMaterial(MaterialState *node);
+extern bool StartMaterial(MaterialState *node);
 extern void ExecEndMaterial(MaterialState *node);
 extern void ExecMaterialMarkPos(MaterialState *node);
 extern void ExecMaterialRestrPos(MaterialState *node);
diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h
index 3c5068c..2f637dc 100644
--- a/src/include/executor/nodeMergeAppend.h
+++ b/src/include/executor/nodeMergeAppend.h
@@ -18,6 +18,7 @@
 
 extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecMergeAppend(MergeAppendState *node);
+extern bool StartMergeAppend(MergeAppendState *node);
 extern void ExecEndMergeAppend(MergeAppendState *node);
 extern void ExecReScanMergeAppend(MergeAppendState *node);
 
diff --git a/src/include/executor/nodeMergejoin.h b/src/include/executor/nodeMergejoin.h
index bee5367..ead6898 100644
--- a/src/include/executor/nodeMergejoin.h
+++ b/src/include/executor/nodeMergejoin.h
@@ -18,6 +18,7 @@
 
 extern MergeJoinState *ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecMergeJoin(MergeJoinState *node);
+extern bool StartMergeJoin(MergeJoinState *node);
 extern void ExecEndMergeJoin(MergeJoinState *node);
 extern void ExecReScanMergeJoin(MergeJoinState *node);
 
diff --git a/src/include/executor/nodeNestloop.h b/src/include/executor/nodeNestloop.h
index ff0720f..f79a002 100644
--- a/src/include/executor/nodeNestloop.h
+++ b/src/include/executor/nodeNestloop.h
@@ -18,6 +18,7 @@
 
 extern NestLoopState *ExecInitNestLoop(NestLoop *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecNestLoop(NestLoopState *node);
+extern bool StartNestLoop(NestLoopState *node);
 extern void ExecEndNestLoop(NestLoopState *node);
 extern void ExecReScanNestLoop(NestLoopState *node);
 
diff --git a/src/include/executor/nodeResult.h b/src/include/executor/nodeResult.h
index 17a7bb6..84b375d 100644
--- a/src/include/executor/nodeResult.h
+++ b/src/include/executor/nodeResult.h
@@ -18,6 +18,7 @@
 
 extern ResultState *ExecInitResult(Result *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecResult(ResultState *node);
+extern bool StartResult(ResultState *node);
 extern void ExecEndResult(ResultState *node);
 extern void ExecResultMarkPos(ResultState *node);
 extern void ExecResultRestrPos(ResultState *node);
diff --git a/src/include/executor/nodeSetOp.h b/src/include/executor/nodeSetOp.h
index ed6c96a..f960dda 100644
--- a/src/include/executor/nodeSetOp.h
+++ b/src/include/executor/nodeSetOp.h
@@ -18,6 +18,7 @@
 
 extern SetOpState *ExecInitSetOp(SetOp *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecSetOp(SetOpState *node);
+extern bool StartSetOp(SetOpState *node);
 extern void ExecEndSetOp(SetOpState *node);
 extern void ExecReScanSetOp(SetOpState *node);
 
diff --git a/src/include/executor/nodeSort.h b/src/include/executor/nodeSort.h
index 20d909b..0c6d12d 100644
--- a/src/include/executor/nodeSort.h
+++ b/src/include/executor/nodeSort.h
@@ -18,6 +18,7 @@
 
 extern SortState *ExecInitSort(Sort *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecSort(SortState *node);
+extern bool StartSort(SortState *node);
 extern void ExecEndSort(SortState *node);
 extern void ExecSortMarkPos(SortState *node);
 extern void ExecSortRestrPos(SortState *node);
diff --git a/src/include/executor/nodeSubqueryscan.h b/src/include/executor/nodeSubqueryscan.h
index 56e3aec..0301edd 100644
--- a/src/include/executor/nodeSubqueryscan.h
+++ b/src/include/executor/nodeSubqueryscan.h
@@ -18,6 +18,7 @@
 
 extern SubqueryScanState *ExecInitSubqueryScan(SubqueryScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecSubqueryScan(SubqueryScanState *node);
+extern bool StartSubqueryScan(SubqueryScanState *node);
 extern void ExecEndSubqueryScan(SubqueryScanState *node);
 extern void ExecReScanSubqueryScan(SubqueryScanState *node);
 
diff --git a/src/include/executor/nodeUnique.h b/src/include/executor/nodeUnique.h
index ec2df59..76727aa 100644
--- a/src/include/executor/nodeUnique.h
+++ b/src/include/executor/nodeUnique.h
@@ -18,6 +18,7 @@
 
 extern UniqueState *ExecInitUnique(Unique *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecUnique(UniqueState *node);
+extern bool StartUnique(UniqueState *node);
 extern void ExecEndUnique(UniqueState *node);
 extern void ExecReScanUnique(UniqueState *node);
 
diff --git a/src/include/executor/nodeWindowAgg.h b/src/include/executor/nodeWindowAgg.h
index 8a7b1fa..e9699b0 100644
--- a/src/include/executor/nodeWindowAgg.h
+++ b/src/include/executor/nodeWindowAgg.h
@@ -18,6 +18,7 @@
 
 extern WindowAggState *ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecWindowAgg(WindowAggState *node);
+extern bool StartWindowAgg(WindowAggState *node);
 extern void ExecEndWindowAgg(WindowAggState *node);
 extern void ExecReScanWindowAgg(WindowAggState *node);
 
-- 
1.8.3.1
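
All the Start* functions above follow one contract: a node that is still
Inited asks its children to start, moves to Started if any of them did,
and returns false otherwise. The following is a minimal, self-contained
mock of that contract for illustration only. RunState, MockNode and
start_node() are hypothetical names, not executor code, and the real
dispatch done by StartProcNode() (not shown in this part of the patch)
is assumed rather than reproduced.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical run states mirroring Inited/Started/Running in the patch. */
typedef enum RunState
{
	INITED,
	STARTED,
	RUNNING
} RunState;

/* Hypothetical stand-in for PlanState; not the executor's struct. */
typedef struct MockNode
{
	const char *name;
	RunState	state;
	bool		can_start_async;	/* leaf able to launch work early, e.g. a Gather */
	struct MockNode *children[2];
	int			nchildren;
} MockNode;

/*
 * Hypothetical stand-in for StartProcNode() plus the per-node Start*
 * functions.  Returns true if this node or any node below it started
 * asynchronously.
 */
static bool
start_node(MockNode *node)
{
	bool		async = false;
	int			i;

	assert(node != NULL);

	/* Same guard as the Start* functions: only an Inited node may start. */
	if (node->state != INITED)
		return false;

	if (node->nchildren == 0)
		async = node->can_start_async;

	/* "|=" rather than "||" so that every child gets a chance to start. */
	for (i = 0; i < node->nchildren; i++)
		async |= start_node(node->children[i]);

	if (async)
		node->state = STARTED;

	return async;
}
```

Calling start_node() on a MergeJoin whose outer child is a startable
Gather marks the join and that child as Started and returns true; a
second call returns false because the join has left the Inited state.
The guard at the top corresponds to the `!ExecNode_is_inited(node)`
check used by the Start* functions above.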

>From d2033f22bd45231caf2c9ea2f0af964b7b052c32 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 30 Nov 2015 17:41:03 +0900
Subject: [PATCH 4/5] temporarily introduce a guc enable_asyncexec for test
 usage.

enable_asyncexec = false inhibits asynchronous execution of
nodes. This is added for the purpose of comparing performance with and
without async execution.
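
As a sketch of what this patch changes at the top of ExecAppend,
ExecHashJoin, ExecMergeAppend, ExecMergeJoin and ExecNestLoop: with the
flag off, the Start phase is skipped and the node goes straight to
Running; with it on, the node tries to start once, on its first Exec
call only. MockJoin, mock_start() and mock_exec() are hypothetical mock
names, not executor code. Since the GUC is PGC_USERSET, it can be
toggled per session with SET enable_asyncexec = on.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the enable_asyncexec GUC; off by default as in guc.c. */
static bool enable_asyncexec = false;

typedef enum RunState
{
	INITED,
	STARTED,
	RUNNING
} RunState;

/* Hypothetical node: tracks its state and how often Start was attempted. */
typedef struct MockJoin
{
	RunState	state;
	int			start_calls;
} MockJoin;

/* Hypothetical stand-in for a Start* function that always succeeds. */
static void
mock_start(MockJoin *node)
{
	assert(node->state == INITED);
	node->start_calls++;
	node->state = STARTED;
}

/* Mirrors the gated entry code added to the Exec* functions by this patch. */
static void
mock_exec(MockJoin *node)
{
	if (enable_asyncexec && node->state == INITED)
		mock_start(node);

	node->state = RUNNING;
	/* ... tuple production would follow here ... */
}
```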
---
 src/backend/executor/execMain.c        | 2 ++
 src/backend/executor/nodeAppend.c      | 3 ++-
 src/backend/executor/nodeGather.c      | 1 +
 src/backend/executor/nodeHashjoin.c    | 4 +++-
 src/backend/executor/nodeMergeAppend.c | 3 ++-
 src/backend/executor/nodeMergejoin.c   | 3 ++-
 src/backend/executor/nodeNestloop.c    | 3 ++-
 src/backend/utils/misc/guc.c           | 9 +++++++++
 src/include/optimizer/cost.h           | 1 +
 9 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 9f2af6d..a58cced 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -95,6 +95,8 @@ static char *ExecBuildSlotValueDescription(Oid reloid,
 static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
 				  Plan *planTree);
 
+bool enable_asyncexec = false;
+
 /*
  * Note that GetUpdatedColumns() also exists in commands/trigger.c.  There does
  * not appear to be any good header to put it into, given the structures that
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 2b918b2..4a7d781 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -62,6 +62,7 @@
 
 static bool exec_append_initialize_next(AppendState *appendstate);
 
+extern bool enable_asyncexec;
 
 /* ----------------------------------------------------------------
  *		exec_append_initialize_next
@@ -195,7 +196,7 @@ TupleTableSlot *
 ExecAppend(AppendState *node)
 {
 	/* start child nodes asynchronously if possible */
-	if (ExecNode_is_inited(node))
+	if (enable_asyncexec && ExecNode_is_inited(node))
 		StartAppend(node);
 
 	SetNodeRunState(node, Running);
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 5364acb..b642d51 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -40,6 +40,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
+extern bool enable_asyncexec;
 
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static HeapTuple gather_readnext(GatherState *gatherstate);
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ada9290..cb54aba 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -39,6 +39,8 @@
 /* Returns true if doing null-fill on inner relation */
 #define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)
 
+extern bool enable_asyncexec;
+
 static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
 						  HashJoinState *hjstate,
 						  uint32 *hashvalue);
@@ -73,7 +75,7 @@ ExecHashJoin(HashJoinState *node)
 	int			batchno;
 
 	/* Try to start asynchronously */
-	if (ExecNode_is_inited(node))
+	if (enable_asyncexec && ExecNode_is_inited(node))
 		StartHashJoin(node);
 
 	SetNodeRunState(node, Running);
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index ab6c304..fd5abb8 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -52,6 +52,7 @@ typedef int32 SlotNumber;
 
 static int	heap_compare_slots(Datum a, Datum b, void *arg);
 
+extern bool enable_asyncexec;
 
 /* ----------------------------------------------------------------
  *		ExecInitMergeAppend
@@ -171,7 +172,7 @@ ExecMergeAppend(MergeAppendState *node)
 	SlotNumber	i;
 
 	/* start child nodes asynchronously if possible */
-	if (ExecNode_is_inited(node))
+	if (enable_asyncexec && ExecNode_is_inited(node))
 		StartMergeAppend(node);
 	
 	SetNodeRunState(node, Running);
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 32bd8a5..d2b141b 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -151,6 +151,7 @@ typedef enum
 #define MarkInnerTuple(innerTupleSlot, mergestate) \
 	ExecCopySlot((mergestate)->mj_MarkedTupleSlot, (innerTupleSlot))
 
+extern bool enable_asyncexec;
 
 /*
  * MJExamineQuals
@@ -631,7 +632,7 @@ ExecMergeJoin(MergeJoinState *node)
 	bool		doFillInner;
 
 	/* Execute childs asynchronously if possible */
-	if (ExecNode_is_inited(node))
+	if (enable_asyncexec && ExecNode_is_inited(node))
 		StartMergeJoin(node);
 
 	SetNodeRunState(node, Running);
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index cb95a3d..f55cd87 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -25,6 +25,7 @@
 #include "executor/nodeNestloop.h"
 #include "utils/memutils.h"
 
+extern bool enable_asyncexec;
 
 /* ----------------------------------------------------------------
  *		ExecNestLoop(node)
@@ -70,7 +71,7 @@ ExecNestLoop(NestLoopState *node)
 	ListCell   *lc;
 
 	/* Execute childs asynchronously if possible */
-	if (ExecNode_is_inited(node))
+	if (enable_asyncexec && ExecNode_is_inited(node))
 		StartNestLoop(node);
 
 	SetNodeRunState(node, Running);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a185749..16e04d2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -855,6 +855,15 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 	{
+		{"enable_asyncexec", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enable early execution."),
+			NULL
+		},
+		&enable_asyncexec,
+		false,
+		NULL, NULL, NULL
+	},
+	{
 		{"enable_hashjoin", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of hash join plans."),
 			NULL
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index ac21a3a..e9b7595 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -65,6 +65,7 @@ extern bool enable_hashagg;
 extern bool enable_nestloop;
 extern bool enable_material;
 extern bool enable_mergejoin;
+extern bool enable_asyncexec;
 extern bool enable_hashjoin;
 extern int	constraint_exclusion;
 
-- 
1.8.3.1

>From fb77b5fb53c5aa86fd227c00d7bdd06bdce66dc1 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 30 Nov 2015 17:46:19 +0900
Subject: [PATCH 5/5] Temporary implementation of merge join with parallel sort

This patch enables asynchronous parallel execution of the subtrees of a
merge join (and of a merge append) that are explicitly sorted, by
running each such Sort in a background worker under a Gather node.
This is quite artificial behavior, but it is convenient for seeing the
difference in performance.
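
The planner side of this patch wraps each explicitly sorted input in a
one-worker, single-copy Gather so that the Sort can begin in a
background worker during the Start phase. A minimal mock of that
rewrite follows; MockPlan, mock_make_node() and wrap_sort_in_gather()
are hypothetical names that only loosely follow the make_gather() calls
in the diff below, and in this mock the Sort becomes the Gather's only
(outer) child.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef enum PlanTag
{
	PLAN_SEQSCAN,
	PLAN_SORT,
	PLAN_GATHER,
	PLAN_MERGEJOIN
} PlanTag;

/* Hypothetical stand-in for Plan; not the planner's struct. */
typedef struct MockPlan
{
	PlanTag		tag;
	struct MockPlan *lefttree;	/* outer child */
	struct MockPlan *righttree; /* inner child */
	int			num_workers;	/* Gather only */
	bool		single_copy;	/* Gather only */
} MockPlan;

static MockPlan *
mock_make_node(PlanTag tag, MockPlan *outer, MockPlan *inner)
{
	MockPlan   *p = calloc(1, sizeof(MockPlan));

	assert(p != NULL);
	p->tag = tag;
	p->lefttree = outer;
	p->righttree = inner;
	return p;
}

/* Wrap a Sort in a one-worker, single-copy Gather; leave anything else alone. */
static MockPlan *
wrap_sort_in_gather(MockPlan *plan)
{
	MockPlan   *gather;

	if (plan->tag != PLAN_SORT)
		return plan;

	gather = mock_make_node(PLAN_GATHER, plan, NULL);
	gather->num_workers = 1;
	gather->single_copy = true;
	return gather;
}
```

An unsorted input such as a plain SeqScan is returned unchanged, which
matches the per-side IsA(..., Sort) checks in create_mergejoin_plan.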
---
 src/backend/executor/execAmi.c           |  8 ++++++
 src/backend/executor/nodeGather.c        | 35 ++++++++++++++++++++++++-
 src/backend/optimizer/plan/createplan.c  | 45 ++++++++++++++++++++++++++++++++
 src/backend/utils/misc/guc.c             |  9 +++++++
 src/include/executor/nodeGather.h        |  2 ++
 src/include/optimizer/cost.h             |  1 +
 src/test/regress/expected/rangefuncs.out |  4 ++-
 7 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index b969fc0..8289a26 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -315,6 +315,10 @@ ExecMarkPos(PlanState *node)
 			ExecSortMarkPos((SortState *) node);
 			break;
 
+		case T_GatherState:
+			ExecGatherMarkPos((GatherState *) node);
+			break;
+
 		case T_ResultState:
 			ExecResultMarkPos((ResultState *) node);
 			break;
@@ -364,6 +368,10 @@ ExecRestrPos(PlanState *node)
 			ExecSortRestrPos((SortState *) node);
 			break;
 
+		case T_GatherState:
+			ExecGatherRestrPos((GatherState *) node);
+			break;
+
 		case T_ResultState:
 			ExecResultRestrPos((ResultState *) node);
 			break;
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index b642d51..ec45622 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -8,7 +8,7 @@
  *
  * A Gather executor launches parallel workers to run multiple copies of a
  * plan.  It can also run the plan itself, if the workers are not available
- * or have not started up yet.  It then merges all of the results it produces
+ * or have not started up yet.  It then merges all of the results it produces
  * and the results from the workers into a single output stream.  Therefore,
  * it will normally be used with a plan where running multiple copies of the
  * same plan does not produce duplicate output, such as PartialSeqScan.
@@ -127,6 +127,8 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
  * Returns true if any of underlying nodes started asynchronously
  * ----------------------------------------------------------------
  */
+/* for test */
+static bool from_execgather = false;
 bool
 StartGather(GatherState *node)
 {
@@ -138,6 +140,10 @@ StartGather(GatherState *node)
 	if (!ExecNode_is_inited(node))
 		return false;
 
+	elog(DEBUG1, "nodeGather executed %s",
+		 from_execgather ? "synchronously" : "asynchronously");
+	from_execgather = false;
+
 	SetNodeRunState(node, Started);
 
 	/*
@@ -220,6 +226,7 @@ ExecGather(GatherState *node)
 	ExprContext *econtext;
 
 	/* Execute childs asynchronously if possible */
+	from_execgather = true;
 	if (ExecNode_is_inited(node))
 		StartGather(node);
 
@@ -468,6 +475,32 @@ ExecShutdownGather(GatherState *node)
  */
 
 /* ----------------------------------------------------------------
+ *		ExecGatherMarkPos
+ *
+ *		Would call MarkPos on the top node in the worker; not implemented yet.
+ * ----------------------------------------------------------------
+ */
+void
+ExecGatherMarkPos(GatherState *node)
+{
+	/* There is no way to send commands to the worker yet. */
+	elog(DEBUG1, "MarkPos on Gather is not implemented");
+}
+
+/* ----------------------------------------------------------------
+ *		ExecGatherRestrPos
+ *
+ *		Would restore the position saved by ExecGatherMarkPos; not implemented yet.
+ * ----------------------------------------------------------------
+ */
+void
+ExecGatherRestrPos(GatherState *node)
+{
+	/* There is no way to send commands to the worker yet. */
+	elog(ERROR, "RestrPos on Gather is not implemented");
+}
+
+/* ----------------------------------------------------------------
  *		ExecReScanGather
  *
  *		Re-initialize the workers and rescans a relation via them.
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 411b36c..e3d3456 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -844,10 +844,24 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
 
 		/* Now, insert a Sort node if subplan isn't sufficiently ordered */
 		if (!pathkeys_contained_in(pathkeys, subpath->pathkeys))
+		{
 			subplan = (Plan *) make_sort(root, subplan, numsortkeys,
 										 sortColIdx, sortOperators,
 										 collations, nullsFirst,
 										 best_path->limit_tuples);
+			if (enable_parasortmerge)
+			{
+				Gather *gather;
+
+				gather = make_gather(subplan->targetlist,
+									 NIL,
+									 1, /* num_workers */
+									 true, /* single_copy */
+									 subplan);
+				subplan = (Plan *) gather;
+				root->glob->parallelModeNeeded = true;
+			}
+		}
 
 		subplans = lappend(subplans, subplan);
 	}
@@ -2360,6 +2374,9 @@ create_nestloop_plan(PlannerInfo *root,
 	return join_plan;
 }
 
+bool enable_parasortmerge = false;
+extern bool enable_asyncexec;
+
 static MergeJoin *
 create_mergejoin_plan(PlannerInfo *root,
 					  MergePath *best_path,
@@ -2637,6 +2654,34 @@ create_mergejoin_plan(PlannerInfo *root,
 	/*
 	 * Now we can build the mergejoin node.
 	 */
+
+	/* Run each Sort input in a bgworker, even if the other side has no Sort */
+	if (enable_parasortmerge)
+	{
+		Gather *gather;
+
+		if (IsA(outer_plan, Sort))
+		{
+			gather = make_gather(outer_plan->targetlist,
+								 NIL,
+								 1, /* num_workers */
+								 true, /* single_copy */
+								 outer_plan);
+			outer_plan = (Plan *)gather;
+			root->glob->parallelModeNeeded = true;
+		}
+
+		if (IsA(inner_plan, Sort))
+		{
+			gather = make_gather(inner_plan->targetlist,
+								 NIL,
+								 1, /* num_workers */
+								 true, /* single_copy */
+								 inner_plan);
+			inner_plan = (Plan *)gather;
+			root->glob->parallelModeNeeded = true;
+		}
+	}
 	join_plan = make_mergejoin(tlist,
 							   joinclauses,
 							   otherclauses,
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 16e04d2..61afd1e 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -855,6 +855,15 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 	{
+		{"enable_parasortmerge", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables running the sorted inputs of merge joins and merge appends in background workers."),
+			NULL
+		},
+		&enable_parasortmerge,
+		false,
+		NULL, NULL, NULL
+	},
+	{
 		{"enable_asyncexec", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enable early execution."),
 			NULL
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index e7cbe21..d724a2e 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -21,6 +21,8 @@ extern bool StartGather(GatherState *node);
 extern TupleTableSlot *ExecGather(GatherState *node);
 extern void ExecEndGather(GatherState *node);
 extern void ExecShutdownGather(GatherState *node);
+extern void ExecGatherMarkPos(GatherState *node);
+extern void ExecGatherRestrPos(GatherState *node);
 extern void ExecReScanGather(GatherState *node);
 
 #endif   /* NODEGATHER_H */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index e9b7595..4f9de21 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -65,6 +65,7 @@ extern bool enable_hashagg;
 extern bool enable_nestloop;
 extern bool enable_material;
 extern bool enable_mergejoin;
+extern bool enable_parasortmerge;
 extern bool enable_asyncexec;
 extern bool enable_hashjoin;
 extern int	constraint_exclusion;
diff --git a/src/test/regress/expected/rangefuncs.out b/src/test/regress/expected/rangefuncs.out
index 00ef421..73b5aaa 100644
--- a/src/test/regress/expected/rangefuncs.out
+++ b/src/test/regress/expected/rangefuncs.out
@@ -1,6 +1,7 @@
 SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%';
          name         | setting 
 ----------------------+---------
+ enable_asyncexec     | off
  enable_bitmapscan    | on
  enable_hashagg       | on
  enable_hashjoin      | on
@@ -9,10 +10,11 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%';
  enable_material      | on
  enable_mergejoin     | on
  enable_nestloop      | on
+ enable_parasortmerge | off
  enable_seqscan       | on
  enable_sort          | on
  enable_tidscan       | on
-(11 rows)
+(13 rows)
 
 CREATE TABLE foo2(fooid int, f2 int);
 INSERT INTO foo2 VALUES(1, 11);
-- 
1.8.3.1
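For readers skimming the createplan.c hunks, here is a minimal stand-alone sketch of the merge join rule, assuming heavily simplified node types. `ToyPlan`, `make_toy_node`, `wrap_in_gather` and `maybe_gather_sorts` are hypothetical names for illustration only, not PostgreSQL code. It shows that each input is checked independently, that a Sort input gets a one-worker, single-copy Gather on top (the same arguments the patch passes to make_gather), and that adding any Gather is what forces parallel mode.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical toy plan nodes; these are NOT PostgreSQL's Plan structs. */
typedef enum
{
	TOY_SCAN,
	TOY_SORT,
	TOY_GATHER,
	TOY_MERGEJOIN
} ToyNodeTag;

typedef struct ToyPlan
{
	ToyNodeTag	type;
	struct ToyPlan *lefttree;	/* outer (or only) child */
	struct ToyPlan *righttree;	/* inner child */
	int			num_workers;	/* used by TOY_GATHER only */
	bool		single_copy;	/* used by TOY_GATHER only */
} ToyPlan;

static ToyPlan *
make_toy_node(ToyNodeTag type, ToyPlan *left, ToyPlan *right)
{
	ToyPlan    *p = calloc(1, sizeof(ToyPlan));

	if (p == NULL)
	{
		perror("calloc");
		exit(EXIT_FAILURE);
	}
	p->type = type;
	p->lefttree = left;
	p->righttree = right;
	return p;
}

/* Same shape as make_gather(tlist, NIL, 1, true, subplan) in the patch. */
static ToyPlan *
wrap_in_gather(ToyPlan *subplan)
{
	ToyPlan    *gather;

	assert(subplan != NULL);
	gather = make_toy_node(TOY_GATHER, subplan, NULL);
	gather->num_workers = 1;
	gather->single_copy = true;
	return gather;
}

/*
 * Put a Gather on top of each Sort input of a merge join when the flag is
 * on.  Returns true if any Gather was added, i.e. parallel mode is needed.
 */
static bool
maybe_gather_sorts(ToyPlan **outer, ToyPlan **inner, bool enable_parasortmerge)
{
	bool		parallel_mode_needed = false;

	if (!enable_parasortmerge)
		return false;

	if ((*outer)->type == TOY_SORT)
	{
		*outer = wrap_in_gather(*outer);
		parallel_mode_needed = true;
	}
	if ((*inner)->type == TOY_SORT)
	{
		*inner = wrap_in_gather(*inner);
		parallel_mode_needed = true;
	}
	return parallel_mode_needed;
}
```

For example, with a Sort over a scan as the outer input and a bare scan as the inner input, only the outer side gets a Gather and the function returns true; with the flag off, both inputs come back unchanged and no parallel mode is requested.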
