To minimize the impact, fetch_more_data() could be executed in asynchronous mode whenever only a few rows are left to be consumed. So, in the current code below:

 1019 /*
 1020  * Get some more tuples, if we've run out.
 1021  */
 1022 if (fsstate->next_tuple >= fsstate->num_tuples)
 1023 {
 1024     /* No point in another fetch if we already detected EOF, though. */
 1025     if (!fsstate->eof_reached)
 1026         fetch_more_data(node, false);
 1027     /* If we didn't get any tuples, must be end of data. */
 1028     if (fsstate->next_tuple >= fsstate->num_tuples)
 1029         return ExecClearTuple(slot);
 1030 }
we could replace the condition on line 1022

    if (fsstate->next_tuple >= fsstate->num_tuples)

with

    if (fsstate->next_tuple >= fsstate->num_tuples - SOME_BUFFER_NUMBER_ROWS)

Another possibility is to call PQsendQuery(conn, sql) after line 2100, provided eof_reached is false:

 2096 /* Must be EOF if we didn't get as many tuples as we asked for. */
 2097 fsstate->eof_reached = (numrows < fetch_size);
 2098
 2099 PQclear(res);
 2100 res = NULL;

On Fri, Jul 25, 2014 at 3:37 PM, Ashutosh Bapat <ashutosh.ba...@enterprisedb.com> wrote:

> Hi Kyotaro,
>
> fetch_more_data() always runs "FETCH 100 FROM <cursor_name>" on the
> foreign server to get the next set of rows. The changes you have made
> seem to run only the first FETCHes from all the nodes, but not the
> subsequent ones. The optimization will be helpful only when there are
> fewer than 100 rows per postgres connection in the query. If there are
> more than 100 rows from a single foreign server, the second and
> subsequent FETCHes will be serialized.
>
> Is my understanding correct?
>
>
> On Fri, Jul 25, 2014 at 2:05 PM, Kyotaro HORIGUCHI <
> horiguchi.kyot...@lab.ntt.co.jp> wrote:
>
>> Hello,
>>
>> I noticed that postgres_fdw can run in parallel with a very small
>> change. The attached patch lets scans by postgres_fdw on different
>> foreign servers run simultaneously. This seems a convenient entry
>> point to parallel execution.
>>
>> For the testing configuration that the attached SQL script creates,
>> it almost halves the response time, because the remote queries take
>> far longer to start up than to run. The two foreign tables fvs1,
>> fvs2 and fvs1_2 are defined on the same table, but fvs1 and fvs1_2
>> are on the same foreign server pgs1, while fvs2 is on another
>> foreign server, pgs2.
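The two alternatives above (lowering the refill threshold, or firing PQsendQuery() right after draining a result) can be sketched together. The following standalone C program is only a simulation of the control flow, not a patch: ScanState, run_scan() and PREFETCH_THRESHOLD are simplified stand-ins for postgres_fdw's PgFdwScanState, the executor loop, and the hypothetical SOME_BUFFER_NUMBER_ROWS, and the asynchronous FETCH is modeled by a flag rather than a real PQsendQuery()/PQgetResult() pair.

```c
#define FETCH_SIZE 100            /* postgres_fdw's default fetch size */
#define PREFETCH_THRESHOLD 10     /* hypothetical SOME_BUFFER_NUMBER_ROWS */

/* Simplified stand-in for postgres_fdw's PgFdwScanState. */
typedef struct ScanState
{
    int next_tuple;       /* index of next locally buffered row */
    int num_tuples;       /* rows currently buffered */
    int eof_reached;      /* remote cursor exhausted? */
    int async_in_flight;  /* models a FETCH sent via PQsendQuery() */
    int remote_rows_left; /* rows still held by the fake "server" */
} ScanState;

/*
 * Stand-in for fetch_more_data(): move up to FETCH_SIZE rows from the
 * fake server into the local buffer.  With a real asynchronous FETCH,
 * this is where PQgetResult() would collect the rows already in flight.
 */
static void fetch_more_data(ScanState *s)
{
    int n = s->remote_rows_left < FETCH_SIZE ? s->remote_rows_left : FETCH_SIZE;

    s->remote_rows_left -= n;
    s->num_tuples = n;
    s->next_tuple = 0;
    /* Must be EOF if we didn't get as many tuples as we asked for. */
    s->eof_reached = (n < FETCH_SIZE);
    s->async_in_flight = 0;
}

/* One call per tuple, mirroring postgresIterateForeignScan(); returns 0 at EOF. */
static int iterate(ScanState *s)
{
    /*
     * Proposed change: start the next FETCH while a few buffered rows
     * remain, instead of waiting until the buffer is empty.  In a real
     * patch this would be a PQsendQuery(conn, fetch_sql).
     */
    if (!s->eof_reached && !s->async_in_flight &&
        s->next_tuple >= s->num_tuples - PREFETCH_THRESHOLD)
        s->async_in_flight = 1;

    if (s->next_tuple >= s->num_tuples)
    {
        /* No point in another fetch if we already detected EOF, though. */
        if (!s->eof_reached)
            fetch_more_data(s);
        /* If we didn't get any tuples, must be end of data. */
        if (s->next_tuple >= s->num_tuples)
            return 0;
    }
    s->next_tuple++;
    return 1;
}

/* Drive a whole scan against a fake server holding remote_rows rows. */
static int run_scan(int remote_rows)
{
    ScanState s = {0, 0, 0, 0, remote_rows};
    int count = 0;

    while (iterate(&s))
        count++;
    return count;
}
```

run_scan() returns the same row count with or without the threshold; the prefetch changes only when the FETCH is issued, not the result. Note that Ashutosh's concern still applies: libpq allows only one query in flight per connection, so within a single connection the second and later FETCHes serialize unless the pending result is collected with PQgetResult() before the next command is sent.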
>>
>> =# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM fvs1 a join
>> fvs1_2 b on (a.a = b.a);
>>                               QUERY PLAN
>> -----------------------------------------------------------------------
>>  Hash Join (actual time=12083.640..12083.657 rows=16 loops=1)
>>    Hash Cond: (a.a = b.a)
>>    ->  Foreign Scan on fvs1 a (actual time=6091.405..6091.407 rows=10 loops=1)
>>    ->  Hash (actual time=5992.212..5992.212 rows=10 loops=1)
>>          Buckets: 1024  Batches: 1  Memory Usage: 7kB
>>          ->  Foreign Scan on fvs1_2 b (actual time=5992.191..5992.198 rows=10 loops=1)
>>  Execution time: 12085.330 ms
>> (7 rows)
>>
>> =# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM fvs1 a join
>> fvs2 b on (a.a = b.a);
>>                               QUERY PLAN
>> -----------------------------------------------------------------------
>>  Hash Join (actual time=6325.004..6325.019 rows=16 loops=1)
>>    Hash Cond: (a.a = b.a)
>>    ->  Foreign Scan on fvs1 a (actual time=6324.910..6324.913 rows=10 loops=1)
>>    ->  Hash (actual time=0.073..0.073 rows=10 loops=1)
>>          Buckets: 1024  Batches: 1  Memory Usage: 7kB
>>          ->  Foreign Scan on fvs2 b (actual time=0.048..0.052 rows=10 loops=1)
>>  Execution time: 6327.708 ms
>> (7 rows)
>>
>> In turn, a purely local query is executed as below.
>>
>> =# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM v a join v b
>> on (a.a = b.a);
>>                                   QUERY PLAN
>> ------------------------------------------------------------------------------
>>  Hash Join (actual time=15757.915..15757.925 rows=16 loops=1)
>>    Hash Cond: (a.a = b.a)
>>    ->  Limit (actual time=7795.919..7795.922 rows=10 loops=1)
>>          ->  Sort (actual time=7795.915..7795.915 rows=10 loops=1)
>>                ->  Nested Loop (actual time=54.769..7795.618 rows=252 loops=1)
>>                      ->  Seq Scan on t a (actual time=0.010..2.117 rows=5000 loops=1)
>>                      ->  Materialize (actual time=0.000..0.358 rows=5000 loops=5000)
>>                            ->  Seq Scan on t b_1 (actual time=0.004..2.829 rows=5000 ...
>>    ->  Hash (actual time=7961.969..7961.969 rows=10 loops=1)
>>          ->  Subquery Scan on b (actual time=7961.948..7961.952 rows=10 loops=1)
>>                ->  Limit (actual time=7961.946..7961.950 rows=10 loops=1)
>>                      ->  Sort (actual time=7961.946..7961.948 rows=10 loops=1)
>>                            ->  Nested Loop (actual time=53.518..7961.611 rows=252 loops=1)
>>                                  ->  Seq Scan on t a_1 (actual time=0.004..2.247 rows=5000...
>>                                  ->  Materialize (actual time=0.000..0.357 rows=5000...
>>                                        ->  Seq Scan on t b_2 (actual time=0.001..1.565 rows=500..
>>  Execution time: 15758.629 ms
>> (26 rows)
>>
>>
>> I will try this way for the present.
>>
>> Any opinions or suggestions?
>>
>> - Is this a correct entry point?
>>
>> - Parallel postgres_fdw is of course an intermediate shape. It
>>   should evolve toward a more intrinsic form.
>>
>> - The planner should be aware of parallelism. The first step seems
>>   doable, since postgres_fdw can get correct startup and running
>>   costs. But they might eventually have to be calculated locally
>>   for loopback connections. A dedicated node would be needed.
>>
>> - A far more effective means of intercommunication between
>>   backends, including background workers (which seems to be
>>   discussed in another thread), is needed, and this could be the
>>   test bench for it.
>>
>> - This patch is the minimal implementation needed to make parallel
>>   scans available. A facility for exporting/importing execution
>>   trees may enable far more flexible parallelism. Is deparsing
>>   usable to reconstruct partial queries?
>>
>> - A means of resource management, especially of the number of
>>   backends, is required. This could be done per foreign server in
>>   a simple form for the present. Eventually this would be moved
>>   into an intrinsic loopback connection manager?
>>
>> - Any other points to consider?
>>
>>
>> regards,
>>
>> --
>> Kyotaro Horiguchi
>> NTT Open Source Software Center
>>
>> DROP SERVER IF EXISTS pgs1 CASCADE;
>> DROP SERVER IF EXISTS pgs2 CASCADE;
>> DROP VIEW IF EXISTS v CASCADE;
>> DROP TABLE IF EXISTS t CASCADE;
>>
>> CREATE SERVER pgs1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '/tmp',
>> dbname 'postgres', use_remote_estimate 'true');
>> CREATE SERVER pgs2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '/tmp',
>> dbname 'postgres', use_remote_estimate 'true');
>>
>> CREATE USER MAPPING FOR CURRENT_USER SERVER pgs1;
>> CREATE USER MAPPING FOR CURRENT_USER SERVER pgs2;
>>
>> CREATE TABLE t (a int, b int, c text);
>> ALTER TABLE t ALTER COLUMN c SET STORAGE PLAIN;
>> INSERT INTO t (SELECT random() * 10000, random() * 10000, repeat('X',
>> (random() * 1000)::int) FROM generate_series(0, 4999));
>> -- EXPLAIN ANALYZE SELECT * FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY
>> a.b LIMIT 10;
>> CREATE VIEW v AS SELECT a.a, a.b, a.c, b.a AS a2, b.b AS b2, b.c AS c2
>> FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY a.b LIMIT 10;
>>
>> CREATE FOREIGN TABLE fvs1 (a int, b int, c text, a2 int, b2 int, c2 text)
>> SERVER pgs1 OPTIONS (table_name 'v');
>> CREATE FOREIGN TABLE fvs1_2 (a int, b int, c text, a2 int, b2 int, c2 text)
>> SERVER pgs1 OPTIONS (table_name 'v');
>> CREATE FOREIGN TABLE fvs2 (a int, b int, c text, a2 int, b2 int, c2 text)
>> SERVER pgs2 OPTIONS (table_name 'v');
>>
>>
>> EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs2 b on (a.a = b.a);
>> EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs1_2 b on (a.a = b.a);
>>
>>
>> --
>> Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
>> To make changes to your subscription:
>> http://www.postgresql.org/mailpref/pgsql-hackers
>>
>
> --
> Best Wishes,
> Ashutosh Bapat
> EnterpriseDB Corporation
> The Postgres Database Company

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company