Hello. This is a version 3 patch.

- PgFdwConnState is removed

- PgFdwConn is isolated as a separate module.

- State transition was simplicated, I think.

- Comment about multiple scans on a connection is added.

- The issue of PREPARE is not addressed yet.

- It is to show how the new style looks, so it is lacking for
  comments for every PgFdwConn functions.

- Rebased to current master.

=======
> > I added a comment describing the and logic and meaning of the
> > statesjust above the enum declaration.
> >
> 
> This needs to be clarified further. But that can wait till we finalise the
> approach and the patch. Esp. comment below is confusing
> 1487 + * PGFDW_CONN_SYNC_RUNNING is rather an internal state in
> 1488 + * fetch_more_data(). It indicates that the function shouldn't send
> the next
> 1489 + * fetch requst after getting the result.
> 
> I couldn't get the meaning of the second sentence, esp. it's connection
> with synchronous-ness.

In this version, I removed PgFdwConnState. Now what indicates
that async fetch is running or not is the existence of
async_fetch. I think the complicated state transition is
dissapeard.

> > > The if condition if (entry->conn == NULL) in GetConnection(), used to
> > track
> > > whether there is a PGConn active for the given entry, now it tracks
> > whether
> > > it has PgFdwConn for the same.
> >
> > After some soncideration, I decided to make PgFdwConn to be
> > handled more similarly to PGconn. This patch has shrunk as a
> > result and bacame looks clear.
> >
> 
> I think it's still prone to segfaults considering two pointer indirections.

PGconn itself already makes two-level indirection, and PgFdwConn
has hidden its details mainly using macros. I may misunderstood
you, but if you're worried that PgFdwConn.pgconn can be set from
anywhere, we would should separate PgFdwConn into another
C-module and hide all the details as PGconn does. It is shown as
the separte patch. But I feel it a bit overdone because it is not
an end-user interface.

> > I wrote the outline of the logic in the comment for enum
> > PgFdwConnState in postgres_fdw.h. Is it make sense?
> >
> 
> The point about two different ForeignScan nodes using the same connection
> needs some clarification, I guess. It's not very clear, why would there be
> more queries run on the same connection. I know why this happens, but it's
> important to mention it somewhere. If it's already mentioned somewhere in
> the file, sorry for not paying attention to that.

Yeah. It is just what I stumbled on. I changed the comment in
fetch_more_data() like below. Does it make sense?

| /*
|  * On the current postgres_fdw implement, multiple PgFdwScanState
|  * on the same foreign server and mapped user share the same
|  * connection to the remote server (see GetConnection() in
|  * connection.c) and inidividual scans on it are separated using
|  * cursors. Since one connection cannot accept two or more
|  * asynchronous queries simultaneously, we should stop the async
|  * fetching if the another scan comes.
|  */
| 
| if (PFCgetNscans(conn) > 1)
|       PFCsetAsyncScan(conn, NULL);

> I think there is more chance that there will more than one ForeignScan
> nodes trying interact with a foreign server, even after the push-down work.
> The current solution doesn't address that. We actually need parallel
> querying in two cases
> 1. Querying multiple servers in parallel
> 2. Querying same server (by two querists) in parallel within the same query
> e.g. an un-pushable join.
> 
> We need a solution which is work in both the cases.

The first point is realized by this patch with some
limitations. The second point is that my old patch did, it made a
dedicated connection for individual scans up to some fixed number
aside the main connection, then the overflowed scans go to the
main connection and they are done in the manner the unpatched
postgres_fdw does.

I was thinking that the 'some fiexed number' could be set by a
parameter of a foreign server but I got a comment that it could
fill up the remote server unless reasonable load or/and bandwidth
managemant. So I abandoned the multiple-connection solution and
decided to do everything on the first connection. It's how the
current patch came.

> Is it possible to use the parallel query infrastructure being built by
> Robert or to do something like parallel seq scan? That will work, not just
> for Postgres FDW but all the FDWs.

