Hello, 

I noticed that postgresql_fdw can run in parallel by very small
change. The attached patch let scans by postgres_fdws on
different foreign servers run sumiltaneously. This seems a
convenient entry point to parallel execution.

For the testing configuration which the attched sql script makes,
it almost halves the response time because the remote queries
take far longer startup time than running time. The two foreign
tables fvs1, fvs2 and fvs1_2 are defined on the same table but
fvs1 and fvs1_2 are on the same foreign server pgs1 and fvs2 is
on the another foreign server pgs2.

=# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM fvs1 a join fvs1_2 
b on (a.a = b.a);
                            QUERY PLAN
-----------------------------------------------------------------------
Hash Join (actual time=12083.640..12083.657 rows=16 loops=1)
  Hash Cond: (a.a = b.a)
 ->  Foreign Scan on fvs1 a (actual time=6091.405..6091.407 rows=10 loops=1)
 ->  Hash (actual time=5992.212..5992.212 rows=10 loops=1)
     Buckets: 1024  Batches: 1  Memory Usage: 7kB
  ->  Foreign Scan on fvs1_2 b (actual time=5992.191..5992.198 rows=10 loops=1)
 Execution time: 12085.330 ms
(7 rows)

=# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM fvs1 a join fvs2 b 
on (a.a = b.a);
                            QUERY PLAN
-----------------------------------------------------------------------
Hash Join (actual time=6325.004..6325.019 rows=16 loops=1)
  Hash Cond: (a.a = b.a)
 ->  Foreign Scan on fvs1 a (actual time=6324.910..6324.913 rows=10 loops=1)
 ->  Hash (actual time=0.073..0.073 rows=10 loops=1)
      Buckets: 1024  Batches: 1  Memory Usage: 7kB
  ->  Foreign Scan on fvs2 b (actual time=0.048..0.052 rows=10 loops=1)
 Execution time: 6327.708 ms
(7 rows)

In turn, pure local query is executed as below..

=# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM v a join v b on 
(a.a = b.a);
                                  QUERY PLAN
