Hello.

A fully asynchronous executor requires every node to be stateful and
suspendable at the point where it requests the next tuples from the
nodes underneath it. I tried a pure push-based executor but failed.

After the miserable patch upthread, I finally managed to make
executor nodes suspendable using computed jumps and got rid of the
recursive calls in the executor. But it runs about 10 times slower
in the simple SeqScan case. (pgbench showed a 9% degradation.) That
doesn't seem recoverable with easy improvements, so I gave up on
that approach.

Then I returned to single-level asynchrony, in other words, the
simple case with async-aware nodes just above async-capable
nodes. The motivation for using the framework in the previous patch
was that we had degradation on the sync (or normal) code paths from
polluting ExecProcNode with async stuff; as Tom suggested, the
node->ExecProcNode trick can isolate the async code path.

The attached PoC patch theoretically has no impact on the normal
code paths and just brings gain in async cases. (The additional
members in PlanState did cause some degradation, though, which
seems to come from alignment.)

But I haven't got sufficiently stable results from the performance
tests. Different builds from the same source code give noticeably
different results...

Anyway, I'll show the best result out of several runs here.

                           original(ms) patched(ms)    gain(%)
A: simple table scan     :  9714.70      9656.73         0.6
B: local partitioning    :  4119.44      4131.10        -0.3
C: single remote table   :  9484.86      9141.89         3.7
D: sharding (single con) :  7114.34      6751.21         5.1
E: sharding (multi con)  :  7166.56      1827.93        74.5

A and B are degradation checks, which are expected to show no
degradation.  C is the gain purely from postgres_fdw's command
presending on a remote table.  D is the gain of sharding on a
single connection. The number of partitions/shards is 4.  E is the
gain when using a dedicated connection per shard.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From fc424c16e124934581a184fcadaed1e05f7672c8 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 22 May 2017 12:42:58 +0900
Subject: [PATCH 1/3] Allow wait event set to be registered to resource owner

In certain cases a WaitEventSet needs to be released through a resource
owner. This change adds resource owner support for WaitEventSets and
allows the creator of a WaitEventSet to specify a resource owner.
---
 src/backend/libpq/pqcomm.c                    |  2 +-
 src/backend/storage/ipc/latch.c               | 18 ++++++-
 src/backend/storage/lmgr/condition_variable.c |  2 +-
 src/backend/utils/resowner/resowner.c         | 68 +++++++++++++++++++++++++++
 src/include/storage/latch.h                   |  4 +-
 src/include/utils/resowner_private.h          |  8 ++++
 6 files changed, 97 insertions(+), 5 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 754154b..d459f32 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -220,7 +220,7 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 3);
 	AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
 					  NULL, NULL);
 	AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 4eb6e83..e6fc3dd 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -51,6 +51,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/resowner_private.h"
 
 /*
  * Select the fd readiness primitive to use. Normally the "most modern"
@@ -77,6 +78,8 @@ struct WaitEventSet
 	int			nevents;		/* number of registered events */
 	int			nevents_space;	/* maximum number of events in this set */
 
+	ResourceOwner	resowner;	/* Resource owner */
+
 	/*
 	 * Array, of nevents_space length, storing the definition of events this
 	 * set is waiting for.
@@ -359,7 +362,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 	int			ret = 0;
 	int			rc;
 	WaitEvent	event;
-	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
 
 	if (wakeEvents & WL_TIMEOUT)
 		Assert(timeout >= 0);
@@ -517,12 +520,15 @@ ResetLatch(volatile Latch *latch)
  * WaitEventSetWait().
  */
 WaitEventSet *
-CreateWaitEventSet(MemoryContext context, int nevents)
+CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents)
 {
 	WaitEventSet *set;
 	char	   *data;
 	Size		sz = 0;
 
+	if (res)
+		ResourceOwnerEnlargeWESs(res);
+
 	/*
 	 * Use MAXALIGN size/alignment to guarantee that later uses of memory are
 	 * aligned correctly. E.g. epoll_event might need 8 byte alignment on some
@@ -591,6 +597,11 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 	StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
 #endif
 
+	/* Register this wait event set if requested */
+	set->resowner = res;
+	if (res)
+		ResourceOwnerRememberWES(set->resowner, set);
+
 	return set;
 }
 
@@ -632,6 +643,9 @@ FreeWaitEventSet(WaitEventSet *set)
 	}
 #endif
 
+	if (set->resowner != NULL)
+		ResourceOwnerForgetWES(set->resowner, set);
+
 	pfree(set);
 }
 
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index b4b7d28..182f759 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -66,7 +66,7 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 	/* Create a reusable WaitEventSet. */
 	if (cv_wait_event_set == NULL)
 	{
-		cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1);
+		cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, NULL, 1);
 		AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
 						  MyLatch, NULL);
 	}
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index bd19fad..d36481e 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -124,6 +124,7 @@ typedef struct ResourceOwnerData
 	ResourceArray snapshotarr;	/* snapshot references */
 	ResourceArray filearr;		/* open temporary files */
 	ResourceArray dsmarr;		/* dynamic shmem segments */
+	ResourceArray wesarr;		/* wait event sets */
 
 	/* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
 	int			nlocks;			/* number of owned locks */
@@ -169,6 +170,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
 static void PrintSnapshotLeakWarning(Snapshot snapshot);
 static void PrintFileLeakWarning(File file);
 static void PrintDSMLeakWarning(dsm_segment *seg);
+static void PrintWESLeakWarning(WaitEventSet *events);
 
 
 /*****************************************************************************
@@ -437,6 +439,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
 	ResourceArrayInit(&(owner->snapshotarr), PointerGetDatum(NULL));
 	ResourceArrayInit(&(owner->filearr), FileGetDatum(-1));
 	ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL));
+	ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL));
 
 	return owner;
 }
@@ -552,6 +555,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 				PrintDSMLeakWarning(res);
 			dsm_detach(res);
 		}
+
+		/* Ditto for wait event sets */
+		while (ResourceArrayGetAny(&(owner->wesarr), &foundres))
+		{
+			WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres);
+
+			if (isCommit)
+				PrintWESLeakWarning(event);
+			FreeWaitEventSet(event);
+		}
 	}
 	else if (phase == RESOURCE_RELEASE_LOCKS)
 	{
@@ -699,6 +712,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 	Assert(owner->snapshotarr.nitems == 0);
 	Assert(owner->filearr.nitems == 0);
 	Assert(owner->dsmarr.nitems == 0);
+	Assert(owner->wesarr.nitems == 0);
 	Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
 
 	/*
@@ -725,6 +739,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 	ResourceArrayFree(&(owner->snapshotarr));
 	ResourceArrayFree(&(owner->filearr));
 	ResourceArrayFree(&(owner->dsmarr));
+	ResourceArrayFree(&(owner->wesarr));
 
 	pfree(owner);
 }
@@ -1267,3 +1282,56 @@ PrintDSMLeakWarning(dsm_segment *seg)
 	elog(WARNING, "dynamic shared memory leak: segment %u still referenced",
 		 dsm_segment_handle(seg));
 }
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * wait event set reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeWESs(ResourceOwner owner)
+{
+	ResourceArrayEnlarge(&(owner->wesarr));
+}
+
+/*
+ * Remember that a wait event set is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeWESs()
+ */
+void
+ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events)
+{
+	ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events));
+}
+
+/*
+ * Forget that a wait event set is owned by a ResourceOwner (ERROR if not)
+ */
+void
+ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
+{
+	/*
+	 * XXXX: A wait event set has no property usable as an identifier, so
+	 * report its pointer instead.
+	 */
+	if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
+		elog(ERROR, "wait event set %p is not owned by resource owner %s",
+			 events, owner->name);
+}
+
+/*
+ * Debugging subroutine: warn about a wait event set left over at release
+ */
+static void
+PrintWESLeakWarning(WaitEventSet *events)
+{
+	/*
+	 * XXXX: A wait event set has no property usable as an identifier, so
+	 * report its pointer instead.
+	 */
+	elog(WARNING, "wait event set leak: %p still referenced",
+		 events);
+}
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a43193c..997ee8d 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -101,6 +101,7 @@
 #define LATCH_H
 
 #include <signal.h>