I haven't seen closer to the patch but if my understanding by a
glance is correct, the parallel scan devides the target table
into multple parts then runs subscans on every part in parallel.

It might allocate dedicated processes for every child scan on a
partitioned table.

But, I think, from the performance view, every scan of multiple
foreign scans don't need correnponding local process. But if the
parallel scan infrastructure makes the mechanism simpler, using
it is a very promising choice.

> In case of Prepared statements, ExecInit is called at the end of planning,
> without subsequent execution like the case of EXPLAIN. I see that the patch
> handles EXPLAIN well, but I didn't see any specific code for PREPARE.

I'll look into the case after this, but I'd like to send a
revised patch at this point.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From 58757978b5625aa3ae9a99fdcd9d6db393e62a5a Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 13 Jan 2015 19:20:35 +0900
Subject: [PATCH] Asynchronous execution of postgres_fdw v3

This is the modified version of Asynchronous execution of
postgres_fdw.

- Remove PgFdwAsyncState.
- Separate PgFdwConn into individual module
---
 contrib/postgres_fdw/Makefile       |   2 +-
 contrib/postgres_fdw/PgFdwConn.c    | 200 +++++++++++++++++++++++++++++++
 contrib/postgres_fdw/PgFdwConn.h    |  62 ++++++++++
 contrib/postgres_fdw/connection.c   |  86 +++++++------
 contrib/postgres_fdw/postgres_fdw.c | 232 ++++++++++++++++++++++++++----------
 contrib/postgres_fdw/postgres_fdw.h |  16 ++-
 6 files changed, 490 insertions(+), 108 deletions(-)
 create mode 100644 contrib/postgres_fdw/PgFdwConn.c
 create mode 100644 contrib/postgres_fdw/PgFdwConn.h

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index d2b98e1..d0913e2 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -1,7 +1,7 @@
 # contrib/postgres_fdw/Makefile
 
 MODULE_big = postgres_fdw
-OBJS = postgres_fdw.o option.o deparse.o connection.o $(WIN32RES)
+OBJS = postgres_fdw.o PgFdwConn.o option.o deparse.o connection.o $(WIN32RES)
 PGFILEDESC = "postgres_fdw - foreign data wrapper for PostgreSQL"
 
 PG_CPPFLAGS = -I$(libpq_srcdir)