------------------------------------------------------------------------------
 Hash Join (actual time=15757.915..15757.925 rows=16 loops=1)
   Hash Cond: (a.a = b.a)
   ->  Limit (actual time=7795.919..7795.922 rows=10 loops=1)
      ->  Sort (actual time=7795.915..7795.915 rows=10 loops=1)
         ->  Nested Loop (actual time=54.769..7795.618 rows=252 loops=1)
             ->  Seq Scan on t a (actual time=0.010..2.117 rows=5000 loops=1)
             ->  Materialize (actual time=0.000..0.358 rows=5000 loops=5000)
                ->  Seq Scan on t b_1 (actual time=0.004..2.829 rows=5000 ...
   ->  Hash (actual time=7961.969..7961.969 rows=10 loops=1)
      ->  Subquery Scan on b (actual time=7961.948..7961.952 rows=10 loops=1)
         ->  Limit (actual time=7961.946..7961.950 rows=10 loops=1)
             ->  Sort (actual time=7961.946..7961.948 rows=10 loops=1)
                ->  Nested Loop (actual time=53.518..7961.611 rows=252 loops=1)
                   ->  Seq Scan on t a_1 (actual time=0.004..2.247 rows=5000...
                   ->  Materialize (actual time=0.000..0.357 rows=5000...
                      ->  Seq Scan on t b_2 (actual time=0.001..1.565 rows=500..
 Execution time: 15758.629 ms
(26 rows)


I will try this way for the present.

Any opinions or suggestions?

- Is this a correct entry point?

- Parallel postgres_fdw is of course a intermediate shape. It
 should go toward more intrinsic form.

- Planner should be aware of parallelism. The first step seems to
 be doable since postgres_fdw can get correct startup and running
 costs. But they might should be calculated locally for loopback
 connections finally. Dedicated node would be needed.

- The far effective intercommunication means between backends
 including backend workers (which seems to be discussed in
 another thread) is needed and this could be the test bench for
 it.

- This patch is the minimal implement to get parallel scan
 available. A facility to exporting/importing execution trees may
 promise far flexible parallelism. Deparsing is usable to
 reconstruct partial query?

- The means for resource management, especially on number of
 backends is required. This could be done on foreign server in a
 simple form for the present. Finally this will be moved into
 intrinsic loopback connection manager?

- Any other points to consider?


regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 116be7d..399ca31 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -411,11 +411,13 @@ begin_remote_xact(ConnCacheEntry *entry)
 void
 ReleaseConnection(PGconn *conn)
 {
-	/*
-	 * Currently, we don't actually track connection references because all
-	 * cleanup is managed on a transaction or subtransaction basis instead. So
-	 * there's nothing to do here.
-	 */
+	/* Clean up current asynchronous query if any */
+	while (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
+	{
+		PGresult *res = PQgetResult(conn);
+		if (res)
+			PQclear(res);
+	}		
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 4c49776..eec299e 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -306,7 +306,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void fetch_more_data(ForeignScanState *node, bool async_start);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
@@ -328,6 +328,9 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
+#define CONN_IS_IDLE(conn) \
+	(PQtransactionStatus(conn) == PQTRANS_IDLE || \
+	 PQtransactionStatus(conn) == PQTRANS_INTRANS)
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -981,6 +984,18 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
 	else
 		fsstate->param_values = NULL;
+
+	if (CONN_IS_IDLE(fsstate->conn))
+	{
+		/* 
+		 * This connection is allowed asynchronous query execution, so start
+		 * it now. This relies on the fact that ExecInitForeignScan's share
+		 * the same foreign server are executed in the same order with
+		 * ExecForeignScan's.
+		 */
+		create_cursor(node);
+		fetch_more_data(node, true);
+	}
 }
 
 /*
@@ -1008,7 +1023,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	{
 		/* No point in another fetch if we already detected EOF, though. */
 		if (!fsstate->eof_reached)
-			fetch_more_data(node);
+			fetch_more_data(node, false);
 		/* If we didn't get any tuples, must be end of data. */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
 			return ExecClearTuple(slot);
@@ -2001,10 +2016,11 @@ create_cursor(ForeignScanState *node)
 }
 
 /*
- * Fetch some more rows from the node's cursor.
+ * Fetch some more rows from the node's cursor. It starts asynchronous query
+ * execution then immediately returns if async_start is true.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(ForeignScanState *node, bool async_start)
 {
 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
@@ -2033,36 +2049,56 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
-		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
-			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
-
-		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-		fsstate->num_tuples = numrows;
-		fsstate->next_tuple = 0;
-
-		for (i = 0; i < numrows; i++)
+		if (async_start)
 		{
-			fsstate->tuples[i] =
-				make_tuple_from_result_row(res, i,
-										   fsstate->rel,
-										   fsstate->attinmeta,
-										   fsstate->retrieved_attrs,
-										   fsstate->temp_cxt);
+			Assert(CONN_IS_IDLE(conn));
+
+			if (!PQsendQuery(conn, sql))
+				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 		}
+		else
+		{
+			if (!CONN_IS_IDLE(conn))
+				res = PQgetResult(conn);
+			/*
+			 * Transaction status won't be INTRANS or IDLE before calling
+			 * PQgetResult() after all result is received. PQgetResult()
+			 * returns NULL for the case.
+			 */
 
-		/* Update fetch_ct_2 */
-		if (fsstate->fetch_ct_2 < 2)
-			fsstate->fetch_ct_2++;
+			if (!res)
+				res = PQexec(conn, sql);
 
-		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fetch_size);
+			/* On error, report the original query, not the FETCH. */
+			if (PQresultStatus(res) != PGRES_TUPLES_OK)
+				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
-		PQclear(res);
-		res = NULL;
+			/* Convert the data into HeapTuples */
+			numrows = PQntuples(res);
+			fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+			fsstate->num_tuples = numrows;
+			fsstate->next_tuple = 0;
+
+			for (i = 0; i < numrows; i++)
+			{
+				fsstate->tuples[i] =
+					make_tuple_from_result_row(res, i,
+											   fsstate->rel,
+											   fsstate->attinmeta,
+											   fsstate->retrieved_attrs,
+											   fsstate->temp_cxt);
+			}
+
+			/* Update fetch_ct_2 */
+			if (fsstate->fetch_ct_2 < 2)
+				fsstate->fetch_ct_2++;
+
+			/* Must be EOF if we didn't get as many tuples as we asked for. */
+			fsstate->eof_reached = (numrows < fetch_size);
+
+			PQclear(res);
+			res = NULL;
+		}
 	}
 	PG_CATCH();
 	{
DROP SERVER IF EXISTS pgs1 CASCADE;
DROP SERVER IF EXISTS pgs2 CASCADE;
DROP VIEW IF EXISTS v CASCADE;
DROP TABLE IF EXISTS t CASCADE;

CREATE SERVER pgs1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '/tmp', 
dbname 'postgres', use_remote_estimate 'true');
CREATE SERVER pgs2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '/tmp', 
dbname 'postgres', use_remote_estimate 'true');

CREATE USER MAPPING FOR CURRENT_USER SERVER pgs1;
CREATE USER MAPPING FOR CURRENT_USER SERVER pgs2;

CREATE TABLE t (a int, b int, c text);
ALTER TABLE t ALTER COLUMN c SET STORAGE PLAIN;
INSERT INTO t (SELECT random() * 10000, random() * 10000, repeat('X', (random() 
* 1000)::int) FROM generate_series(0, 4999));
-- EXPLAIN ANALYZE SELECT * FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY a.b 
LIMIT 10;
CREATE VIEW v AS SELECT a.a, a.b, a.c, b.a AS a2, b.b AS b2, b.c AS c2 FROM t 
a, t b WHERE a.b + b.b = 1000 ORDER BY a.b LIMIT 10;

CREATE FOREIGN TABLE fvs1 (a int, b int, c text, a2 int, b2 int, c2 text) 
SERVER pgs1 OPTIONS (table_name 'v');
CREATE FOREIGN TABLE fvs1_2 (a int, b int, c text, a2 int, b2 int, c2 text) 
SERVER pgs1 OPTIONS (table_name 'v');
CREATE FOREIGN TABLE fvs2 (a int, b int, c text, a2 int, b2 int, c2 text) 
SERVER pgs2 OPTIONS (table_name 'v');


EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs2 b on (a.a = b.a);
EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs1_2 b on (a.a = b.a);

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to