+#include "utils/resowner.h"
 
 /*
  * Latch structure should be treated as opaque and only accessed through
@@ -162,7 +163,8 @@ extern void DisownLatch(volatile Latch *latch);
 extern void SetLatch(volatile Latch *latch);
 extern void ResetLatch(volatile Latch *latch);
 
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context,
+										ResourceOwner res, int nevents);
 extern void FreeWaitEventSet(WaitEventSet *set);
 extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
 				  Latch *latch, void *user_data);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index 2420b65..70b0bb9 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
 
 #include "storage/dsm.h"
 #include "storage/fd.h"
+#include "storage/latch.h"
 #include "storage/lock.h"
 #include "utils/catcache.h"
 #include "utils/plancache.h"
@@ -88,4 +89,11 @@ extern void ResourceOwnerRememberDSM(ResourceOwner owner,
 extern void ResourceOwnerForgetDSM(ResourceOwner owner,
 					   dsm_segment *);
 
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+						 WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+					   WaitEventSet *);
+
 #endif							/* RESOWNER_PRIVATE_H */
-- 
2.9.2

>From 1b213d238c398dc77cb31cf2a92284c70d292e9e Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 19 Oct 2017 17:23:51 +0900
Subject: [PATCH 2/3] core side modification

---
 src/backend/executor/Makefile           |   2 +-
 src/backend/executor/execAsync.c        | 110 ++++++++++++++++++
 src/backend/executor/nodeAppend.c       | 194 ++++++++++++++++++++++++++++++--
 src/backend/executor/nodeForeignscan.c  |  22 +++-
 src/backend/optimizer/plan/createplan.c |  56 ++++++++-
 src/backend/postmaster/pgstat.c         |   3 +
 src/include/executor/execAsync.h        |  23 ++++
 src/include/executor/executor.h         |   1 +
 src/include/executor/nodeForeignscan.h  |   3 +
 src/include/foreign/fdwapi.h            |  11 ++
 src/include/nodes/execnodes.h           |  18 ++-
 src/include/nodes/plannodes.h           |   2 +
 src/include/pgstat.h                    |   3 +-
 13 files changed, 428 insertions(+), 20 deletions(-)
 create mode 100644 src/backend/executor/execAsync.c
 create mode 100644 src/include/executor/execAsync.h

diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 083b20f..21f5ad0 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/executor
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
+OBJS = execAmi.o execAsync.o execCurrent.o execExpr.o execExprInterp.o \
        execGrouping.o execIndexing.o execJunk.o \
        execMain.o execParallel.o execProcnode.o \
        execReplication.o execScan.o execSRF.o execTuples.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000..f7daed7
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,110 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *	  Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+void ExecAsyncSetState(PlanState *pstate, AsyncState status)
+{
+	pstate->asyncstate = status;	/* inspected by ExecAppendAsync */
+}
+
+/* Dispatch wait-event setup to the node-type-specific implementation. */
+bool
+ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+					   void *data, bool reinit)
+{
+	switch (nodeTag(node))
+	{
+		case T_ForeignScanState:
+			return ExecForeignAsyncConfigureWait((ForeignScanState *) node,
+												 wes, data, reinit);
+		default:
+			elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
+	}
+	return false;				/* keep compiler quiet */
+}
+
+#define EVENT_BUFFER_SIZE 16
+
+/*
+ * Wait until a socket event fires for any node in waitnodes (indexes into
+ * nodes[]) or the timeout expires.  Returns the fired node indexes, or NULL.
+ */
+Bitmapset *
+ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout)
+{
+	static int *refind = NULL;	/* per-node slots read back via user_data */
+	static int	refindsize = 0;
+	WaitEventSet *wes;
+	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
+	int			noccurred;
+	Bitmapset  *fired_events = NULL;
+	int			maxmember = -1;
+	int			i;
+	int			n = 0;
+
+	/* refind[] is indexed by member number, so size it by the largest one */
+	for (i = -1; (i = bms_next_member(waitnodes, i)) >= 0;)
+		maxmember = i;
+	if (refindsize <= maxmember)
+	{
+		int			newsize = Max(refindsize, EVENT_BUFFER_SIZE);
+
+		while (newsize <= maxmember)
+			newsize *= 2;
+		/* static pointer: the array must outlive the caller's memory context */
+		if (refind)
+			refind = (int *) repalloc(refind, newsize * sizeof(int));
+		else
+			refind = (int *) MemoryContextAlloc(TopMemoryContext,
+												newsize * sizeof(int));
+		refindsize = newsize;	/* only after the allocation succeeded */
+	}
+
+	wes = CreateWaitEventSet(TopTransactionContext, TopTransactionResourceOwner,
+							 bms_num_members(waitnodes));
+	for (i = bms_next_member(waitnodes, -1); i >= 0;
+		 i = bms_next_member(waitnodes, i))
+	{
+		refind[i] = i;
+		if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true))
+			n++;
+	}
+	if (n == 0)
+	{
+		FreeWaitEventSet(wes);
+		return NULL;
+	}
+
+	noccurred = WaitEventSetWait(wes, timeout, occurred_event,
+								 EVENT_BUFFER_SIZE, WAIT_EVENT_ASYNC_WAIT);
+	FreeWaitEventSet(wes);
+	for (i = 0; i < noccurred; i++)
+	{
+		WaitEvent  *w = &occurred_event[i];
+
+		if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+			fired_events = bms_add_member(fired_events, *(int *) w->user_data);
+	}
+
+	return fired_events;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index bed9bb8..5355bb2 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -59,9 +59,11 @@
 
 #include "executor/execdebug.h"
 #include "executor/nodeAppend.h"
+#include "executor/execAsync.h"
 #include "miscadmin.h"
 
 static TupleTableSlot *ExecAppend(PlanState *pstate);
+static TupleTableSlot *ExecAppendAsync(PlanState *pstate);
 static bool exec_append_initialize_next(AppendState *appendstate);
 
 
@@ -81,16 +83,16 @@ exec_append_initialize_next(AppendState *appendstate)
 	/*
 	 * get information from the append node
 	 */
-	whichplan = appendstate->as_whichplan;
+	whichplan = appendstate->as_whichsyncplan;
 
-	if (whichplan < 0)
+	if (whichplan < appendstate->as_nasyncplans)
 	{
 		/*
 		 * if scanning in reverse, we start at the last scan in the list and
 		 * then proceed back to the first.. in any case we inform ExecAppend
 		 * that we are at the end of the line by returning FALSE
 		 */
-		appendstate->as_whichplan = 0;
+		appendstate->as_whichsyncplan = appendstate->as_nasyncplans;
 		return FALSE;
 	}
 	else if (whichplan >= appendstate->as_nplans)
@@ -98,7 +100,7 @@ exec_append_initialize_next(AppendState *appendstate)
 		/*
 		 * as above, end the scan if we go beyond the last scan in our list..
 		 */
-		appendstate->as_whichplan = appendstate->as_nplans - 1;
+		appendstate->as_whichsyncplan = appendstate->as_nplans - 1;
 		return FALSE;
 	}
 	else
@@ -128,7 +130,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	ListCell   *lc;
 
 	/* check for unsupported flags */
-	Assert(!(eflags & EXEC_FLAG_MARK));
+	Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC)));
 
 	/*
 	 * Lock the non-leaf tables in the partition tree controlled by this node.
@@ -151,6 +153,19 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->ps.ExecProcNode = ExecAppend;
 	appendstate->appendplans = appendplanstates;
 	appendstate->as_nplans = nplans;
+	appendstate->as_nasyncplans = node->nasyncplans;
+	appendstate->as_syncdone = (node->nasyncplans == nplans);
+	appendstate->as_asyncresult = (TupleTableSlot **)
+		palloc0(node->nasyncplans * sizeof(TupleTableSlot *));
+
+	/* Choose async version of Exec function */
+	if (appendstate->as_nasyncplans > 0)
+		appendstate->ps.ExecProcNode = ExecAppendAsync;
+
+	/* initially, all async requests need a request */
+	for (i = 0; i < appendstate->as_nasyncplans; ++i)
+		appendstate->as_needrequest =
+			bms_add_member(appendstate->as_needrequest, i);
 
 	/*
 	 * Miscellaneous initialization
@@ -173,11 +188,19 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	foreach(lc, node->appendplans)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
+		int			sub_eflags = eflags;
+
+		if (i < appendstate->as_nasyncplans)
+			sub_eflags |= EXEC_FLAG_ASYNC;
 
-		appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
+		appendplanstates[i] = ExecInitNode(initNode, estate, sub_eflags);
 		i++;
 	}
 
+	/* if there's any async-capable subnode, use async-aware routine */
+	if (appendstate->as_nasyncplans)
+		appendstate->ps.ExecProcNode = ExecAppendAsync;
+
 	/*
 	 * initialize output tuple type
 	 */