diff --git a/contrib/postgres_fdw/PgFdwConn.c b/contrib/postgres_fdw/PgFdwConn.c
new file mode 100644
index 0000000..b13b597
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.c
@@ -0,0 +1,200 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.c
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "PgFdwConn.h"
+
+#define PFC_ALLOCATE()	((PgFdwConn *)malloc(sizeof(PgFdwConn)))
+#define PFC_FREE(c)		free(c)
+
+struct pgfdw_conn
+{
+	PGconn *pgconn;				/* libpq connection for this connection */
+	int		nscans;				/* number of scans using this connection */
+	struct PgFdwScanState *async_scan; /* the connection currently running
+										* async query on this connection  */
+};
+
+void
+PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan)
+{
+	conn->async_scan = scan;
+}
+
+struct PgFdwScanState *
+PFCgetAsyncScan(PgFdwConn *conn)
+{
+	return conn->async_scan;
+}
+
+int
+PFCisAsyncRunning(PgFdwConn *conn)
+{
+	return conn->async_scan != NULL;
+}
+
+PGconn *
+PFCgetPGconn(PgFdwConn *conn)
+{
+	return conn->pgconn;
+}
+
+int
+PFCgetNscans(PgFdwConn *conn)
+{
+	return conn->nscans;
+}
+
+int
+PFCincrementNscans(PgFdwConn *conn)
+{
+	return ++conn->nscans;
+}
+
+int
+PFCdecrementNscans(PgFdwConn *conn)
+{
+	Assert(conn->nscans > 0);
+	return --conn->nscans;
+}
+
+void
+PFCcancelAsync(PgFdwConn *conn)
+{
+	if (PFCisAsyncRunning(conn))
+		PFCconsumeInput(conn);
+}
+
+void
+PFCinit(PgFdwConn *conn)
+{
+	conn->async_scan = NULL;
+	conn->nscans = 0;
+}
+
+int
+PFCsendQuery(PgFdwConn *conn, const char *query)
+{
+	return PQsendQuery(conn->pgconn, query);
+}
+
+PGresult *
+PFCexec(PgFdwConn *conn, const char *query)
+{
+	return PQexec(conn->pgconn, query);
+}
+
+PGresult *
+PFCexecParams(PgFdwConn *conn,
+			  const char *command,
+			  int nParams,
+			  const Oid *paramTypes,
+			  const char *const * paramValues,
+			  const int *paramLengths,
+			  const int *paramFormats,
+			  int resultFormat)
+{
+	return PQexecParams(conn->pgconn,
+						command, nParams, paramTypes, paramValues,
+						paramLengths, paramFormats, resultFormat);
+}
+
+PGresult *
+PFCprepare(PgFdwConn *conn,
+		   const char *stmtName, const char *query,
+		   int nParams, const Oid *paramTypes)
+{
+	return PQprepare(conn->pgconn, stmtName, query, nParams, paramTypes);
+}
+
+PGresult *
+PFCexecPrepared(PgFdwConn *conn,
+				const char *stmtName,
+				int nParams,
+				const char *const * paramValues,
+				const int *paramLengths,
+				const int *paramFormats,
+				int resultFormat)
+{
+	return PQexecPrepared(conn->pgconn, 
+						  stmtName, nParams, paramValues, paramLengths,
+						  paramFormats, resultFormat);
+}
+
+PGresult *
+PFCgetResult(PgFdwConn *conn)
+{
+	return PQgetResult(conn->pgconn);
+}
+
+int
+PFCconsumeInput(PgFdwConn *conn)
+{
+	return PQconsumeInput(conn->pgconn);
+}
+
+int
+PFCisBusy(PgFdwConn *conn)
+{
+	return PQisBusy(conn->pgconn);
+}
+
+ConnStatusType
+PFCstatus(const PgFdwConn *conn)
+{
+	return PQstatus(conn->pgconn);
+}
+
+PGTransactionStatusType
+PFCtransactionStatus(const PgFdwConn *conn)
+{
+	return PQtransactionStatus(conn->pgconn);
+}
+
+int
+PFCserverVersion(const PgFdwConn *conn)
+{
+	return PQserverVersion(conn->pgconn);
+}
+
+char *
+PFCerrorMessage(const PgFdwConn *conn)
+{
+	return PQerrorMessage(conn->pgconn);
+}
+
+int
+PFCconnectionUsedPassword(const PgFdwConn *conn)
+{
+	return PQconnectionUsedPassword(conn->pgconn);
+}
+
+void
+PFCfinish(PgFdwConn *conn)
+{
+	return PQfinish(conn->pgconn);
+	PFC_FREE(conn);
+}
+
+PgFdwConn *
+PFCconnectdbParams(const char *const * keywords,
+				   const char *const * values, int expand_dbname)
+{
+	PgFdwConn *ret = PFC_ALLOCATE();
+
+	PFCinit(ret);
+	ret->pgconn = PQconnectdbParams(keywords, values, expand_dbname);
+
+	return ret;
+}
diff --git a/contrib/postgres_fdw/PgFdwConn.h b/contrib/postgres_fdw/PgFdwConn.h
new file mode 100644
index 0000000..2771de5
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.h
@@ -0,0 +1,62 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.h
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PGFDWCONN_H
+#define PGFDWCONN_H
+
+#include "libpq-fe.h"
+
+typedef struct pgfdw_conn PgFdwConn;
+struct PgFdwScanState;
+
+extern void PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan);
+extern struct PgFdwScanState *PFCgetAsyncScan(PgFdwConn *conn);
+extern int PFCisAsyncRunning(PgFdwConn *conn);
+extern PGconn *PFCgetPGconn(PgFdwConn *conn);
+extern int PFCgetNscans(PgFdwConn *conn);
+extern int PFCincrementNscans(PgFdwConn *conn);
+extern int PFCdecrementNscans(PgFdwConn *conn);
+extern void PFCcancelAsync(PgFdwConn *conn);
+extern void PFCinit(PgFdwConn *conn);
+extern int PFCsendQuery(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexec(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexecParams(PgFdwConn *conn,
+								const char *command,
+								int nParams,
+								const Oid *paramTypes,
+								const char *const * paramValues,
+								const int *paramLengths,
+								const int *paramFormats,
+								int resultFormat);
+extern PGresult *PFCprepare(PgFdwConn *conn,
+							const char *stmtName, const char *query,
+							int nParams, const Oid *paramTypes);
+extern PGresult *PFCexecPrepared(PgFdwConn *conn,
+								 const char *stmtName,
+								 int nParams,
+								 const char *const * paramValues,
+								 const int *paramLengths,
+								 const int *paramFormats,
+								 int resultFormat);
+extern PGresult *PFCgetResult(PgFdwConn *conn);
+extern int PFCconsumeInput(PgFdwConn *conn);
+extern int PFCisBusy(PgFdwConn *conn);
+extern ConnStatusType PFCstatus(const PgFdwConn *conn);
+extern PGTransactionStatusType PFCtransactionStatus(const PgFdwConn *conn);
+extern int PFCserverVersion(const PgFdwConn *conn);
+extern char *PFCerrorMessage(const PgFdwConn *conn);
+extern int PFCconnectionUsedPassword(const PgFdwConn *conn);
+extern void PFCfinish(PgFdwConn *conn);
+extern PgFdwConn *PFCconnectdbParams(const char *const * keywords,
+			 const char *const * values, int expand_dbname);
+
+#endif   /* PGFDWCONN_H */
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4e02cb2..5bf08ec 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,7 +44,7 @@ typedef struct ConnCacheKey
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
-	PGconn	   *conn;			/* connection to foreign server, or NULL */
+	PgFdwConn	*conn;			/* connection to foreign server, or NULL */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
@@ -64,10 +64,10 @@ static unsigned int prep_stmt_number = 0;
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static PgFdwConn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
-static void configure_remote_session(PGconn *conn);
-static void do_sql_command(PGconn *conn, const char *sql);
+static void configure_remote_session(PgFdwConn *conn);
+static void do_sql_command(PgFdwConn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
@@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  * be useful and not mere pedantry.  We could not flush any active connections
  * mid-transaction anyway.
  */
-PGconn *
+PgFdwConn *
 GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt)
 {
@@ -161,7 +161,7 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		entry->have_error = false;
 		entry->conn = connect_pg_server(server, user);
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
-			 entry->conn, server->servername);
+			 PFCgetPGconn(entry->conn), server->servername);
 	}
 
 	/*
@@ -169,6 +169,13 @@ GetConnection(ForeignServer *server, UserMapping *user,
 	 */
 	begin_remote_xact(entry);
 
