Hello, thank you for looking at this. Sorry, the last patch was
buggy, so the adaptive fetch size did not work.

Attached is the fixed patch. It apparently improves the
performance for the test case shown in the previous mail, in
which the average tuple length is about 140 bytes.

21 Jan 2015 05:22:34 +0000, Matt Kelly <mkell...@gmail.com> wrote in 
<ca+kcukg4cvdlf4v0m9_rvv_zuasg1odhpj_yvczja6w2nsk...@mail.gmail.com>
> I'm trying to compare v5 and v6 in my laptop right now.  Apparently my
> laptop is quite a bit faster than your machine because the tests complete
> in roughly 3.3 seconds.
> 
> I added more data and didn't see anything other than noise.  (Then again
> the queries were dominated by the disk sort so I should retry with larger
> work_mem).  I'll try it again when I have more time to play with it.  I
> suspect the benefits would be more clear over a network.
> 
> Larger than default work_mem yes, but I think one of the prime use case for
> the fdw is for more warehouse style situations (PostgresXL style use
> cases).  In those cases, work_mem might reasonably be set to 1GB.  Then
> even if you have 10KB rows you can fetch a million rows and still be using
> less than work_mem.  A simpler change would be to vary it with respect to
> work_mem.

Agreed about the nature of the typical workload for postgres_fdw.
But I think the server itself, including postgres_fdw, should not
crash even on a sudden explosion of tuple length. A fetch size of
100 seems safe enough, but 1000 seems suspicious, and 10000
looks dangerous from that standpoint.

> Half baked idea: I know its the wrong time in the execution phase, but if
> you are using remote estimates for cost there should also be a row width
> estimate which I believe is based from pg_statistic and its mean column
> width.

It reduces the chance of claiming an unexpected amount of memory,
but the chance still remains.
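
To illustrate, here is a minimal sketch of deriving a fetch size
from a width estimate. It is not code from the attached patch:
fetch_size_from_width() and its parameters are hypothetical, and
only the two limits are borrowed from the patch. The floor at
MIN_FETCH_SIZE means the budget can still be exceeded when rows
are very wide, and est_width is only an estimate in the first
place.

```c
#include <assert.h>
#include <stddef.h>

/* Limits borrowed from the attached patch. */
#define MIN_FETCH_SIZE 100
#define MAX_FETCH_SIZE 1000

/*
 * Hypothetical helper (not part of the patch): choose a fetch size so
 * that est_width * fetch_size stays within budget_bytes, clamped to
 * [MIN_FETCH_SIZE, MAX_FETCH_SIZE].  est_width is assumed to be the
 * planner's average row width estimate in bytes.
 */
static int
fetch_size_from_width(size_t budget_bytes, int est_width)
{
	size_t		rows;

	assert(est_width > 0);

	rows = budget_bytes / (size_t) est_width;

	/* The floor can overshoot the budget for very wide rows. */
	if (rows < MIN_FETCH_SIZE)
		return MIN_FETCH_SIZE;
	if (rows > MAX_FETCH_SIZE)
		return MAX_FETCH_SIZE;
	return (int) rows;
}
```

With a budget of 10000kB, 140-byte rows hit the upper limit of
1000, while 1MB rows fall back to 100 and would need about 100MB.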

> Its actually a pity that there is no way to set fetch sizes based on "give
> me as many tuples as will fit in less than x amount of memory".  Because
> that is almost always exactly what you want.  Even when writing application
> code, I've never actually wanted precisely 10,000 rows; I've always wanted
> "a reasonable size chunk that could fit into memory" and then backed my way
> into how many rows I wanted.  If we were to extend FETCH to support syntax
> like: FETCH FORWARD '10MB' FROM ...; then we would eliminate the need
> estimate the value on the fly.

I didn't think about that. It makes sense, at least to me:) There
would be many cases where the *amount* of data is more crucial
than the number of rows. I'll work on it.
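
Until FETCH accepts a size, the client can only approximate it
from what it has already seen. A rough sketch of that idea
follows; next_fetch_size() and all of its parameters are
hypothetical names for illustration, not something in the
attached patch or in libpq.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper: estimate how many rows to request next so that
 * the batch is expected to fit in budget_bytes, based on the size of
 * the previous batch.  Returns fallback_rows when nothing has been
 * observed yet.  The result is always within [1, max_rows].
 */
static int
next_fetch_size(int last_rows, size_t last_bytes, size_t budget_bytes,
				int fallback_rows, int max_rows)
{
	size_t		avg_bytes;
	size_t		rows;

	assert(max_rows >= 1);
	assert(fallback_rows >= 1 && fallback_rows <= max_rows);

	if (last_rows <= 0 || last_bytes == 0)
		return fallback_rows;

	/* Average row size of the last batch, rounded up. */
	avg_bytes = (last_bytes + (size_t) last_rows - 1) / (size_t) last_rows;

	rows = budget_bytes / avg_bytes;
	if (rows < 1)
		return 1;				/* one row alone may exceed the budget */
	if (rows > (size_t) max_rows)
		return max_rows;
	return (int) rows;
}
```

This is still only a guess, since the next batch may contain much
longer rows than the last one. A size-based FETCH on the server
side would not have that problem.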

> The async stuff, however, is a huge improvement over the last time I played
> with the fdw.  The two active postgres processes were easily consuming a
> core and half of CPU.  I think its not worth tying these two things
> together.  Its probably worth it to make these two separate discussions and
> separate patches.

Yes, they can be separated and also should be. I'll split them
after this.

> - Matt Kelly
> 
> *Just sanity checking myself: Shutting down the server, applying the
> different patch, 'make clean install' in postgres_fdw, and then restarting
> the server should obviously be sufficient to make sure its running the new
> code because that is all linked at runtime, right?

Yes, that's enough, and I did the same. This patch touches only
postgres_fdw.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 12d0e8666871e2272beb1b85903baf0be92881b5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 13 Jan 2015 19:20:35 +0900
Subject: [PATCH] Asynchronous execution of postgres_fdw v7

This is the bug-fixed version of v6, which was a modified version of
Asynchronous execution of postgres_fdw.
---
 contrib/postgres_fdw/Makefile       |   2 +-
 contrib/postgres_fdw/PgFdwConn.c    | 200 ++++++++++++++++++
 contrib/postgres_fdw/PgFdwConn.h    |  61 ++++++
 contrib/postgres_fdw/connection.c   |  82 ++++----
 contrib/postgres_fdw/postgres_fdw.c | 391 +++++++++++++++++++++++++++++-------
 contrib/postgres_fdw/postgres_fdw.h |  15 +-
 6 files changed, 629 insertions(+), 122 deletions(-)
 create mode 100644 contrib/postgres_fdw/PgFdwConn.c
 create mode 100644 contrib/postgres_fdw/PgFdwConn.h

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index d2b98e1..d0913e2 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -1,7 +1,7 @@
 # contrib/postgres_fdw/Makefile
 
 MODULE_big = postgres_fdw
-OBJS = postgres_fdw.o option.o deparse.o connection.o $(WIN32RES)
+OBJS = postgres_fdw.o PgFdwConn.o option.o deparse.o connection.o $(WIN32RES)
 PGFILEDESC = "postgres_fdw - foreign data wrapper for PostgreSQL"
 
 PG_CPPFLAGS = -I$(libpq_srcdir)
diff --git a/contrib/postgres_fdw/PgFdwConn.c b/contrib/postgres_fdw/PgFdwConn.c
new file mode 100644
index 0000000..b13b597
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.c
@@ -0,0 +1,200 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.c
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "PgFdwConn.h"
+
+#define PFC_ALLOCATE()	((PgFdwConn *)malloc(sizeof(PgFdwConn)))
+#define PFC_FREE(c)		free(c)
+
+struct pgfdw_conn
+{
+	PGconn *pgconn;				/* libpq connection for this connection */
+	int		nscans;				/* number of scans using this connection */
+	struct PgFdwScanState *async_scan; /* the scan currently running an
+										* async query on this connection */
+};
+
+void
+PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan)
+{
+	conn->async_scan = scan;
+}
+
+struct PgFdwScanState *
+PFCgetAsyncScan(PgFdwConn *conn)
+{
+	return conn->async_scan;
+}
+
+int
+PFCisAsyncRunning(PgFdwConn *conn)
+{
+	return conn->async_scan != NULL;
+}
+
+PGconn *
+PFCgetPGconn(PgFdwConn *conn)
+{
+	return conn->pgconn;
+}
+
+int
+PFCgetNscans(PgFdwConn *conn)
+{
+	return conn->nscans;
+}
+
+int
+PFCincrementNscans(PgFdwConn *conn)
+{
+	return ++conn->nscans;
+}
+
+int
+PFCdecrementNscans(PgFdwConn *conn)
+{
+	Assert(conn->nscans > 0);
+	return --conn->nscans;
+}
+
+void
+PFCcancelAsync(PgFdwConn *conn)
+{
+	if (PFCisAsyncRunning(conn))
+		PFCconsumeInput(conn);
+}
+
+void
+PFCinit(PgFdwConn *conn)
+{
+	conn->async_scan = NULL;
+	conn->nscans = 0;
+}
+
+int
+PFCsendQuery(PgFdwConn *conn, const char *query)
+{
+	return PQsendQuery(conn->pgconn, query);
+}
+
+PGresult *
+PFCexec(PgFdwConn *conn, const char *query)
+{
+	return PQexec(conn->pgconn, query);
+}
+
+PGresult *
+PFCexecParams(PgFdwConn *conn,
+			  const char *command,
+			  int nParams,
+			  const Oid *paramTypes,
+			  const char *const * paramValues,
+			  const int *paramLengths,
+			  const int *paramFormats,
+			  int resultFormat)
+{
+	return PQexecParams(conn->pgconn,
+						command, nParams, paramTypes, paramValues,
+						paramLengths, paramFormats, resultFormat);
+}
+
+PGresult *
+PFCprepare(PgFdwConn *conn,
+		   const char *stmtName, const char *query,
+		   int nParams, const Oid *paramTypes)
+{
+	return PQprepare(conn->pgconn, stmtName, query, nParams, paramTypes);
+}
+
+PGresult *
+PFCexecPrepared(PgFdwConn *conn,
+				const char *stmtName,
+				int nParams,
+				const char *const * paramValues,
+				const int *paramLengths,
+				const int *paramFormats,
+				int resultFormat)
+{
+	return PQexecPrepared(conn->pgconn, 
+						  stmtName, nParams, paramValues, paramLengths,
+						  paramFormats, resultFormat);
+}
+
+PGresult *
+PFCgetResult(PgFdwConn *conn)
+{
+	return PQgetResult(conn->pgconn);
+}
+
+int
+PFCconsumeInput(PgFdwConn *conn)
+{
+	return PQconsumeInput(conn->pgconn);
+}
+
+int
+PFCisBusy(PgFdwConn *conn)
+{
+	return PQisBusy(conn->pgconn);
+}
+
+ConnStatusType
+PFCstatus(const PgFdwConn *conn)
+{
+	return PQstatus(conn->pgconn);
+}
+
+PGTransactionStatusType
+PFCtransactionStatus(const PgFdwConn *conn)
+{
+	return PQtransactionStatus(conn->pgconn);
+}
+
+int
+PFCserverVersion(const PgFdwConn *conn)
+{
+	return PQserverVersion(conn->pgconn);
+}
+
+char *
+PFCerrorMessage(const PgFdwConn *conn)
+{
+	return PQerrorMessage(conn->pgconn);
+}
+
+int
+PFCconnectionUsedPassword(const PgFdwConn *conn)
+{
+	return PQconnectionUsedPassword(conn->pgconn);
+}
+
+void
+PFCfinish(PgFdwConn *conn)
+{
+	PQfinish(conn->pgconn);
+	PFC_FREE(conn);
+}
+
+PgFdwConn *
+PFCconnectdbParams(const char *const * keywords,
+				   const char *const * values, int expand_dbname)
+{
+	PgFdwConn *ret = PFC_ALLOCATE();
+
+	PFCinit(ret);
+	ret->pgconn = PQconnectdbParams(keywords, values, expand_dbname);
+
+	return ret;
+}
diff --git a/contrib/postgres_fdw/PgFdwConn.h b/contrib/postgres_fdw/PgFdwConn.h
new file mode 100644
index 0000000..f695f5a
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.h
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.h
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PGFDWCONN_H
+#define PGFDWCONN_H
+
+#include "libpq-fe.h"
+
+typedef struct pgfdw_conn PgFdwConn;
+struct PgFdwScanState;
+
+extern void PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan);
+extern struct PgFdwScanState *PFCgetAsyncScan(PgFdwConn *conn);
+extern int PFCisAsyncRunning(PgFdwConn *conn);
+extern PGconn *PFCgetPGconn(PgFdwConn *conn);
+extern int PFCgetNscans(PgFdwConn *conn);
+extern int PFCincrementNscans(PgFdwConn *conn);
+extern int PFCdecrementNscans(PgFdwConn *conn);
+extern void PFCcancelAsync(PgFdwConn *conn);
+extern void PFCinit(PgFdwConn *conn);
+extern int PFCsendQuery(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexec(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexecParams(PgFdwConn *conn,
+								const char *command,
+								int nParams,
+								const Oid *paramTypes,
+								const char *const * paramValues,
+								const int *paramLengths,
+								const int *paramFormats,
+								int resultFormat);
+extern PGresult *PFCprepare(PgFdwConn *conn,
+							const char *stmtName, const char *query,
+							int nParams, const Oid *paramTypes);
+extern PGresult *PFCexecPrepared(PgFdwConn *conn,
+								 const char *stmtName,
+								 int nParams,
+								 const char *const * paramValues,
+								 const int *paramLengths,
+								 const int *paramFormats,
+								 int resultFormat);
+extern PGresult *PFCgetResult(PgFdwConn *conn);
+extern int PFCconsumeInput(PgFdwConn *conn);
+extern int PFCisBusy(PgFdwConn *conn);
+extern ConnStatusType PFCstatus(const PgFdwConn *conn);
+extern PGTransactionStatusType PFCtransactionStatus(const PgFdwConn *conn);
+extern int PFCserverVersion(const PgFdwConn *conn);
+extern char *PFCerrorMessage(const PgFdwConn *conn);
+extern int PFCconnectionUsedPassword(const PgFdwConn *conn);
+extern void PFCfinish(PgFdwConn *conn);
+extern PgFdwConn *PFCconnectdbParams(const char *const * keywords,
+			 const char *const * values, int expand_dbname);
+#endif   /* PGFDWCONN_H */
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4e02cb2..2517f6b 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,7 +44,7 @@ typedef struct ConnCacheKey
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
-	PGconn	   *conn;			/* connection to foreign server, or NULL */
+	PgFdwConn	*conn;			/* connection to foreign server, or NULL */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
@@ -64,10 +64,10 @@ static unsigned int prep_stmt_number = 0;
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static PgFdwConn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
-static void configure_remote_session(PGconn *conn);
-static void do_sql_command(PGconn *conn, const char *sql);
+static void configure_remote_session(PgFdwConn *conn);
+static void do_sql_command(PgFdwConn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
@@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  * be useful and not mere pedantry.  We could not flush any active connections
  * mid-transaction anyway.
  */
-PGconn *
+PgFdwConn *
 GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt)
 {
@@ -161,9 +161,12 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		entry->have_error = false;
 		entry->conn = connect_pg_server(server, user);
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
-			 entry->conn, server->servername);
+			 PFCgetPGconn(entry->conn), server->servername);
+
 	}
 
+	PFCincrementNscans(entry->conn);
+
 	/*
 	 * Start a new transaction or subtransaction if needed.
 	 */
@@ -178,10 +181,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
-static PGconn *
+static PgFdwConn *
 connect_pg_server(ForeignServer *server, UserMapping *user)
 {
-	PGconn	   *volatile conn = NULL;
+	PgFdwConn   *volatile conn = NULL;
 
 	/*
 	 * Use PG_TRY block to ensure closing connection on error.
@@ -223,14 +226,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		/* verify connection parameters and make connection */
 		check_conn_params(keywords, values);
 
-		conn = PQconnectdbParams(keywords, values, false);
-		if (!conn || PQstatus(conn) != CONNECTION_OK)
+		conn = PFCconnectdbParams(keywords, values, false);
+		if (!conn || PFCstatus(conn) != CONNECTION_OK)
 		{
 			char	   *connmessage;
 			int			msglen;
 
 			/* libpq typically appends a newline, strip that */
-			connmessage = pstrdup(PQerrorMessage(conn));
+			connmessage = pstrdup(PFCerrorMessage(conn));
 			msglen = strlen(connmessage);
 			if (msglen > 0 && connmessage[msglen - 1] == '\n')
 				connmessage[msglen - 1] = '\0';
@@ -246,7 +249,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		 * otherwise, he's piggybacking on the postgres server's user
 		 * identity. See also dblink_security_check() in contrib/dblink.
 		 */
-		if (!superuser() && !PQconnectionUsedPassword(conn))
+		if (!superuser() && !PFCconnectionUsedPassword(conn))
 			ereport(ERROR,
 				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
 				   errmsg("password is required"),
@@ -263,7 +266,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 	{
 		/* Release PGconn data structure if we managed to create one */
 		if (conn)
-			PQfinish(conn);
+			PFCfinish(conn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -312,9 +315,9 @@ check_conn_params(const char **keywords, const char **values)
  * there are any number of ways to break things.
  */
 static void
-configure_remote_session(PGconn *conn)
+configure_remote_session(PgFdwConn *conn)
 {
-	int			remoteversion = PQserverVersion(conn);
+	int			remoteversion = PFCserverVersion(conn);
 
 	/* Force the search path to contain only pg_catalog (see deparse.c) */
 	do_sql_command(conn, "SET search_path = pg_catalog");
@@ -348,11 +351,11 @@ configure_remote_session(PGconn *conn)
  * Convenience subroutine to issue a non-data-returning SQL command to remote
  */
 static void
-do_sql_command(PGconn *conn, const char *sql)
+do_sql_command(PgFdwConn *conn, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -379,7 +382,7 @@ begin_remote_xact(ConnCacheEntry *entry)
 		const char *sql;
 
 		elog(DEBUG3, "starting remote transaction on connection %p",
-			 entry->conn);
+			 PFCgetPGconn(entry->conn));
 
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
@@ -408,13 +411,11 @@ begin_remote_xact(ConnCacheEntry *entry)
  * Release connection reference count created by calling GetConnection.
  */
 void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(PgFdwConn *conn)
 {
-	/*
-	 * Currently, we don't actually track connection references because all
-	 * cleanup is managed on a transaction or subtransaction basis instead. So
-	 * there's nothing to do here.
-	 */
+	/* ongoing async query should be canceled if no scans left */
+	if (PFCdecrementNscans(conn) == 0)
+		finish_async_query(conn);
 }
 
 /*
@@ -429,7 +430,7 @@ ReleaseConnection(PGconn *conn)
  * collisions are highly improbable; just be sure to use %u not %d to print.
  */
 unsigned int
-GetCursorNumber(PGconn *conn)
+GetCursorNumber(PgFdwConn *conn)
 {
 	return ++cursor_number;
 }
@@ -443,7 +444,7 @@ GetCursorNumber(PGconn *conn)
  * increasing the risk of prepared-statement name collisions by resetting.
  */
 unsigned int
-GetPrepStmtNumber(PGconn *conn)
+GetPrepStmtNumber(PgFdwConn *conn)
 {
 	return ++prep_stmt_number;
 }
@@ -462,7 +463,7 @@ GetPrepStmtNumber(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql)
 {
 	/* If requested, PGresult must be released before leaving this function. */
@@ -490,7 +491,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		 * return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
-			message_primary = PQerrorMessage(conn);
+			message_primary = PFCerrorMessage(conn);
 
 		ereport(elevel,
 				(errcode(sqlstate),
@@ -542,7 +543,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		if (entry->xact_depth > 0)
 		{
 			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+				 PFCgetPGconn(entry->conn));
 
 			switch (event)
 			{
@@ -567,7 +568,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = PFCexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
@@ -597,7 +598,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
+					res = PFCexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
 						pgfdw_report_error(WARNING, res, entry->conn, true,
@@ -608,7 +609,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						/* As above, make sure to clear any prepared stmts */
 						if (entry->have_prep_stmt && entry->have_error)
 						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
+							res = PFCexec(entry->conn, "DEALLOCATE ALL");
 							PQclear(res);
 						}
 						entry->have_prep_stmt = false;
@@ -620,17 +621,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 		/* Reset state to show we're out of a transaction */
 		entry->xact_depth = 0;
+		PFCcancelAsync(entry->conn);
+		PFCinit(entry->conn);
 
 		/*
 		 * If the connection isn't in a good idle state, discard it to
 		 * recover. Next GetConnection will open a new connection.
 		 */
-		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+		if (PFCstatus(entry->conn) != CONNECTION_OK ||
+			PFCtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
-			PQfinish(entry->conn);
-			entry->conn = NULL;
+			elog(DEBUG3, "discarding connection %p",
+				 PFCgetPGconn(entry->conn));
+			PFCfinish(entry->conn);
 		}
 	}
 
@@ -676,6 +679,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		PGresult   *res;
 		char		sql[100];
 
+		/* Shut down asynchronous scan if running */
+		PFCcancelAsync(entry->conn);
+
 		/*
 		 * We only care about connections with open remote subtransactions of
 		 * the current level.
@@ -701,7 +707,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			snprintf(sql, sizeof(sql),
 					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
 					 curlevel, curlevel);
-			res = PQexec(entry->conn, sql);
+			res = PFCexec(entry->conn, sql);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
 			else
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d76e739..a146a2b 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -46,6 +46,27 @@ PG_MODULE_MAGIC;
 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
 #define DEFAULT_FDW_TUPLE_COST		0.01
 
+/* Fetch size at startup. This might be better as a GUC parameter */
+#define MIN_FETCH_SIZE 100
+
+/* Maximum fetch size. This might be better as a GUC parameter */
+#define MAX_FETCH_SIZE 1000
+
+/*
+ * Maximum size for fetch buffer in kilobytes. Ditto.
+ *
+ * This should be far larger than sizeof(HeapTuple) * MAX_FETCH_SIZE. This is
+ * not a hard limit because we cannot know in advance the average row length
+ * returned.
+ */
+#define MAX_FETCH_BUFFER_SIZE 10000	/* 10MB */
+
+/* Maximum duration allowed for a single fetch, in milliseconds */
+#define MAX_FETCH_DURATION 500
+
+/* Number of successive async fetches to enlarge fetch_size */
+#define INCREASE_FETCH_SIZE_THRESHOLD 8
+
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
@@ -123,6 +144,12 @@ enum FdwModifyPrivateIndex
 	FdwModifyPrivateRetrievedAttrs
 };
 
+typedef enum fetch_mode {
+	START_ONLY,
+	FORCE_SYNC,
+	ALLOW_ASYNC
+} fetch_mode;
+
 /*
  * Execution state of a foreign scan using postgres_fdw.
  */
@@ -136,7 +163,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -148,7 +175,12 @@ typedef struct PgFdwScanState
 	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
 	int			num_tuples;		/* # of tuples in array */
 	int			next_tuple;		/* index of next one to return */
-
+	int			fetch_size;		/* rows to be fetched at once */
+	int			successive_async; /* # of successive fetches at this
+                                    fetch_size */
+	long		last_fetch_req_at;  /* The time of the last fetch request, in
+									 * milliseconds */
+	int			last_buf_size;	/* Buffer size required for the last fetch */
 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
 	bool		eof_reached;	/* true if last fetch reached EOF */
@@ -156,6 +188,7 @@ typedef struct PgFdwScanState
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+	ExprContext	 *econtext;		/* copy of ps_ExprContext of ForeignScanState */
 } PgFdwScanState;
 
 /*
@@ -167,7 +200,7 @@ typedef struct PgFdwModifyState
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -298,7 +331,7 @@ static void estimate_path_cost_size(PlannerInfo *root,
 						double *p_rows, int *p_width,
 						Cost *p_startup_cost, Cost *p_total_cost);
 static void get_remote_estimate(const char *sql,
-					PGconn *conn,
+					PgFdwConn *conn,
 					double *rows,
 					int *width,
 					Cost *startup_cost,
@@ -306,9 +339,9 @@ static void get_remote_estimate(const char *sql,
 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
-static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void create_cursor(PgFdwScanState *node);
+static void close_cursor(PgFdwConn *conn, unsigned int cursor_number);
+static void fetch_more_data(PgFdwScanState *node, fetch_mode cmd);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
@@ -329,7 +362,6 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
-
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
  * to my callback routines.
@@ -982,6 +1014,19 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
 	else
 		fsstate->param_values = NULL;
+
+	fsstate->econtext = node->ss.ps.ps_ExprContext;
+
+	fsstate->fetch_size = MIN_FETCH_SIZE;
+	fsstate->successive_async = 0;
+	fsstate->last_buf_size = 0;
+
+	/*
+	 * Start scanning asynchronously if it is the first scan on this
+	 * connection.
+	 */
+	if (PFCgetNscans(fsstate->conn) == 1)
+		create_cursor(fsstate);
 }
 
 /*
@@ -1000,7 +1045,10 @@ postgresIterateForeignScan(ForeignScanState *node)
 	 * cursor on the remote side.
 	 */
 	if (!fsstate->cursor_exists)
-		create_cursor(node);
+	{
+		finish_async_query(fsstate->conn);
+		create_cursor(fsstate);
+	}
 
 	/*
 	 * Get some more tuples, if we've run out.
@@ -1009,7 +1057,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	{
 		/* No point in another fetch if we already detected EOF, though. */
 		if (!fsstate->eof_reached)
-			fetch_more_data(node);
+			fetch_more_data(fsstate, ALLOW_ASYNC);
 		/* If we didn't get any tuples, must be end of data. */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
 			return ExecClearTuple(slot);
@@ -1069,7 +1117,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = PFCexec(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1392,19 +1440,22 @@ postgresExecForeignInsert(EState *estate,
 	/* Convert parameters needed by prepared statement to text form */
 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
 
+	/* Finish async query if running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1462,19 +1513,22 @@ postgresExecForeignUpdate(EState *estate,
 										(ItemPointer) DatumGetPointer(datum),
 										slot);
 
+	/* Finish async query if running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						   fmstate->p_name,
+						   fmstate->p_nums,
+						   p_values,
+						   NULL,
+						   NULL,
+						   0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1532,19 +1586,22 @@ postgresExecForeignDelete(EState *estate,
 										(ItemPointer) DatumGetPointer(datum),
 										NULL);
 
+	/* Finish async query if running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * Execute the prepared statement, and check for success.
 	 *
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						   fmstate->p_name,
+						   fmstate->p_nums,
+						   p_values,
+						   NULL,
+						   NULL,
+						   0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1594,7 +1651,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = PFCexec(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -1726,7 +1783,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_join_conds;
 		StringInfoData sql;
 		List	   *retrieved_attrs;
-		PGconn	   *conn;
+		PgFdwConn  *conn;
 		Selectivity local_sel;
 		QualCost	local_cost;
 
@@ -1836,7 +1893,7 @@ estimate_path_cost_size(PlannerInfo *root,
  * The given "sql" must be an EXPLAIN command.
  */
 static void
-get_remote_estimate(const char *sql, PGconn *conn,
+get_remote_estimate(const char *sql, PgFdwConn *conn,
 					double *rows, int *width,
 					Cost *startup_cost, Cost *total_cost)
 {
@@ -1852,7 +1909,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = PFCexec(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -1917,13 +1974,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  * Create cursor for node's query with current parameter values.
  */
 static void
-create_cursor(ForeignScanState *node)
+create_cursor(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
-	ExprContext *econtext = node->ss.ps.ps_ExprContext;
+	ExprContext *econtext = fsstate->econtext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PgFdwConn	*conn = fsstate->conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -1985,8 +2041,8 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	res = PFCexecParams(conn, buf.data, numParams, NULL, values,
+						 NULL, NULL, 0);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2001,71 +2057,215 @@ create_cursor(ForeignScanState *node)
 
 	/* Clean up */
 	pfree(buf.data);
+
+	/*
+	 * Start an async scan if this is the first scan. See fetch_more_data()
+	 * for details.
+	 */
+	if (PFCgetNscans(conn) == 1)
+		fetch_more_data(fsstate, START_ONLY);
 }
 
 /*
  * Fetch some more rows from the node's cursor.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(PgFdwScanState *fsstate, fetch_mode cmd)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
 	/*
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
-	 * batch.
+	 * batch.  Some tuples may be left unread when asynchronous fetching is
+	 * interrupted; in that case, skip the flush so they are preserved.  This
+	 * happens no more than twice in succession.
 	 */
-	fsstate->tuples = NULL;
-	MemoryContextReset(fsstate->batch_cxt);
+	if (fsstate->next_tuple >= fsstate->num_tuples)
+	{
+		fsstate->tuples = NULL;
+		MemoryContextReset(fsstate->batch_cxt);
+	}
 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
-		int			fetch_size;
-		int			numrows;
+		int			numrows, addrows, restrows;
+		HeapTuple  *tmptuples;
+		int			prev_fetch_size = fsstate->fetch_size;
+		int 		new_fetch_size = fsstate->fetch_size;
 		int			i;
+		struct timeval tv = {0, 0};
+		long		current_time;
+		int			fetch_buf_size;
 
-		/* The fetch size is arbitrary, but shouldn't be enormous. */
-		fetch_size = 100;
+		gettimeofday(&tv, NULL);
+		current_time = tv.tv_sec * 1000 + tv.tv_usec / 1000;
 
+		/*
+		 * Adapt the fetch size.
+		 *
+		 * We cannot know in advance how long a fetch will take or how much
+		 * space the received tuples will need, so adjust fetch_size
+		 * dynamically to stay within the maximum allowed duration and
+		 * buffer space.
+		 */
+		if (fsstate->last_buf_size > MAX_FETCH_BUFFER_SIZE)
+		{
+			new_fetch_size =
+				(int)((double)fsstate->fetch_size * MAX_FETCH_BUFFER_SIZE /
+					  fsstate->last_buf_size);
+		}
+		if (PFCisBusy(conn) &&
+			fsstate->fetch_size > MIN_FETCH_SIZE &&
+			fsstate->last_fetch_req_at + MAX_FETCH_DURATION <
+			current_time)
+		{
+			int tmp_fetch_size = fsstate->fetch_size / 2;
+			if (tmp_fetch_size < new_fetch_size)
+				new_fetch_size = tmp_fetch_size;
+		}
+
+		/* Increase if not decreased and other conditions match. */
+		if (new_fetch_size == fsstate->fetch_size &&
+			fsstate->successive_async >= INCREASE_FETCH_SIZE_THRESHOLD &&
+			fsstate->fetch_size < MAX_FETCH_SIZE)
+			new_fetch_size *= 2;
+
+		/* Change fetch_size as calculated above */
+		if (new_fetch_size != fsstate->fetch_size)
+		{
+			if (new_fetch_size > MAX_FETCH_SIZE)
+				fsstate->fetch_size = MAX_FETCH_SIZE;
+			else if (new_fetch_size < MIN_FETCH_SIZE)
+				fsstate->fetch_size = MIN_FETCH_SIZE;
+			else
+				fsstate->fetch_size = new_fetch_size;
+			fsstate->successive_async = 0;
+		}
+
+		/* Make the query to fetch tuples */
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-				 fetch_size, fsstate->cursor_number);
+				 fsstate->fetch_size, fsstate->cursor_number);
+
+		if (PFCisAsyncRunning(conn))
+		{
+			Assert(cmd != START_ONLY);
+
+			/*
+			 * If the running async fetch belongs to a different scan state
+			 * than the target fsstate, first store its result into that scan
+			 * state, then fetch data for the target fsstate synchronously.
+			 */
+			if (fsstate != PFCgetAsyncScan(conn))
+			{
+				fetch_more_data(PFCgetAsyncScan(conn), FORCE_SYNC);
+				res = PFCexec(conn, sql);
+			}
+			else
+			{
+				/* Get result of running async fetch */
+				res = PFCgetResult(conn);
+				if (PQntuples(res) == prev_fetch_size)
+				{
+					/*
+					 * Connection state doesn't go to IDLE even if all data
+					 * has been sent to client for asynchronous query. One
+					 * more PQgetResult() is needed to reset the state to
+					 * IDLE.  See PQexecFinish() for details.
+					 */
+					if (PFCgetResult(conn) != NULL)
+						elog(ERROR, "Connection status error.");
+				}
+			}
+			PFCsetAsyncScan(conn, NULL);
+		}
+		else
+		{
+			if (cmd == START_ONLY)
+			{
+				Assert(PFCgetNscans(conn) == 1);
+
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false,
+									   fsstate->query);
+				fsstate->last_fetch_req_at = current_time;
+
+				PFCsetAsyncScan(conn, fsstate);
+				goto end_of_fetch;
+			}
+
+			/* Otherwise, execute the query synchronously */
+			PFCsetAsyncScan(conn, NULL);
+			res = PFCexec(conn, sql);
+		}
 
-		res = PQexec(conn, sql);
 		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		if (res && PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
-		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+		/* allocate tuple storage */
+		tmptuples = fsstate->tuples;
+		addrows = PQntuples(res);
+		restrows = fsstate->num_tuples - fsstate->next_tuple;
+		numrows = restrows + addrows;
+		fetch_buf_size = numrows * sizeof(HeapTuple);
+		fsstate->tuples = (HeapTuple *) palloc0(fetch_buf_size);
+
+		Assert(restrows == 0 || tmptuples);
+
+		/* copy unread tuples if any */
+		for (i = 0 ; i < restrows ; i++)
+			fsstate->tuples[i] = tmptuples[fsstate->next_tuple + i];
+
 		fsstate->num_tuples = numrows;
 		fsstate->next_tuple = 0;
 
-		for (i = 0; i < numrows; i++)
+		/* Convert the data into HeapTuples */
+		for (i = 0 ; i < addrows; i++)
 		{
-			fsstate->tuples[i] =
+			HeapTuple tup =
 				make_tuple_from_result_row(res, i,
 										   fsstate->rel,
 										   fsstate->attinmeta,
 										   fsstate->retrieved_attrs,
 										   fsstate->temp_cxt);
+			fsstate->tuples[restrows + i] = tup;
+			fetch_buf_size += (HEAPTUPLESIZE + tup->t_len);
 		}
 
+		fsstate->last_buf_size = fetch_buf_size / 1024; /* in kilobytes */
+
 		/* Update fetch_ct_2 */
 		if (fsstate->fetch_ct_2 < 2)
 			fsstate->fetch_ct_2++;
 
 		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fetch_size);
+		fsstate->eof_reached = (numrows < prev_fetch_size);
 
 		PQclear(res);
 		res = NULL;
+
+		if (cmd == ALLOW_ASYNC)
+		{
+			if (!fsstate->eof_reached)
+			{
+				/*
+				 * We can immediately request the next batch of tuples if
+				 * we're on an asynchronous connection.
+				 */
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+				fsstate->last_fetch_req_at = current_time;
+				PFCsetAsyncScan(conn, fsstate);
+			}
+		}
+
+end_of_fetch:
+		;	/* Empty statement: a label must be followed by a statement. */
 	}
 	PG_CATCH();
 	{
@@ -2075,10 +2275,44 @@ fetch_more_data(ForeignScanState *node)
 	}
 	PG_END_TRY();
 
+	if (PFCisAsyncRunning(fsstate->conn))
+	{
+		if (fsstate->successive_async < INCREASE_FETCH_SIZE_THRESHOLD)
+			fsstate->successive_async++;
+	}
+	else
+	{
+		/* Reset fetch_size if the async_fetch stopped */
+		fsstate->successive_async = 0;
+		fsstate->fetch_size = MIN_FETCH_SIZE;
+	}
+
 	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
+ * Force any running async fetch to complete, leaving the connection idle.
+ */
+void
+finish_async_query(PgFdwConn *conn)
+{
+	PgFdwScanState *fsstate = PFCgetAsyncScan(conn);
+	PgFdwConn *async_conn;
+
+	/* Nothing to do if no async scan is registered on this connection */
+	if (fsstate == NULL) return;
+	async_conn = fsstate->conn;
+	if (!async_conn ||
+		PFCgetNscans(async_conn) == 1 ||
+		!PFCisAsyncRunning(async_conn))
+		return;
+
+	fetch_more_data(PFCgetAsyncScan(async_conn), FORCE_SYNC);
+
+	Assert(!PFCisAsyncRunning(async_conn));
+}
+
+/*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
  *
@@ -2132,7 +2366,7 @@ reset_transmission_modes(int nestlevel)
  * Utility routine to close a cursor.
  */
 static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PgFdwConn *conn, unsigned int cursor_number)
 {
 	char		sql[64];
 	PGresult   *res;
@@ -2143,7 +2377,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -2165,6 +2399,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 			 GetPrepStmtNumber(fmstate->conn));
 	p_name = pstrdup(prep_name);
 
+	/* Finish the async query if one is running */
+	finish_async_query(fmstate->conn);
+
 	/*
 	 * We intentionally do not specify parameter types here, but leave the
 	 * remote server to derive them by default.  This avoids possible problems
@@ -2175,11 +2412,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQprepare(fmstate->conn,
-					p_name,
-					fmstate->query,
-					0,
-					NULL);
+	res = PFCprepare(fmstate->conn,
+					 p_name,
+					 fmstate->query,
+					 0,
+					 NULL);
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -2297,7 +2534,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn	*conn;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
 
@@ -2329,7 +2566,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -2379,7 +2616,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn	*conn;
 	unsigned int cursor_number;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
@@ -2423,7 +2660,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -2453,7 +2690,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = PFCexec(conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2582,7 +2819,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	bool		import_not_null = true;
 	ForeignServer *server;
 	UserMapping *mapping;
-	PGconn	   *conn;
+	PgFdwConn   *conn;
 	StringInfoData buf;
 	PGresult   *volatile res = NULL;
 	int			numrows,
@@ -2615,7 +2852,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	conn = GetConnection(server, mapping, false);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
-	if (PQserverVersion(conn) < 90100)
+	if (PFCserverVersion(conn) < 90100)
 		import_collate = false;
 
 	/* Create workspace for strings */
@@ -2628,7 +2865,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -2723,7 +2960,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfo(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 950c6f7..b117a88 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -18,19 +18,22 @@
 #include "nodes/relation.h"
 #include "utils/relcache.h"
 
-#include "libpq-fe.h"
+#include "PgFdwConn.h"
+
+struct PgFdwScanState;
 
 /* in postgres_fdw.c */
 extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
+extern void finish_async_query(PgFdwConn *conn);
 
 /* in connection.c */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
+extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
-extern unsigned int GetCursorNumber(PGconn *conn);
-extern unsigned int GetPrepStmtNumber(PGconn *conn);
-extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+extern void ReleaseConnection(PgFdwConn *conn);
+extern unsigned int GetCursorNumber(PgFdwConn *conn);
+extern unsigned int GetPrepStmtNumber(PgFdwConn *conn);
+extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql);
 
 /* in option.c */
-- 
2.1.0.GIT
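
For readers who want the adaptive fetch-size rule in isolation, here is a
minimal standalone sketch of the adjustment done at the top of
fetch_more_data(). It is only an illustration, not part of the patch. The
FetchState struct and next_fetch_size() are hypothetical names, and the
numeric limits are placeholders I picked for the example; the real values
are defined elsewhere in the patch.

```c
#include <stdbool.h>

/* Placeholder limits (assumptions for this sketch, not the patch's values). */
#define MIN_FETCH_SIZE                  100
#define MAX_FETCH_SIZE                  1000
#define MAX_FETCH_BUFFER_SIZE           10000	/* kilobytes */
#define MAX_FETCH_DURATION              500		/* milliseconds */
#define INCREASE_FETCH_SIZE_THRESHOLD   8

/* Hypothetical subset of the scan state used by the adjustment. */
typedef struct FetchState
{
	int		fetch_size;			/* rows requested per FETCH */
	int		last_buf_size;		/* size of the previous batch, in kilobytes */
	long	last_fetch_req_at;	/* time the last async FETCH was sent, in ms */
	int		successive_async;	/* async fetches since the last size change */
} FetchState;

/* Compute, store and return the fetch size for the next FETCH. */
static int
next_fetch_size(FetchState *st, bool conn_busy, long now_ms)
{
	int		new_size = st->fetch_size;

	/* Shrink proportionally if the previous batch exceeded the buffer limit. */
	if (st->last_buf_size > MAX_FETCH_BUFFER_SIZE)
		new_size = (int) ((double) st->fetch_size * MAX_FETCH_BUFFER_SIZE /
						  st->last_buf_size);

	/* Halve if the pending fetch is still running past the allowed duration. */
	if (conn_busy &&
		st->fetch_size > MIN_FETCH_SIZE &&
		st->last_fetch_req_at + MAX_FETCH_DURATION < now_ms)
	{
		int		half = st->fetch_size / 2;

		if (half < new_size)
			new_size = half;
	}

	/* Double only if nothing shrank it and async fetching has been stable. */
	if (new_size == st->fetch_size &&
		st->successive_async >= INCREASE_FETCH_SIZE_THRESHOLD &&
		st->fetch_size < MAX_FETCH_SIZE)
		new_size *= 2;

	/* Clamp to the allowed range and restart the stability counter. */
	if (new_size != st->fetch_size)
	{
		if (new_size > MAX_FETCH_SIZE)
			new_size = MAX_FETCH_SIZE;
		else if (new_size < MIN_FETCH_SIZE)
			new_size = MIN_FETCH_SIZE;
		st->fetch_size = new_size;
		st->successive_async = 0;
	}

	return st->fetch_size;
}
```

With these placeholder limits, a scan at fetch_size 800 whose last batch
took 20000 kB would drop to 400, and a scan at 100 with eight successive
async fetches would rise to 200. Shrinking always takes precedence over
growing, so a sudden jump in tuple length is corrected on the next fetch.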