@@ -187,7 +210,10 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	/*
 	 * initialize to scan first subplan
 	 */
-	appendstate->as_whichplan = 0;
+	/*
+	 * initialize to scan first synchronous subplan
+	 */
+	appendstate->as_whichsyncplan = appendstate->as_nasyncplans;
 	exec_append_initialize_next(appendstate);
 
 	return appendstate;
@@ -204,6 +230,8 @@ ExecAppend(PlanState *pstate)
 {
 	AppendState *node = castNode(AppendState, pstate);
 
+	Assert(node->as_nasyncplans == 0);
+
 	for (;;)
 	{
 		PlanState  *subnode;
@@ -214,7 +242,7 @@ ExecAppend(PlanState *pstate)
 		/*
 		 * figure out which subplan we are currently processing
 		 */
-		subnode = node->appendplans[node->as_whichplan];
+		subnode = node->appendplans[node->as_whichsyncplan];
 
 		/*
 		 * get a tuple from the subplan
@@ -237,9 +265,9 @@ ExecAppend(PlanState *pstate)
 		 * ExecInitAppend.
 		 */
 		if (ScanDirectionIsForward(node->ps.state->es_direction))
-			node->as_whichplan++;
+			node->as_whichsyncplan++;
 		else
-			node->as_whichplan--;
+			node->as_whichsyncplan--;
 		if (!exec_append_initialize_next(node))
 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 
@@ -247,6 +275,141 @@ ExecAppend(PlanState *pstate)
 	}
 }
 
+/*
+ * ExecAppendAsync
+ *		Append variant used when there are async subplans: hands back
+ *		buffered async results first, re-requests async subplans, and
+ *		interleaves the synchronous subplans while async ones are pending.
+ */
+static TupleTableSlot *
+ExecAppendAsync(PlanState *pstate)
+{
+	AppendState *node = castNode(AppendState, pstate);
+	Bitmapset *needrequest;
+	int	i;
+
+	Assert(node->as_nasyncplans > 0);
+
+	if (node->as_nasyncresult > 0)
+	{
+		--node->as_nasyncresult;
+		return node->as_asyncresult[node->as_nasyncresult];
+	}
+
+	/* (re)issue a request to every async subplan that needs one */
+	needrequest = node->as_needrequest;
+	node->as_needrequest = NULL;
+	while ((i = bms_first_member(needrequest)) >= 0)
+	{
+		TupleTableSlot *slot;
+		PlanState *subnode = node->appendplans[i];
+
+		slot = ExecProcNode(subnode);
+		if (subnode->asyncstate == AS_AVAILABLE)
+		{
+			if (!TupIsNull(slot))
+			{
+				node->as_asyncresult[node->as_nasyncresult++] = slot;
+				node->as_needrequest = bms_add_member(node->as_needrequest, i);
+			}
+		}
+		else
+			node->as_pending_async = bms_add_member(node->as_pending_async, i);
+	}
+	bms_free(needrequest);
+
+	for (;;)
+	{
+		TupleTableSlot *result;
+
+		/* return now if a result is available */
+		if (node->as_nasyncresult > 0)
+		{
+			--node->as_nasyncresult;
+			return node->as_asyncresult[node->as_nasyncresult];
+		}
+
+		while (!bms_is_empty(node->as_pending_async))
+		{
+			long timeout = node->as_syncdone ? -1 : 0;
+			Bitmapset *fired;
+			int i;
+
+			fired = ExecAsyncEventWait(node->appendplans, node->as_pending_async,
+									   timeout);
+			while ((i = bms_first_member(fired)) >= 0)
+			{
+				TupleTableSlot *slot;
+				PlanState *subnode = node->appendplans[i];
+				slot = ExecProcNode(subnode);
+				if (subnode->asyncstate == AS_AVAILABLE)
+				{
+					if (!TupIsNull(slot))
+					{
+						node->as_asyncresult[node->as_nasyncresult++] = slot;
+						node->as_needrequest =
+							bms_add_member(node->as_needrequest, i);
+					}
+					node->as_pending_async =
+						bms_del_member(node->as_pending_async, i);
+				}
+			}
+			bms_free(fired);
+
+			/* return now if a result is available */
+			if (node->as_nasyncresult > 0)
+			{
+				--node->as_nasyncresult;
+				return node->as_asyncresult[node->as_nasyncresult];
+			}
+
+			if (!node->as_syncdone)
+				break;
+		}
+
+		/*
+		 * If there is no asynchronous activity still pending and the
+		 * synchronous activity is also complete, we're totally done scanning
+		 * this node.  Otherwise, we're done with the asynchronous stuff but
+		 * must continue scanning the synchronous children.
+		 */
+		if (node->as_syncdone)
+		{
+			Assert(bms_is_empty(node->as_pending_async));
+			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		}
+
+		/* no async result is ready; pull from the current sync subplan */
+		result = ExecProcNode(node->appendplans[node->as_whichsyncplan]);
+
+		if (!TupIsNull(result))
+		{
+			/* hand back the subplan's slot as-is, not ExecInitAppend's */
+			return result;
+		}
+
+		/*
+		 * Advance to the next sync subplan in scan direction.  Once they are
+		 * exhausted, finish with an empty slot unless async work is pending.
+		 */
+		if (ScanDirectionIsForward(node->ps.state->es_direction))
+			node->as_whichsyncplan++;
+		else
+			node->as_whichsyncplan--;
+		if (!exec_append_initialize_next(node))
+		{
+			node->as_syncdone = true;
+			if (bms_is_empty(node->as_pending_async))
+			{
+				Assert(bms_is_empty(node->as_needrequest));
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+			}
+		}
+
+		/* Else loop back and try to get a tuple from the new subplan */
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecEndAppend
  *
@@ -280,6 +443,15 @@ ExecReScanAppend(AppendState *node)
 {
 	int			i;
 
+	/* Reset async state. */
+	for (i = 0; i < node->as_nasyncplans; ++i)
+	{
+		ExecShutdownNode(node->appendplans[i]);
+		node->as_needrequest = bms_add_member(node->as_needrequest, i);
+	}
+	node->as_nasyncresult = 0;
+	node->as_syncdone = (node->as_nasyncplans == node->as_nplans);
+
 	for (i = 0; i < node->as_nplans; i++)
 	{
 		PlanState  *subnode = node->appendplans[i];
@@ -298,6 +470,6 @@ ExecReScanAppend(AppendState *node)
 		if (subnode->chgParam == NULL)
 			ExecReScan(subnode);
 	}
-	node->as_whichplan = 0;
+	node->as_whichsyncplan = node->as_nasyncplans;
 	exec_append_initialize_next(node);
 }
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 20892d6..e851988 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -123,7 +123,6 @@ ExecForeignScan(PlanState *pstate)
 					(ExecScanRecheckMtd) ForeignRecheck);
 }
 
-
 /* ----------------------------------------------------------------
  *		ExecInitForeignScan
  * ----------------------------------------------------------------
@@ -147,6 +146,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
 	scanstate->ss.ps.ExecProcNode = ExecForeignScan;
+	scanstate->ss.ps.asyncstate = AS_AVAILABLE;
+
+	if ((eflags & EXEC_FLAG_ASYNC) != 0)
+		scanstate->fs_async = true;
 
 	/*
 	 * Miscellaneous initialization
@@ -388,3 +391,20 @@ ExecShutdownForeignScan(ForeignScanState *node)
 	if (fdwroutine->ShutdownForeignScan)
 		fdwroutine->ShutdownForeignScan(node);
 }
+
+/* ----------------------------------------------------------------
+ *		ExecForeignAsyncConfigureWait
+ *
+ *		Pass wait setup on to the FDW's ForeignAsyncConfigureWait callback
+ * ----------------------------------------------------------------
+ */
+bool
+ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+							  void *caller_data, bool reinit)
+{
+	FdwRoutine *fdwroutine = node->fdwroutine;
+
+	Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+	return fdwroutine->ForeignAsyncConfigureWait(node, wes,
+												 caller_data, reinit);
+}
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 792ea84..53eb56d 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -203,7 +203,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
 						 Index scanrelid, char *enrname);
 static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
 				   Index scanrelid, int wtParam);