+	/*
+	 * Finish async query immediately if another foreign scan node sharing
+	 * this connection comes.
+	 */
+	if (PFCincrementNscans(entry->conn) > 1 && PFCisAsyncRunning(entry->conn))
+		fetch_more_data(PFCgetAsyncScan(entry->conn));
+
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
 
@@ -178,10 +185,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
-static PGconn *
+static PgFdwConn *
 connect_pg_server(ForeignServer *server, UserMapping *user)
 {
-	PGconn	   *volatile conn = NULL;
+	PgFdwConn   *volatile conn = NULL;
 
 	/*
 	 * Use PG_TRY block to ensure closing connection on error.
@@ -223,14 +230,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		/* verify connection parameters and make connection */
 		check_conn_params(keywords, values);
 
-		conn = PQconnectdbParams(keywords, values, false);
-		if (!conn || PQstatus(conn) != CONNECTION_OK)
+		conn = PFCconnectdbParams(keywords, values, false);
+		if (!conn || PFCstatus(conn) != CONNECTION_OK)
 		{
 			char	   *connmessage;
 			int			msglen;
 
 			/* libpq typically appends a newline, strip that */
-			connmessage = pstrdup(PQerrorMessage(conn));
+			connmessage = pstrdup(PFCerrorMessage(conn));
 			msglen = strlen(connmessage);
 			if (msglen > 0 && connmessage[msglen - 1] == '\n')
 				connmessage[msglen - 1] = '\0';
@@ -246,7 +253,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		 * otherwise, he's piggybacking on the postgres server's user
 		 * identity. See also dblink_security_check() in contrib/dblink.
 		 */
-		if (!superuser() && !PQconnectionUsedPassword(conn))
+		if (!superuser() && !PFCconnectionUsedPassword(conn))
 			ereport(ERROR,
 				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
 				   errmsg("password is required"),
@@ -263,7 +270,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 	{
 		/* Release PGconn data structure if we managed to create one */
 		if (conn)
-			PQfinish(conn);
+			PFCfinish(conn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -312,9 +319,9 @@ check_conn_params(const char **keywords, const char **values)
  * there are any number of ways to break things.
  */
 static void
-configure_remote_session(PGconn *conn)
+configure_remote_session(PgFdwConn *conn)
 {
-	int			remoteversion = PQserverVersion(conn);
+	int			remoteversion = PFCserverVersion(conn);
 
 	/* Force the search path to contain only pg_catalog (see deparse.c) */
 	do_sql_command(conn, "SET search_path = pg_catalog");
@@ -348,11 +355,11 @@ configure_remote_session(PGconn *conn)
  * Convenience subroutine to issue a non-data-returning SQL command to remote
  */
 static void
-do_sql_command(PGconn *conn, const char *sql)
+do_sql_command(PgFdwConn *conn, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -379,7 +386,7 @@ begin_remote_xact(ConnCacheEntry *entry)
 		const char *sql;
 
 		elog(DEBUG3, "starting remote transaction on connection %p",
-			 entry->conn);
+			 PFCgetPGconn(entry->conn));
 
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
@@ -408,13 +415,11 @@ begin_remote_xact(ConnCacheEntry *entry)
  * Release connection reference count created by calling GetConnection.
  */
 void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(PgFdwConn *conn)
 {
-	/*
-	 * Currently, we don't actually track connection references because all
-	 * cleanup is managed on a transaction or subtransaction basis instead. So
-	 * there's nothing to do here.
-	 */
+	/* ongoing async query should be canceled if no scans left */
+	if (PFCdecrementNscans(conn) == 0)
+		finish_async_connection(conn);
 }
 
 /*
@@ -429,7 +434,7 @@ ReleaseConnection(PGconn *conn)
  * collisions are highly improbable; just be sure to use %u not %d to print.
  */
 unsigned int
-GetCursorNumber(PGconn *conn)
+GetCursorNumber(PgFdwConn *conn)
 {
 	return ++cursor_number;
 }
@@ -443,7 +448,7 @@ GetCursorNumber(PGconn *conn)
  * increasing the risk of prepared-statement name collisions by resetting.
  */
 unsigned int
-GetPrepStmtNumber(PGconn *conn)
+GetPrepStmtNumber(PgFdwConn *conn)
 {
 	return ++prep_stmt_number;
 }
@@ -462,7 +467,7 @@ GetPrepStmtNumber(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql)
 {
 	/* If requested, PGresult must be released before leaving this function. */
@@ -490,7 +495,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		 * return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
-			message_primary = PQerrorMessage(conn);
+			message_primary = PFCerrorMessage(conn);
 
 		ereport(elevel,
 				(errcode(sqlstate),
@@ -542,7 +547,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		if (entry->xact_depth > 0)
 		{
 			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+				 PFCgetPGconn(entry->conn));
 
 			switch (event)
 			{
@@ -567,7 +572,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = PFCexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
@@ -597,7 +602,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
+					res = PFCexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
 						pgfdw_report_error(WARNING, res, entry->conn, true,
@@ -608,7 +613,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						/* As above, make sure to clear any prepared stmts */
 						if (entry->have_prep_stmt && entry->have_error)
 						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
+							res = PFCexec(entry->conn, "DEALLOCATE ALL");
 							PQclear(res);
 						}
 						entry->have_prep_stmt = false;
@@ -620,17 +625,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 		/* Reset state to show we're out of a transaction */
 		entry->xact_depth = 0;
+		PFCcancelAsync(entry->conn);
+		PFCinit(entry->conn);
 
 		/*
 		 * If the connection isn't in a good idle state, discard it to
 		 * recover. Next GetConnection will open a new connection.
 		 */
-		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+		if (PFCstatus(entry->conn) != CONNECTION_OK ||
+			PFCtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
-			PQfinish(entry->conn);
-			entry->conn = NULL;
+			elog(DEBUG3, "discarding connection %p",
+				 PFCgetPGconn(entry->conn));
+			PFCfinish(entry->conn);
 		}
 	}
 
@@ -676,6 +683,9 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		PGresult   *res;
 		char		sql[100];
 
+		/* Shut down asynchronous scan if running */
+		PFCcancelAsync(entry->conn);
+
 		/*
 		 * We only care about connections with open remote subtransactions of
 		 * the current level.
@@ -701,7 +711,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			snprintf(sql, sizeof(sql),
 					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
 					 curlevel, curlevel);
-			res = PQexec(entry->conn, sql);
+			res = PFCexec(entry->conn, sql);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
 			else
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d76e739..08d9ca6 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -136,7 +136,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -156,6 +156,7 @@ typedef struct PgFdwScanState
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+	ExprContext	 *econtext;		/* copy of ps_ExprContext of ForeignScanState */
 } PgFdwScanState;
 
 /*
@@ -167,7 +168,7 @@ typedef struct PgFdwModifyState
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -298,7 +299,7 @@ static void estimate_path_cost_size(PlannerInfo *root,
 						double *p_rows, int *p_width,
 						Cost *p_startup_cost, Cost *p_total_cost);
 static void get_remote_estimate(const char *sql,
-					PGconn *conn,
+					PgFdwConn *conn,
 					double *rows,
 					int *width,
 					Cost *startup_cost,
@@ -306,9 +307,8 @@ static void get_remote_estimate(const char *sql,
 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
-static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void create_cursor(PgFdwScanState *node);
+static void close_cursor(PgFdwConn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
@@ -329,7 +329,6 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
-
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
  * to my callback routines.
@@ -982,6 +981,15 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
 	else
 		fsstate->param_values = NULL;
+
+	fsstate->econtext = node->ss.ps.ps_ExprContext;
+
+	/*
+	 * Start scanning asynchronously if it is the first scan on this
+	 * connection.
+	 */
+	if (PFCgetNscans(fsstate->conn) == 1)
+		create_cursor(fsstate);
 }
 
 /*
@@ -1000,7 +1008,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	 * cursor on the remote side.
 	 */
 	if (!fsstate->cursor_exists)
-		create_cursor(node);
+		create_cursor(fsstate);
 
 	/*
 	 * Get some more tuples, if we've run out.
@@ -1009,7 +1017,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	{
 		/* No point in another fetch if we already detected EOF, though. */
 		if (!fsstate->eof_reached)
-			fetch_more_data(node);
+			fetch_more_data(fsstate);
 		/* If we didn't get any tuples, must be end of data. */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
 			return ExecClearTuple(slot);
@@ -1069,7 +1077,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = PFCexec(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1398,13 +1406,13 @@ postgresExecForeignInsert(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1468,13 +1476,13 @@ postgresExecForeignUpdate(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						   fmstate->p_name,
+						   fmstate->p_nums,
+						   p_values,
+						   NULL,
+						   NULL,
+						   0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1538,13 +1546,13 @@ postgresExecForeignDelete(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						   fmstate->p_name,
+						   fmstate->p_nums,
+						   p_values,
+						   NULL,
+						   NULL,
+						   0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1594,7 +1602,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = PFCexec(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -1726,7 +1734,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_join_conds;
 		StringInfoData sql;
 		List	   *retrieved_attrs;
-		PGconn	   *conn;
+		PgFdwConn  *conn;
 		Selectivity local_sel;
 		QualCost	local_cost;
 
@@ -1836,7 +1844,7 @@ estimate_path_cost_size(PlannerInfo *root,
  * The given "sql" must be an EXPLAIN command.
  */
 static void
-get_remote_estimate(const char *sql, PGconn *conn,
+get_remote_estimate(const char *sql, PgFdwConn *conn,
 					double *rows, int *width,
 					Cost *startup_cost, Cost *total_cost)
 {
@@ -1852,7 +1860,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = PFCexec(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -1917,13 +1925,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  * Create cursor for node's query with current parameter values.
  */
 static void
-create_cursor(ForeignScanState *node)
+create_cursor(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
-	ExprContext *econtext = node->ss.ps.ps_ExprContext;
+	ExprContext *econtext = fsstate->econtext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PgFdwConn	*conn = fsstate->conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -1985,8 +1992,8 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	res = PFCexecParams(conn, buf.data, numParams, NULL, values,
+						 NULL, NULL, 0);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2001,15 +2008,21 @@ create_cursor(ForeignScanState *node)
 
 	/* Clean up */
 	pfree(buf.data);
+
+	/*
+	 * Start async scan if this is the first scan. See fetch_more_data() for
+	 * details
+	 */
+	if (PFCgetNscans(conn) == 1)
+		fetch_more_data(fsstate);
 }
 
 /*
  * Fetch some more rows from the node's cursor.
  */
-static void
-fetch_more_data(ForeignScanState *node)
+void
+fetch_more_data(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
@@ -2024,7 +2037,7 @@ fetch_more_data(ForeignScanState *node)
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
 		int			fetch_size;
 		int			numrows;
@@ -2036,9 +2049,57 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		if (PFCisAsyncRunning(conn))
+		{
+			/* Get result of running async fetch */
+			res = PFCgetResult(conn);
+			if (PQntuples(res) == fetch_size)
+			{
+				/*
+				 * Connection state doesn't go to IDLE even if all data
+				 * has been sent to client for asynchronous query. One
+				 * more PQgetResult() is needed to reset the state to
+				 * IDLE.  See PQexecFinish() for details.
+				 */
+				if (PFCgetResult(conn) != NULL)
+					elog(ERROR, "Connection status error.");
+			}
+
+			/*
+			 * On the current postgres_fdw implement, multiple PgFdwScanState
+			 * on the same foreign server and mapped user share the same
+			 * connection to the remote server (see GetConnection() in
+			 * connection.c) and inidividual scans on it are separated using
+			 * cursors. Since one connection cannot accept two or more
+			 * asynchronous queries simultaneously, we should stop the async
+			 * fetching if the another scan comes.
+			 */
+			if (PFCgetNscans(conn) > 1)
+				PFCsetAsyncScan(conn, NULL);
+		}
+		else
+		{
+			/*
+			 * If no async scan is running and the number of scans running on
+			 * this connection is 1, start async fetch.
+			 */
+			if (PFCgetNscans(conn) == 1)
+			{
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false,
+									   fsstate->query);
+
+				PFCsetAsyncScan(conn, fsstate);
+				goto end_of_fetch;
+			}
+
+			/* Elsewise do synchronous query execution */
+			PFCsetAsyncScan(conn, NULL);
+			res = PFCexec(conn, sql);
+		}
+
 		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		if (res &&  PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
 		/* Convert the data into HeapTuples */
@@ -2066,6 +2127,26 @@ fetch_more_data(ForeignScanState *node)
 
 		PQclear(res);
 		res = NULL;
+
+		if (PFCisAsyncRunning(conn))
+		{
+			if (!fsstate->eof_reached)
+			{
+				/*
+				 * We can immediately request the next bunch of tuples if
+				 * we're on asynchronous connection.
+				 */
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+			}
+			else
+			{
+				PFCsetAsyncScan(conn, NULL);
+			}
+		}
+
+end_of_fetch:
+		;	/* Nothing to do here but needed to make compiler quiet. */
 	}
 	PG_CATCH();
 	{
@@ -2079,6 +2160,31 @@ fetch_more_data(ForeignScanState *node)
 }
 
 /*
+ * Force cancelling async command state.
+ */
+void
+finish_async_connection(PgFdwConn *conn)
+{
+	PgFdwScanState *fsstate = PFCgetAsyncScan(conn);
+	PgFdwConn *async_conn;
+
+	/* Nothing to do if no async connection */
+	if (fsstate == NULL) return;
+	async_conn = fsstate->conn;
+	Assert(async_conn && PFCgetNscans(async_conn) != 1);
+
+	/* Finish async command if any */
+	if (PFCisAsyncRunning(async_conn))
+		fetch_more_data(PFCgetAsyncScan(async_conn));
+
+	Assert(!PFCisAsyncRunning(async_conn));
+
+	/* Immediately discard the result */
+	fsstate->next_tuple = 0;
+	fsstate->num_tuples = 0;
+}
+
+/*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
  *
@@ -2132,7 +2238,7 @@ reset_transmission_modes(int nestlevel)
  * Utility routine to close a cursor.
  */
 static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PgFdwConn *conn, unsigned int cursor_number)
 {
 	char		sql[64];
 	PGresult   *res;
@@ -2143,7 +2249,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -2175,11 +2281,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQprepare(fmstate->conn,
-					p_name,
-					fmstate->query,
-					0,
-					NULL);
+	res = PFCprepare(fmstate->conn,
+					 p_name,
+					 fmstate->query,
+					 0,
+					 NULL);
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -2297,7 +2403,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn	*conn;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
 
@@ -2329,7 +2435,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -2379,7 +2485,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn	*conn;
 	unsigned int cursor_number;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
@@ -2423,7 +2529,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -2453,7 +2559,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = PFCexec(conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2582,7 +2688,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	bool		import_not_null = true;
 	ForeignServer *server;
 	UserMapping *mapping;
-	PGconn	   *conn;
+	PgFdwConn   *conn;
 	StringInfoData buf;
 	PGresult   *volatile res = NULL;
 	int			numrows,
@@ -2615,7 +2721,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	conn = GetConnection(server, mapping, false);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
-	if (PQserverVersion(conn) < 90100)
+	if (PFCserverVersion(conn) < 90100)
 		import_collate = false;
 
 	/* Create workspace for strings */
@@ -2628,7 +2734,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -2723,7 +2829,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfo(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 950c6f7..cac7dfc 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -18,19 +18,23 @@
 #include "nodes/relation.h"
 #include "utils/relcache.h"
 
-#include "libpq-fe.h"
+#include "PgFdwConn.h"
+
+struct PgFdwScanState;
 
 /* in postgres_fdw.c */
 extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
+extern void fetch_more_data(struct PgFdwScanState *node);
+extern void finish_async_connection(PgFdwConn *fsstate);
 
 /* in connection.c */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
+extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
-extern unsigned int GetCursorNumber(PGconn *conn);
-extern unsigned int GetPrepStmtNumber(PGconn *conn);
-extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+extern void ReleaseConnection(PgFdwConn *conn);
+extern unsigned int GetCursorNumber(PgFdwConn *conn);
+extern unsigned int GetPrepStmtNumber(PgFdwConn *conn);
+extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql);
 
 /* in option.c */
-- 
2.1.0.GIT

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to