-static Append *make_append(List *appendplans, List *tlist, List *partitioned_rels);
+static Append *make_append(List *appendplans, int nasyncplans,	int referent,
+						   List *tlist, List *partitioned_rels);
 static RecursiveUnion *make_recursive_union(List *tlist,
 					 Plan *lefttree,
 					 Plan *righttree,
@@ -283,6 +284,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
 				 List *rowMarks, OnConflictExpr *onconflict, int epqParam);
 static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
 						 GatherMergePath *best_path);
+static bool is_async_capable_path(Path *path);
 
 
 /*
@@ -1004,8 +1006,12 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 {
 	Append	   *plan;
 	List	   *tlist = build_path_tlist(root, &best_path->path);
-	List	   *subplans = NIL;
+	List	   *asyncplans = NIL;
+	List	   *syncplans = NIL;
 	ListCell   *subpaths;
+	int			nasyncplans = 0;
+	bool		first = true;
+	bool		referent_is_sync = true;
 
 	/*
 	 * The subpaths list could be empty, if every child was proven empty by
@@ -1040,7 +1046,18 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 		/* Must insist that all children return the same tlist */
 		subplan = create_plan_recurse(root, subpath, CP_EXACT_TLIST);
 
-		subplans = lappend(subplans, subplan);
+		/* Classify as async-capable or not */
+		if (is_async_capable_path(subpath))
+		{
+			asyncplans = lappend(asyncplans, subplan);
+			++nasyncplans;
+			if (first)
+				referent_is_sync = false;
+		}
+		else
+			syncplans = lappend(syncplans, subplan);
+
+		first = false;
 	}
 
 	/*
@@ -1050,7 +1067,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 	 * parent-rel Vars it'll be asked to emit.
 	 */
 
-	plan = make_append(subplans, tlist, best_path->partitioned_rels);
+	plan = make_append(list_concat(asyncplans, syncplans), nasyncplans,
+					   referent_is_sync ? nasyncplans : 0, tlist,
+					   best_path->partitioned_rels);
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
@@ -5281,7 +5300,8 @@ make_foreignscan(List *qptlist,
 }
 
 static Append *
-make_append(List *appendplans, List *tlist, List *partitioned_rels)
+make_append(List *appendplans, int nasyncplans,	int referent,
+			List *tlist, List *partitioned_rels)
 {
 	Append	   *node = makeNode(Append);
 	Plan	   *plan = &node->plan;
@@ -5292,6 +5312,8 @@ make_append(List *appendplans, List *tlist, List *partitioned_rels)
 	plan->righttree = NULL;
 	node->partitioned_rels = partitioned_rels;
 	node->appendplans = appendplans;
+	node->nasyncplans = nasyncplans;
+	node->referent = referent;
 
 	return node;
 }
@@ -6628,3 +6650,27 @@ is_projection_capable_plan(Plan *plan)
 	}
 	return true;
 }
+
+/*
+ * is_async_capable_path
+ *		Check whether a given Path node is async-capable.  Only a ForeignPath
+ *		whose FDW implements IsForeignPathAsyncCapable can qualify.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+	switch (nodeTag(path))
+	{
+		case T_ForeignPath:
+			{
+				FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+				Assert(fdwroutine != NULL);
+				/* the callback is optional; without it the path is sync */
+				return fdwroutine->IsForeignPathAsyncCapable != NULL &&
+					fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path);
+			}
+		default:
+			return false;
+	}
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 3a0b49c..4c6571e 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3628,6 +3628,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_SYNC_REP:
 			event_name = "SyncRep";
 			break;
+		case WAIT_EVENT_ASYNC_WAIT:
+			event_name = "AsyncExecWait";
+			break;
 			/* no default case, so that compiler will warn */
 	}
 
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000..5fd67d9
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.h
+ *		Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/executor/execAsync.h
+ *--------------------------------------------------------------------
+ */
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+#include "storage/latch.h"
+
+extern void ExecAsyncSetState(PlanState *pstate, AsyncState status);
+extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+								   void *data, bool reinit);
+extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes,
+									 long timeout);
+#endif   /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 37fd6b2..2ab9d72 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -63,6 +63,7 @@
 #define EXEC_FLAG_WITH_OIDS		0x0020	/* force OIDs in returned tuples */
 #define EXEC_FLAG_WITHOUT_OIDS	0x0040	/* force no OIDs in returned tuples */
 #define EXEC_FLAG_WITH_NO_DATA	0x0080	/* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC			0x0100	/* request async execution */
 
 
 /* Hook for plugins to get control in ExecutorStart() */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 0354c2c..fed46d7 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
 extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
 								shm_toc *toc);
 extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node,
+										  WaitEventSet *wes,
+										  void *caller_data, bool reinit);
 
 #endif							/* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 04e43cc..566236b 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -161,6 +161,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
 typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
 															List *fdw_private,
 															RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node,
+													WaitEventSet *wes,
+													void *caller_data,
+													bool reinit);
 
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -182,6 +187,7 @@ typedef struct FdwRoutine
 	GetForeignPlan_function GetForeignPlan;
 	BeginForeignScan_function BeginForeignScan;
 	IterateForeignScan_function IterateForeignScan;
+	IterateForeignScan_function IterateForeignScanAsync;
 	ReScanForeignScan_function ReScanForeignScan;
 	EndForeignScan_function EndForeignScan;
 
@@ -232,6 +238,11 @@ typedef struct FdwRoutine
 	InitializeDSMForeignScan_function InitializeDSMForeignScan;
 	ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
 	InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+
+	/* Support functions for asynchronous execution */
+	IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+	ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+
 	ShutdownForeignScan_function ShutdownForeignScan;
 
 	/* Support functions for path reparameterization. */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index c461134..7f663eb 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -840,6 +840,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate);
  * abstract superclass for all PlanState-type nodes.
  * ----------------
  */
+typedef enum AsyncState
+{
+	AS_AVAILABLE,				/* node has a result to return (tuple or end of scan) */
+	AS_WAITING					/* node has no result yet; caller must wait */
+} AsyncState;
+
 typedef struct PlanState
 {
 	NodeTag		type;
@@ -880,6 +886,9 @@ typedef struct PlanState
 	TupleTableSlot *ps_ResultTupleSlot; /* slot for my result tuples */
 	ExprContext *ps_ExprContext;	/* node's expression-evaluation context */
 	ProjectionInfo *ps_ProjInfo;	/* info for doing tuple projection */
+
+	AsyncState	asyncstate;
+	int32		padding;			/* to keep alignment of derived types */
 } PlanState;
 
 /* ----------------
@@ -1003,7 +1012,13 @@ typedef struct AppendState
 	PlanState	ps;				/* its first field is NodeTag */
 	PlanState **appendplans;	/* array of PlanStates for my inputs */
 	int			as_nplans;
-	int			as_whichplan;
+	int			as_nasyncplans;	/* # of async-capable children */
+	int			as_whichsyncplan; /* which sync plan is being executed  */
+	bool		as_syncdone;	/* all synchronous plans done? */
+	Bitmapset  *as_needrequest;	/* async plans needing a new request */
+	Bitmapset  *as_pending_async;	/* pending async plans */
+	TupleTableSlot **as_asyncresult;	/* unreturned results of async plans */
+	int			as_nasyncresult;	/* # of valid entries in as_asyncresult */
 } AppendState;
 
 /* ----------------
@@ -1546,6 +1561,7 @@ typedef struct ForeignScanState
 	Size		pscan_len;		/* size of parallel coordination information */
 	/* use struct pointer to avoid including fdwapi.h here */
 	struct FdwRoutine *fdwroutine;
+	bool		fs_async;
 	void	   *fdw_state;		/* foreign-data wrapper can keep state here */
 } ForeignScanState;
 
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index a382331..e0eccc8 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -248,6 +248,8 @@ typedef struct Append
 	/* RT indexes of non-leaf tables in a partition tree */
 	List	   *partitioned_rels;
 	List	   *appendplans;
+	int			nasyncplans;	/* # of async plans, always at start of list */
+	int			referent; 		/* index of inheritance tree referent */
 } Append;
 
 /* ----------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 089b7c3..fe9d39c 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -816,7 +816,8 @@ typedef enum
 	WAIT_EVENT_REPLICATION_ORIGIN_DROP,
 	WAIT_EVENT_REPLICATION_SLOT_DROP,
 	WAIT_EVENT_SAFE_SNAPSHOT,
-	WAIT_EVENT_SYNC_REP
+	WAIT_EVENT_SYNC_REP,
+	WAIT_EVENT_ASYNC_WAIT
 } WaitEventIPC;
 
 /* ----------
-- 
2.9.2

>From 9f6a16ef7f7d1a38353216191641deb0d3ea58e7 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 19 Oct 2017 17:24:07 +0900
Subject: [PATCH 3/3] async postgres_fdw

---
 contrib/postgres_fdw/connection.c              |  26 ++
 contrib/postgres_fdw/expected/postgres_fdw.out | 128 ++++---
 contrib/postgres_fdw/postgres_fdw.c            | 484 +++++++++++++++++++++----
 contrib/postgres_fdw/postgres_fdw.h            |   2 +
 contrib/postgres_fdw/sql/postgres_fdw.sql      |  20 +-
 5 files changed, 522 insertions(+), 138 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index be4ec07..00301d0 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
 	bool		invalidated;	/* true if reconnect is pending */
 	uint32		server_hashvalue;	/* hash value of foreign server OID */
 	uint32		mapping_hashvalue;	/* hash value of user mapping OID */
+	void		*storage;		/* connection specific storage */
 } ConnCacheEntry;
 
 /*
@@ -202,6 +203,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
 			 entry->conn, server->servername, user->umid, user->userid);
+		entry->storage = NULL;
 	}
 
 	/*
@@ -216,6 +218,30 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 }
 
 /*
+ * Returns the connection-specific storage for this user mapping, allocating
+ * a zero-filled area of initsize bytes on first use.
+ */
+void *
+GetConnectionSpecificStorage(UserMapping *user, size_t initsize)
+{
+	ConnCacheEntry *entry;
+	ConnCacheKey key;
+
+	/* Look up only: HASH_ENTER would hand back an uninitialized entry. */
+	key = user->umid;
+	entry = hash_search(ConnectionHash, &key, HASH_FIND, NULL);
+	if (entry == NULL)
+		elog(ERROR, "no connection cache entry for user mapping %u",
+			 user->umid);
+
+	/* Allocate the zero-filled storage lazily on first request. */
+	if (entry->storage == NULL)
+		entry->storage = MemoryContextAllocZero(CacheMemoryContext, initsize);
+
+	return entry->storage;
+}
+
+/*
  * Connect to remote server using specified server and user mapping properties.
  */
 static PGconn *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 4339bbf..2a0a662 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6512,7 +6512,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
 INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |  aa   
 ----------+-------
  a        | aaa
@@ -6540,7 +6540,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | aaa
@@ -6568,7 +6568,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | aaa
@@ -6596,7 +6596,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | newtoo
@@ -6662,35 +6662,40 @@ insert into bar2 values(3,33,33);
 insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+                                                   QUERY PLAN                                                    
+-----------------------------------------------------------------------------------------------------------------
  LockRows
    Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
-   ->  Hash Join
+   ->  Merge Join
          Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
          Inner Unique: true
-         Hash Cond: (bar.f1 = foo.f1)
-         ->  Append
-               ->  Seq Scan on public.bar
+         Merge Cond: (bar.f1 = foo.f1)
+         ->  Merge Append
+               Sort Key: bar.f1
+               ->  Sort
                      Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
+                     Sort Key: bar.f1
+                     ->  Seq Scan on public.bar
+                           Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
                ->  Foreign Scan on public.bar2
                      Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid
-                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
-         ->  Hash
+                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+         ->  Sort
                Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+               Sort Key: foo.f1
                ->  HashAggregate
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+(28 rows)
 
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
  f1 | f2 
 ----+----
   1 | 11
@@ -6700,35 +6705,40 @@ select * from bar where f1 in (select f1 from foo) for update;
 (4 rows)
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+                                                   QUERY PLAN                                                   
+----------------------------------------------------------------------------------------------------------------
  LockRows
    Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
-   ->  Hash Join
+   ->  Merge Join
          Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
          Inner Unique: true
-         Hash Cond: (bar.f1 = foo.f1)
-         ->  Append
-               ->  Seq Scan on public.bar
+         Merge Cond: (bar.f1 = foo.f1)
+         ->  Merge Append
+               Sort Key: bar.f1
+               ->  Sort
                      Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
+                     Sort Key: bar.f1
+                     ->  Seq Scan on public.bar
+                           Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
                ->  Foreign Scan on public.bar2
                      Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid
-                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
-         ->  Hash
+                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+         ->  Sort
                Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+               Sort Key: foo.f1
                ->  HashAggregate
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+(28 rows)
 
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
  f1 | f2 
 ----+----
   1 | 11
@@ -6758,11 +6768,11 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
    ->  Hash Join
          Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo.ctid, foo.*, foo.tableoid
          Inner Unique: true
@@ -6776,11 +6786,11 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
 (39 rows)
 
 update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
@@ -6811,16 +6821,16 @@ where bar.f1 = ss.f1;
          Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
          Hash Cond: (foo.f1 = bar.f1)
          ->  Append
-               ->  Seq Scan on public.foo
-                     Output: ROW(foo.f1), foo.f1
                ->  Foreign Scan on public.foo2
                      Output: ROW(foo2.f1), foo2.f1
                      Remote SQL: SELECT f1 FROM public.loct1
-               ->  Seq Scan on public.foo foo_1
-                     Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
                ->  Foreign Scan on public.foo2 foo2_1
                      Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
                      Remote SQL: SELECT f1 FROM public.loct1
+               ->  Seq Scan on public.foo
+                     Output: ROW(foo.f1), foo.f1
+               ->  Seq Scan on public.foo foo_1
+                     Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
          ->  Hash
                Output: bar.f1, bar.f2, bar.ctid
                ->  Seq Scan on public.bar
@@ -6838,16 +6848,16 @@ where bar.f1 = ss.f1;
                Output: (ROW(foo.f1)), foo.f1
                Sort Key: foo.f1
                ->  Append
-                     ->  Seq Scan on public.foo
-                           Output: ROW(foo.f1), foo.f1
                      ->  Foreign Scan on public.foo2
                            Output: ROW(foo2.f1), foo2.f1
                            Remote SQL: SELECT f1 FROM public.loct1
-                     ->  Seq Scan on public.foo foo_1
-                           Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
                      ->  Foreign Scan on public.foo2 foo2_1
                            Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
                            Remote SQL: SELECT f1 FROM public.loct1
+                     ->  Seq Scan on public.foo
+                           Output: ROW(foo.f1), foo.f1
+                     ->  Seq Scan on public.foo foo_1
+                           Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
 (45 rows)
 
 update bar set f2 = f2 + 100
@@ -6998,27 +7008,33 @@ delete from foo where f1 < 5 returning *;
 (5 rows)
 
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-                                  QUERY PLAN                                  
-------------------------------------------------------------------------------
- Update on public.bar
-   Output: bar.f1, bar.f2
-   Update on public.bar
-   Foreign Update on public.bar2
-   ->  Seq Scan on public.bar
-         Output: bar.f1, (bar.f2 + 100), bar.ctid
-   ->  Foreign Update on public.bar2
-         Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
-(8 rows)
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+                                      QUERY PLAN                                      
+--------------------------------------------------------------------------------------
+ Sort
+   Output: u.f1, u.f2
+   Sort Key: u.f1
+   CTE u
+     ->  Update on public.bar
+           Output: bar.f1, bar.f2
+           Update on public.bar
+           Foreign Update on public.bar2
+           ->  Seq Scan on public.bar
+                 Output: bar.f1, (bar.f2 + 100), bar.ctid
+           ->  Foreign Update on public.bar2
+                 Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
+   ->  CTE Scan on u
+         Output: u.f1, u.f2
+(14 rows)
 
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
  f1 | f2  
 ----+-----
   1 | 311
   2 | 322
-  6 | 266
   3 | 333
   4 | 344
+  6 | 266
   7 | 277
 (6 rows)
 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index fb65e2e..0688504 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -20,6 +20,8 @@
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
+#include "executor/execAsync.h"
+#include "executor/nodeForeignscan.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -34,6 +36,7 @@
 #include "optimizer/var.h"
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
+#include "pgstat.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
@@ -53,6 +56,9 @@ PG_MODULE_MAGIC;
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Retrieve PgFdwScanState struct from ForeignScanState */
+#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state)
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -120,10 +126,27 @@ enum FdwDirectModifyPrivateIndex
 };
 
 /*
+ * Connection private area structure.
+ */
+typedef struct PgFdwConnpriv
+{
+	ForeignScanState *current_owner;	/* The node currently running a query
+										 * on this connection */
+} PgFdwConnpriv;
+
+/* Common base embedded as member "s" of the scan/modify execution states */
+typedef struct PgFdwState
+{
+	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConnpriv *connpriv;	/* connection private memory */
+} PgFdwState;
+
+/*
  * Execution state of a foreign scan using postgres_fdw.
  */
 typedef struct PgFdwScanState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table. NULL
 								 * for a foreign join scan. */
 	TupleDesc	tupdesc;		/* tuple descriptor of scan */
@@ -134,7 +157,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	bool		result_ready;
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -150,6 +173,13 @@ typedef struct PgFdwScanState
 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
 	bool		eof_reached;	/* true if last fetch reached EOF */
+	bool		run_async;		/* true if run asynchronously */
+	bool		async_waiting;	/* true if requesting the parent to wait */
+	ForeignScanState *waiter;	/* Next node to run a query among nodes
+								 * sharing the same connection */
+	ForeignScanState *last_waiter;	/* A waiting node at the end of a waiting
+								 * list. Maintained only by the current
+									 * owner of the connection */
 
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
@@ -163,11 +193,11 @@ typedef struct PgFdwScanState
  */
 typedef struct PgFdwModifyState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table */
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -190,6 +220,7 @@ typedef struct PgFdwModifyState
  */
 typedef struct PgFdwDirectModifyState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table */
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
@@ -288,6 +319,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
 static void postgresReScanForeignScan(ForeignScanState *node);
 static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresShutdownForeignScan(ForeignScanState *node);
 static void postgresAddForeignUpdateTargets(Query *parsetree,
 								RangeTblEntry *target_rte,
 								Relation target_relation);
@@ -348,6 +380,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
 							 UpperRelationKind stage,
 							 RelOptInfo *input_rel,
 							 RelOptInfo *output_rel);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static bool postgresForeignAsyncConfigureWait(ForeignScanState *node,
+											  WaitEventSet *wes,
+											  void *caller_data, bool reinit);
 
 /*
  * Helper functions
@@ -368,7 +404,10 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void request_more_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node);
+static void vacate_connection(PgFdwState *fdwconn);
+static void absorb_current_result(ForeignScanState *node);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
@@ -438,6 +477,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->IterateForeignScan = postgresIterateForeignScan;
 	routine->ReScanForeignScan = postgresReScanForeignScan;
 	routine->EndForeignScan = postgresEndForeignScan;
+	routine->ShutdownForeignScan = postgresShutdownForeignScan;
 
 	/* Functions for updating foreign tables */
 	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
@@ -472,6 +512,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	/* Support functions for upper relation push-down */
 	routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
 
+	/* Support functions for async execution */
+	routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+	routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+
 	PG_RETURN_POINTER(routine);
 }
 
@@ -1322,12 +1366,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	fsstate->conn = GetConnection(user, false);
+	fsstate->s.conn = GetConnection(user, false);
+	fsstate->s.connpriv = (PgFdwConnpriv *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
+	fsstate->s.connpriv->current_owner = NULL;
+	fsstate->waiter = NULL;
+	fsstate->last_waiter = node;
 
 	/* Assign a unique ID for my cursor */
-	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+	fsstate->cursor_number = GetCursorNumber(fsstate->s.conn);
 	fsstate->cursor_exists = false;
 
+	/* Initialize async execution status */
+	fsstate->run_async = false;
+	fsstate->async_waiting = false;
+
 	/* Get private info created by planner functions. */
 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
 									 FdwScanPrivateSelectSql));
@@ -1383,32 +1436,136 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 static TupleTableSlot *
 postgresIterateForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
 
 	/*
-	 * If this is the first call after Begin or ReScan, we need to create the
-	 * cursor on the remote side.
-	 */
-	if (!fsstate->cursor_exists)
-		create_cursor(node);
-
-	/*
 	 * Get some more tuples, if we've run out.
 	 */
 	if (fsstate->next_tuple >= fsstate->num_tuples)
 	{
-		/* No point in another fetch if we already detected EOF, though. */
-		if (!fsstate->eof_reached)
-			fetch_more_data(node);
-		/* If we didn't get any tuples, must be end of data. */
+		ForeignScanState *next_conn_owner = node;
+
+		/* This node has sent a query on this connection */
+		if (fsstate->s.connpriv->current_owner == node)
+		{
+			/* Check if the result is available */
+			if (PQisBusy(fsstate->s.conn))
+			{
+				int rc = WaitLatchOrSocket(NULL,
+										   WL_SOCKET_READABLE | WL_TIMEOUT,
+										   PQsocket(fsstate->s.conn), 0,
+										   WAIT_EVENT_ASYNC_WAIT);
+				if (node->fs_async && !(rc & WL_SOCKET_READABLE))
+				{
+					/*
+					 * This node is not ready yet. Tell the caller to wait.
+					 */
+					fsstate->result_ready = false;
+					node->ss.ps.asyncstate = AS_WAITING;
+					return ExecClearTuple(slot);
+				}
+			}
+
+			Assert(fsstate->async_waiting);
+			fsstate->async_waiting = false;
+			fetch_received_data(node);
+
+			/*
+			 * If some node is waiting for this one on the same connection,
+			 * let the first waiter be the next owner of this connection.
+			 */
+			if (fsstate->waiter)
+			{
+				PgFdwScanState *next_owner_state;
+
+				next_conn_owner = fsstate->waiter;
+				next_owner_state = GetPgFdwScanState(next_conn_owner);
+				fsstate->waiter = NULL;
+
+				/*
+				 * only the current owner is responsible to maintain the shortcut
+				 * to the last waiter
+				 */
+				next_owner_state->last_waiter = fsstate->last_waiter;
+
+				/*
+				 * for simplicity, last_waiter points to the node itself when
+				 * no one is waiting for it.
+				 */
+				fsstate->last_waiter = node;
+			}
+		}
+		else if (fsstate->s.connpriv->current_owner &&
+				 !GetPgFdwScanState(node)->eof_reached)
+		{
+			/*
+			 * Someone else is holding this connection, so this node has to
+			 * run later. Add myself to the tail of the waiters' list then
+			 * return not-ready.  To avoid scanning through the waiters' list,
+			 * the current owner maintains the shortcut to the last
+			 * waiter.
+			 */
+			PgFdwScanState *conn_owner_state =
+				GetPgFdwScanState(fsstate->s.connpriv->current_owner);
+			ForeignScanState *last_waiter = conn_owner_state->last_waiter;
+			PgFdwScanState *last_waiter_state = GetPgFdwScanState(last_waiter);
+
+			last_waiter_state->waiter = node;
+			conn_owner_state->last_waiter = node;
+
+			/* Register the node to the async-waiting node list */
+			Assert(!GetPgFdwScanState(node)->async_waiting);
+
+			GetPgFdwScanState(node)->async_waiting = true;
+
+			fsstate->result_ready = fsstate->eof_reached;
+			node->ss.ps.asyncstate =
+				fsstate->result_ready ? AS_AVAILABLE : AS_WAITING;
+			return ExecClearTuple(slot);
+		}
+
+		/* At this time no node is running on the connection */
+		Assert(GetPgFdwScanState(next_conn_owner)->s.connpriv->current_owner
+			   == NULL);
+		/*
+		 * Send the next request for the next owner of this connection if
+		 * needed.
+		 */
+		if (!GetPgFdwScanState(next_conn_owner)->eof_reached)
+		{
+			PgFdwScanState *next_owner_state =
+				GetPgFdwScanState(next_conn_owner);
+
+			request_more_data(next_conn_owner);
+
+			/* Register the node to the async-waiting node list */
+			if (!next_owner_state->async_waiting)
+				next_owner_state->async_waiting = true;
+
+			if (!next_conn_owner->fs_async)
+				fetch_received_data(next_conn_owner);
+		}
+
+
+		/*
+		 * If we haven't received a result for the given node this time,
+		 * return with no tuple to give way to other nodes.
+		 */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
+		{
+			fsstate->result_ready = fsstate->eof_reached;
+			node->ss.ps.asyncstate =
+				fsstate->result_ready ? AS_AVAILABLE : AS_WAITING;
 			return ExecClearTuple(slot);
+		}
 	}
 
 	/*
 	 * Return the next tuple.
 	 */
+	fsstate->result_ready = true;
+	node->ss.ps.asyncstate = AS_AVAILABLE;
 	ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
 				   slot,
 				   InvalidBuffer,
@@ -1424,7 +1581,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 static void
 postgresReScanForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	char		sql[64];
 	PGresult   *res;
 
@@ -1432,6 +1589,9 @@ postgresReScanForeignScan(ForeignScanState *node)
 	if (!fsstate->cursor_exists)
 		return;
 
+	/* Absorb the remaining result */
+	absorb_current_result(node);
+
 	/*
 	 * If any internal parameters affecting this node have changed, we'd
 	 * better destroy and recreate the cursor.  Otherwise, rewinding it should
@@ -1460,9 +1620,9 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_exec_query(fsstate->conn, sql);
+	res = pgfdw_exec_query(fsstate->s.conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+		pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql);
 	PQclear(res);
 
 	/* Now force a fresh FETCH. */
@@ -1480,7 +1640,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 static void
 postgresEndForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 
 	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
 	if (fsstate == NULL)
@@ -1488,16 +1648,32 @@ postgresEndForeignScan(ForeignScanState *node)
 
 	/* Close the cursor if open, to prevent accumulation of cursors */
 	if (fsstate->cursor_exists)
-		close_cursor(fsstate->conn, fsstate->cursor_number);
+		close_cursor(fsstate->s.conn, fsstate->cursor_number);
 
 	/* Release remote connection */
-	ReleaseConnection(fsstate->conn);
-	fsstate->conn = NULL;
+	ReleaseConnection(fsstate->s.conn);
+	fsstate->s.conn = NULL;
 
 	/* MemoryContexts will be deleted automatically. */
 }
 
 /*
+ * postgresShutdownForeignScan
+ *		Discard any result still pending on the connection (SELECT only).
+ */
+static void
+postgresShutdownForeignScan(ForeignScanState *node)
+{
+	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
+
+	if (plan->operation != CMD_SELECT)
+		return;
+
+	/* Absorb the remaining result */
+	absorb_current_result(node);
+}
+
+/*
  * postgresAddForeignUpdateTargets
  *		Add resjunk column(s) needed for update/delete on a foreign table
  */
@@ -1700,7 +1876,9 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	user = GetUserMapping(userid, table->serverid);
 
 	/* Open connection; report that we'll create a prepared statement. */
-	fmstate->conn = GetConnection(user, true);
+	fmstate->s.conn = GetConnection(user, true);
+	fmstate->s.connpriv = (PgFdwConnpriv *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
 	fmstate->p_name = NULL;		/* prepared statement not made yet */
 
 	/* Deconstruct fdw_private data. */
@@ -1779,6 +1957,8 @@ postgresExecForeignInsert(EState *estate,
 	PGresult   *res;
 	int			n_rows;
 
+	vacate_connection((PgFdwState *)fmstate);
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -1789,14 +1969,14 @@ postgresExecForeignInsert(EState *estate,
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
+	if (!PQsendQueryPrepared(fmstate->s.conn,
 							 fmstate->p_name,
 							 fmstate->p_nums,
 							 p_values,
 							 NULL,
 							 NULL,
 							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -1804,10 +1984,10 @@ postgresExecForeignInsert(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
@@ -1845,6 +2025,8 @@ postgresExecForeignUpdate(EState *estate,
 	PGresult   *res;
 	int			n_rows;
 
+	vacate_connection((PgFdwState *)fmstate);
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -1865,14 +2047,14 @@ postgresExecForeignUpdate(EState *estate,
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
+	if (!PQsendQueryPrepared(fmstate->s.conn,
 							 fmstate->p_name,
 							 fmstate->p_nums,
 							 p_values,
 							 NULL,
 							 NULL,
 							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -1880,10 +2062,10 @@ postgresExecForeignUpdate(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
@@ -1921,6 +2103,8 @@ postgresExecForeignDelete(EState *estate,
 	PGresult   *res;
 	int			n_rows;
 
+	vacate_connection((PgFdwState *)fmstate);
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -1941,14 +2125,14 @@ postgresExecForeignDelete(EState *estate,
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
+	if (!PQsendQueryPrepared(fmstate->s.conn,
 							 fmstate->p_name,
 							 fmstate->p_nums,
 							 p_values,
 							 NULL,
 							 NULL,
 							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -1956,10 +2140,10 @@ postgresExecForeignDelete(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
@@ -2006,16 +2190,16 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = pgfdw_exec_query(fmstate->conn, sql);
+		res = pgfdw_exec_query(fmstate->s.conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+			pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql);
 		PQclear(res);
 		fmstate->p_name = NULL;
 	}
 
 	/* Release remote connection */
-	ReleaseConnection(fmstate->conn);
-	fmstate->conn = NULL;
+	ReleaseConnection(fmstate->s.conn);
+	fmstate->s.conn = NULL;
 }
 
 /*
@@ -2303,7 +2487,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	dmstate->conn = GetConnection(user, false);
+	dmstate->s.conn = GetConnection(user, false);
+	dmstate->s.connpriv = (PgFdwConnpriv *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
 
 	/* Initialize state variable */
 	dmstate->num_tuples = -1;	/* -1 means not set yet */
@@ -2356,7 +2542,10 @@ postgresIterateDirectModify(ForeignScanState *node)
 	 * If this is the first call after Begin, execute the statement.
 	 */
 	if (dmstate->num_tuples == -1)
+	{
+		vacate_connection((PgFdwState *)dmstate);
 		execute_dml_stmt(node);
+	}
 
 	/*
 	 * If the local query doesn't specify RETURNING, just clear tuple slot.
@@ -2403,8 +2592,8 @@ postgresEndDirectModify(ForeignScanState *node)
 		PQclear(dmstate->result);
 
 	/* Release remote connection */
-	ReleaseConnection(dmstate->conn);
-	dmstate->conn = NULL;
+	ReleaseConnection(dmstate->s.conn);
+	dmstate->s.conn = NULL;
 
 	/* MemoryContext will be deleted automatically. */
 }
@@ -2523,6 +2712,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_param_join_conds;
 		StringInfoData sql;
 		PGconn	   *conn;
+		PgFdwConnpriv *connpriv;
 		Selectivity local_sel;
 		QualCost	local_cost;
 		List	   *fdw_scan_tlist = NIL;
@@ -2565,6 +2755,16 @@ estimate_path_cost_size(PlannerInfo *root,
 
 		/* Get the remote estimate */
 		conn = GetConnection(fpinfo->user, false);
+		connpriv = GetConnectionSpecificStorage(fpinfo->user,
+												sizeof(PgFdwConnpriv));
+		if (connpriv)
+		{
+			PgFdwState tmpstate;
+			tmpstate.conn = conn;
+			tmpstate.connpriv = connpriv;
+			vacate_connection(&tmpstate);
+		}
+
 		get_remote_estimate(sql.data, conn, &rows, &width,
 							&startup_cost, &total_cost);
 		ReleaseConnection(conn);
@@ -2919,11 +3119,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 static void
 create_cursor(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PGconn	   *conn = fsstate->s.conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -2989,47 +3189,96 @@ create_cursor(ForeignScanState *node)
  * Fetch some more rows from the node's cursor.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+request_more_data(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
+	PGconn	   *conn = fsstate->s.conn;
+	char		sql[64];
+
+	/* The connection should be vacant */
+	Assert(fsstate->s.connpriv->current_owner == NULL);
+
+	/*
+	 * If this is the first call after Begin or ReScan, we need to create the
+	 * cursor on the remote side.
+	 */
+	if (!fsstate->cursor_exists)
+		create_cursor(node);
+
+	snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+			 fsstate->fetch_size, fsstate->cursor_number);
+
+	if (!PQsendQuery(conn, sql))
+		pgfdw_report_error(ERROR, NULL, conn, false, sql);
+
+	fsstate->s.connpriv->current_owner = node;
+}
+
+/*
+ * Receive the rows for the FETCH already sent by request_more_data().
+ */
+static void
+fetch_received_data(ForeignScanState *node)
+{
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
+	/* I should be the current connection owner */
+	Assert(fsstate->s.connpriv->current_owner == node);
+
 	/*
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
-	 * batch.
+	 * batch if no tuple is remaining
 	 */
-	fsstate->tuples = NULL;
-	MemoryContextReset(fsstate->batch_cxt);
+	if (fsstate->next_tuple >= fsstate->num_tuples)
+	{
+		fsstate->tuples = NULL;
+		fsstate->num_tuples = 0;
+		MemoryContextReset(fsstate->batch_cxt);
+	}
+	else if (fsstate->next_tuple > 0)
+	{
+		/* move the remaining tuples to the beginning of the store */
+		int n = 0;
+
+		while(fsstate->next_tuple < fsstate->num_tuples)
+			fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++];
+		fsstate->num_tuples = n;
+	}
+
 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PGconn	   *conn = fsstate->s.conn;
 		char		sql[64];
-		int			numrows;
+		int			addrows;
+		size_t		newsize;
 		int			i;
 
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fsstate->fetch_size, fsstate->cursor_number);
 
-		res = pgfdw_exec_query(conn, sql);
+		res = pgfdw_get_result(conn, sql);
 		/* On error, report the original query, not the FETCH. */
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
 		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-		fsstate->num_tuples = numrows;
-		fsstate->next_tuple = 0;
+		addrows = PQntuples(res);
+		newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple);
+		if (fsstate->tuples)
+			fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize);
+		else
+			fsstate->tuples = (HeapTuple *) palloc(newsize);
 
-		for (i = 0; i < numrows; i++)
+		for (i = 0; i < addrows; i++)
 		{
 			Assert(IsA(node->ss.ps.plan, ForeignScan));
 
-			fsstate->tuples[i] =
+			fsstate->tuples[fsstate->num_tuples + i] =
 				make_tuple_from_result_row(res, i,
 										   fsstate->rel,
 										   fsstate->attinmeta,
@@ -3039,27 +3288,82 @@ fetch_more_data(ForeignScanState *node)
 		}
 
 		/* Update fetch_ct_2 */
-		if (fsstate->fetch_ct_2 < 2)
+		if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0)
 			fsstate->fetch_ct_2++;
 
+		fsstate->next_tuple = 0;
+		fsstate->num_tuples += addrows;
+
 		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fsstate->fetch_size);
+		fsstate->eof_reached = (addrows < fsstate->fetch_size);
 
 		PQclear(res);
 		res = NULL;
 	}
 	PG_CATCH();
 	{
+		fsstate->s.connpriv->current_owner = NULL;
 		if (res)
 			PQclear(res);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 
+	fsstate->s.connpriv->current_owner = NULL;
+
 	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
+ * Vacate a connection so that the caller can send its next query.
+ */
+static void
+vacate_connection(PgFdwState *fdwstate)
+{
+	PgFdwConnpriv *connpriv = fdwstate->connpriv;
+	ForeignScanState *owner;
+
+	if (connpriv == NULL || connpriv->current_owner == NULL)
+		return;
+
+	/*
+	 * Let the current connection owner read the result of its running query.
+	 */
+	owner = connpriv->current_owner;
+	fetch_received_data(owner);
+
+	/* Clear the waiting list, walking it from the former owner */
+	while (owner)
+	{
+		PgFdwScanState *fsstate = GetPgFdwScanState(owner);
+
+		fsstate->last_waiter = NULL;
+		owner = fsstate->waiter;
+		fsstate->waiter = NULL;
+	}
+}
+
+/*
+ * Discard the pending result of the query in flight on this node's
+ * connection, if any, and mark the connection vacant.  PQgetResult() is
+ * called until it returns NULL, as libpq requires before the next command.
+ */
+static void
+absorb_current_result(ForeignScanState *node)
+{
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
+	ForeignScanState *owner = fsstate->s.connpriv->current_owner;
+	PGresult   *res;
+
+	if (owner == NULL)
+		return;
+
+	while ((res = PQgetResult(GetPgFdwScanState(owner)->s.conn)) != NULL)
+		PQclear(res);
+	fsstate->s.connpriv->current_owner = NULL;
+	fsstate->async_waiting = false;
+}
+/*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
  *
@@ -3143,7 +3447,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 
 	/* Construct name we'll use for the prepared statement. */
 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
-			 GetPrepStmtNumber(fmstate->conn));
+			 GetPrepStmtNumber(fmstate->s.conn));
 	p_name = pstrdup(prep_name);
 
 	/*
@@ -3153,12 +3457,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * the prepared statements we use in this module are simple enough that
 	 * the remote server will make the right choices.
 	 */
-	if (!PQsendPrepare(fmstate->conn,
+	if (!PQsendPrepare(fmstate->s.conn,
 					   p_name,
 					   fmstate->query,
 					   0,
 					   NULL))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -3166,9 +3470,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 	PQclear(res);
 
 	/* This action shows that the prepare has been done. */
@@ -3299,9 +3603,9 @@ execute_dml_stmt(ForeignScanState *node)
 	 * the desired result.  This allows us to avoid assuming that the remote
 	 * server has the same OIDs we do for the parameters' types.
 	 */
-	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+	if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams,
 						   NULL, values, NULL, NULL, 0))
-		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+		pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -3309,10 +3613,10 @@ execute_dml_stmt(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+	dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query);
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+		pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true,
 						   dmstate->query);
 
 	/* Get the number of rows affected. */
@@ -4582,6 +4886,42 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
 	/* XXX Consider parameterized paths for the join relation */
 }
 
+/* Unconditionally answers true; the given path is not inspected. */
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+	return true;
+}
+
+/*
+ * postgresForeignAsyncConfigureWait
+ *		Configure the wait event for this node.
+ *
+ * Returns true, adding nothing, if !reinit.  Otherwise adds a socket-readable
+ * event and returns true only when this node is the connection owner;
+ * returns false when it is not, leaving the set untouched.
+ */
+static bool
+postgresForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+								  void *caller_data, bool reinit)
+{
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
+
+	/* If the caller didn't reinit, this event is already in event set */
+	if (!reinit)
+		return true;
+
+	if (fsstate->s.connpriv->current_owner == node)
+	{
+		AddWaitEventToSet(wes,
+						  WL_SOCKET_READABLE, PQsocket(fsstate->s.conn),
+						  NULL, caller_data);
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * Assess whether the aggregation, grouping and having operations can be pushed
  * down to the foreign server.  As a side effect, save information we obtain in
@@ -4946,7 +5286,7 @@ make_tuple_from_result_row(PGresult *res,
 		PgFdwScanState *fdw_sstate;
 
 		Assert(fsstate);
-		fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
+		fdw_sstate = GetPgFdwScanState(fsstate);
 		tupdesc = fdw_sstate->tupdesc;
 	}
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 788b003..41ac1d2 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -77,6 +77,7 @@ typedef struct PgFdwRelationInfo
 	UserMapping *user;			/* only set in use_remote_estimate mode */
 
 	int			fetch_size;		/* fetch size for this remote table */
+	bool		allow_prefetch;	/* true to allow overlapped fetching  */
 
 	/*
 	 * Name of the relation while EXPLAINing ForeignScan. It is used for join
@@ -116,6 +117,7 @@ extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
 extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index ddfec79..56aae91 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1535,25 +1535,25 @@ INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE b SET aa = 'new';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'newtoo';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
@@ -1589,12 +1589,12 @@ insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
 
 -- Check UPDATE with inherited target and an inherited source table
 explain (verbose, costs off)
@@ -1653,8 +1653,8 @@ explain (verbose, costs off)
 delete from foo where f1 < 5 returning *;
 delete from foo where f1 < 5 returning *;
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
 
 drop table foo cascade;
 drop table bar cascade;
-- 
2.9.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to