Oh wow, I guess you are right. I just ran an example where local runs make use of the parallel setup, but runs via FDW do not.
I have three servers: 2 x pg10 and 1 x pg11. I run queries on the coordinator node (pg11), which makes calls to the foreign servers to do a simple count.
When run locally, the individual nodes execute the query in parallel, and the setup is repeatable, but via FDW it runs a simple seq scan.
I guess this is for the same reason you mentioned wrt declared cursors.

on pg11

create schema pg10;
create schema pg10_qa;
import foreign schema pg10 from server pg10 into pg10;
import foreign schema pg10_qa from server pg10_qa into pg10_qa;

explain (analyze,verbose) SELECT COUNT(1) FROM pg10.tbl_ItemTransactions;  ----this query is via FDW
                                             QUERY PLAN
----------------------------------------------------------------------------------------------------
 Foreign Scan  (cost=108.53..152.69 rows=1 width=8) (actual time=6584.498..6584.500 rows=1 loops=1)
   Output: (count(1))
   Relations: Aggregate on (pg10.tbl_itemtransactions)
   Remote SQL: SELECT count(1) FROM pg10.tbl_itemtransactions
 Planning Time: 0.112 ms
 Execution Time: 6585.435 ms
(6 rows)

2019-02-18 09:56:48 UTC LOG:  duration: 6593.046 ms  plan:
        Query Text: DECLARE c1 CURSOR FOR
        SELECT count(1) FROM pg10.tbl_itemtransactions
        Aggregate  (cost=768694.80..768694.81 rows=1 width=8) (actual time=6593.039..6593.039 rows=1 loops=1)
          Output: count(1)
          Buffers: shared hit=259476
          ->  Seq Scan on pg10.tbl_itemtransactions  (cost=0.00..666851.04 rows=40737504 width=0) (actual time=0.024..3389.245 rows=40737601 loops=1)
                Output: tranid, transactiondate, transactionname
                Buffers: shared hit=259476

-------- on pg10 (1) -- foreign server pg10

create schema pg10;
CREATE TABLE pg10.tbl_ItemTransactions
(
 TranID SERIAL
 ,TransactionDate TIMESTAMPTZ
 ,TransactionName TEXT
);
INSERT INTO pg10.tbl_ItemTransactions (TransactionDate, TransactionName)
SELECT x, 'dbrnd'
FROM generate_series('2014-01-01 00:00:00'::timestamptz, '2016-08-01 00:00:00'::timestamptz,'2 seconds'::interval) a(x);

explain analyze SELECT count(1) FROM pg10.tbl_itemtransactions;  --this query is local
                                             QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=472650.72..472650.73 rows=1 width=8) (actual time=2576.053..2576.054 rows=1 loops=1)
   ->  Gather  (cost=472650.50..472650.71 rows=2 width=8) (actual time=2575.721..2626.980 rows=3 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=471650.50..471650.51 rows=1 width=8) (actual time=2569.302..2569.302 rows=1 loops=3)
               ->  Parallel Seq Scan on tbl_itemtransactions  (cost=0.00..429215.60 rows=16973960 width=0) (actual time=0.048..1492.144 rows=13579200 loops=3)
 Planning time: 0.405 ms
 Execution time: 2627.455 ms
(8 rows)

-------- on pg10 (2) -- foreign server pg10_qa

create schema pg10_qa;
CREATE TABLE pg10_qa.tbl_ItemTransactions
(
 TranID SERIAL
 ,TransactionDate TIMESTAMPTZ
 ,TransactionName TEXT
);
INSERT INTO pg10_qa.tbl_ItemTransactions (TransactionDate, TransactionName)
SELECT x, 'dbrnd'
FROM generate_series('2014-01-01 00:00:00'::timestamptz, '2016-08-01 00:00:00'::timestamptz,'2 seconds'::interval) a(x);

explain analyze SELECT count(1) FROM pg10_qa.tbl_itemtransactions;  -- this query is local
                                             QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=472650.72..472650.73 rows=1 width=8) (actual time=2568.469..2568.469 rows=1 loops=1)
   ->  Gather  (cost=472650.50..472650.71 rows=2 width=8) (actual time=2568.067..2613.006 rows=3 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=471650.50..471650.51 rows=1 width=8) (actual time=2563.893..2563.893 rows=1 loops=3)
               ->  Parallel Seq Scan on tbl_itemtransactions  (cost=0.00..429215.60 rows=16973960 width=0) (actual time=0.017..1388.417 rows=13579200 loops=3)
 Planning time: 0.048 ms
 Execution time: 2613.246 ms
(8 rows)

But I guess partition elimination still works across the shards (see attached).
At least we'll benefit from that in pg11 :)

Regards,
Vijay

On Mon, Feb 18, 2019 at 3:07 AM Jeff Janes <jeff.ja...@gmail.com> wrote:

> On Sun, Feb 17, 2019 at 6:32 AM Vijaykumar Jain <vj...@opentable.com>
> wrote:
>
>> I am yet to figure out the reason, what we have done is implement fake
>> columns to represent samples and giving them random numbers and keeping
>> other bulls to fake limit.
>>
>> Most of the queries that were impacted were the ones that did not push
>> order by and limit to foreign servers.
>> I am also trying to upgrade pg11 to make use of parallelisation.
>>
>
> postgres_fdw operates through declared cursors, and declared cursors
> inhibit parallel query.  This doesn't change in v11, see
> https://www.postgresql.org/docs/11/when-can-parallel-query-be-used.html
>
> I'm not aware of any other changes in v11 that are likely to help you out.
>
> Cheers,
>
> Jeff
>
>>
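The cursor effect Jeff describes can probably be reproduced directly on a foreign server, without going through the FDW at all, by comparing the plan of the plain query with the plan of the same query wrapped in DECLARE ... CURSOR. This is only a sketch: it assumes a psql session on the pg10 server with the pg10.tbl_itemtransactions table created above, and the cursor name c1 is simply the one that appears in the logged query text.

```sql
-- Plain query: the planner is free to pick a Gather / Parallel Seq Scan plan,
-- as in the local explain output above.
EXPLAIN SELECT count(1) FROM pg10.tbl_itemtransactions;

-- Same query in the declared-cursor form that appears in the logged query text.
-- EXPLAIN without ANALYZE only plans the statement.
-- Assumption (not verified here): this shows a non-parallel Aggregate over a
-- plain Seq Scan, matching the plan captured in the server log.
EXPLAIN DECLARE c1 CURSOR FOR SELECT count(1) FROM pg10.tbl_itemtransactions;
```

If the second plan has no Gather node while the first one does, the difference comes from the cursor declaration rather than from anything specific to postgres_fdw.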
on mgmt

drop table if exists "flight_bookings";
drop table if exists "hotel_bookings";
drop table if exists "users";
drop table if exists "cities";

# partitions would be on shards
create table "hotel_bookings" (id serial, user_id int, booked_at timestamp, city_name text, continent text, flight_id int) partition by list (continent);
create table "flight_bookings" (id serial, user_id int, booked_at timestamp, from_city text, from_continent text, to_city text, to_continent text) partition by list (to_continent);

# non partitioned table, but hotel_bookings and flight_bookings refer it
create table "users" (id serial, name text, age int);

# create foreign tables which are on shards
create foreign table if not exists "flight_bookings1" partition of flight_bookings for values in ('Asia', 'Oceania', 'North America') server gc_bi_shard_1_primary;
create foreign table if not exists "hotel_bookings1" partition of hotel_bookings for values in ('Asia', 'Oceania', 'North America') server gc_bi_shard_1_primary;
create foreign table if not exists "flight_bookings2" partition of flight_bookings for values in ('Europe', 'Africa', 'South and Central America') server gc_bi_shard_2_primary;
create foreign table if not exists "hotel_bookings2" partition of hotel_bookings for values in ('Europe', 'Africa', 'South and Central America') server gc_bi_shard_2_primary;

# for demo purpose, create an unpartitioned table
create table if not exists hotel_bookings_u (like hotel_bookings);
create table if not exists flight_bookings_u (like flight_bookings);

# load data in unpartitioned table first
copy flight_bookings_u from '/var/tmp/pgconf-asia-demo/testdata/flight_bookings.csv' (format csv);
copy hotel_bookings_u from '/var/tmp/pgconf-asia-demo/testdata/hotel_bookings.csv' (format csv);
copy users from '/var/tmp/pgconf-asia-demo/testdata/users.csv' (format csv);

# then load the foreign partitioned tables
insert into hotel_bookings select * from hotel_bookings_u;
insert into flight_bookings select * from flight_bookings_u;


# on shard1 create partitioned tables hotel_bookings1 and flight_bookings1 and further subpartition by booked_at
create table "hotel_bookings1" (id int, user_id int, booked_at timestamp, city_name text, continent text, flight_id int) partition by range (booked_at);
create table "flight_bookings1" (id int, user_id int, booked_at timestamp, from_city text, from_continent text, to_city text, to_continent text) partition by range (booked_at);

# subpartition hotel_bookings1 by booked_at
create table hotel_bookings1_q1 partition of hotel_bookings1 for values from ('2017-01-01') to ('2017-04-01');
create table hotel_bookings1_q2 partition of hotel_bookings1 for values from ('2017-04-01') to ('2017-07-01');
create table hotel_bookings1_q3 partition of hotel_bookings1 for values from ('2017-07-01') to ('2017-10-01');
create table hotel_bookings1_q4 partition of hotel_bookings1 for values from ('2017-10-01') to ('2018-01-01');
create table hotel_bookings1_default partition of hotel_bookings1 DEFAULT;

# subpartition flight_bookings1 by booked_at
create table flight_bookings1_q1 partition of flight_bookings1 for values from ('2017-01-01') to ('2017-04-01');
create table flight_bookings1_q2 partition of flight_bookings1 for values from ('2017-04-01') to ('2017-07-01');
create table flight_bookings1_q3 partition of flight_bookings1 for values from ('2017-07-01') to ('2017-10-01');
create table flight_bookings1_q4 partition of flight_bookings1 for values from ('2017-10-01') to ('2018-01-01');
create table flight_bookings1_default partition of flight_bookings1 DEFAULT;


# on shard2 create partitioned tables hotel_bookings2 and flight_bookings2 and further subpartition by booked_at
drop table flight_bookings2;
drop table hotel_bookings2;
create table "hotel_bookings2" (id int, user_id int, booked_at timestamp, city_name text, continent text, flight_id int) partition by range (booked_at);
create table "flight_bookings2" (id int, user_id int, booked_at timestamp, from_city text, from_continent text, to_city text, to_continent text) partition by range (booked_at);

# subpartition hotel_bookings2 by booked_at
create table hotel_bookings2_q1 partition of hotel_bookings2 for values from ('2017-01-01') to ('2017-04-01');
create table hotel_bookings2_q2 partition of hotel_bookings2 for values from ('2017-04-01') to ('2017-07-01');
create table hotel_bookings2_q3 partition of hotel_bookings2 for values from ('2017-07-01') to ('2017-10-01');
create table hotel_bookings2_q4 partition of hotel_bookings2 for values from ('2017-10-01') to ('2018-01-01');
create table hotel_bookings2_default partition of hotel_bookings2 DEFAULT;

# subpartition flight_bookings2 by booked_at
create table flight_bookings2_q1 partition of flight_bookings2 for values from ('2017-01-01') to ('2017-04-01');
create table flight_bookings2_q2 partition of flight_bookings2 for values from ('2017-04-01') to ('2017-07-01');
create table flight_bookings2_q3 partition of flight_bookings2 for values from ('2017-07-01') to ('2017-10-01');
create table flight_bookings2_q4 partition of flight_bookings2 for values from ('2017-10-01') to ('2018-01-01');
create table flight_bookings2_default partition of flight_bookings2 DEFAULT;


# no filter conditions, scans all foreign tables
explain select * from flight_bookings join hotel_bookings on flight_bookings.id = hotel_bookings.flight_id join users on flight_bookings.user_id = users.id;
                                                QUERY PLAN
-----------------------------------------------------------------------------------------------------------
 Merge Join  (cost=6029.95..17762.67 rows=761410 width=269)
   Merge Cond: (hotel_bookings2.flight_id = flight_bookings2.id)
   ->  Merge Append  (cost=830.18..1071.98 rows=7440 width=47)
         Sort Key: hotel_bookings2.flight_id
         ->  Foreign Scan on hotel_bookings2  (cost=339.72..404.81 rows=2893 width=49)
         ->  Foreign Scan on hotel_bookings1  (cost=490.45..592.76 rows=4547 width=46)
   ->  Materialize  (cost=5199.77..5302.11 rows=20468 width=185)
         ->  Sort  (cost=5199.77..5250.94 rows=20468 width=185)
               Sort Key: flight_bookings2.id
               ->  Hash Join  (cost=419.00..1844.16 rows=20468 width=185)
                     Hash Cond: (flight_bookings2.user_id = users.id)
                     ->  Append  (cost=100.00..1243.72 rows=20468 width=54)
                           ->  Foreign Scan on flight_bookings2  (cost=100.00..453.89 rows=7654 width=56)
                           ->  Foreign Scan on flight_bookings1  (cost=100.00..687.49 rows=12814 width=53)
                     ->  Hash  (cost=194.00..194.00 rows=10000 width=41)
                           ->  Seq Scan on users  (cost=0.00..194.00 rows=10000 width=41)
(16 rows)

# continent filter restricts the scan to shard1 (eliminated shard2) and then filter on booked_at further restricts scan to q1 partition on tables on shard1
explain select * from flight_bookings join hotel_bookings on flight_bookings.id = hotel_bookings.flight_id join users on flight_bookings.user_id = users.id where flight_bookings.to_continent = 'Asia' and hotel_bookings.continent = 'Asia' and hotel_bookings.booked_at <= '2017-01-02'::timestamp and flight_bookings.booked_at <= '2017-01-02'::timestamp;
                                            QUERY PLAN
--------------------------------------------------------------------------------------------------
 Nested Loop  (cost=299.43..575.88 rows=4 width=269)
   Join Filter: (flight_bookings1.id = hotel_bookings1.flight_id)
   ->  Hash Join  (cost=199.43..431.08 rows=15 width=185)
         Hash Cond: (users.id = flight_bookings1.user_id)
         ->  Seq Scan on users  (cost=0.00..194.00 rows=10000 width=41)
         ->  Hash  (cost=199.25..199.25 rows=15 width=59)
               ->  Append  (cost=100.00..199.25 rows=15 width=59)
                     ->  Foreign Scan on flight_bookings1  (cost=100.00..199.17 rows=15 width=59)
   ->  Materialize  (cost=100.00..143.91 rows=4 width=50)
         ->  Append  (cost=100.00..143.89 rows=4 width=50)
               ->  Foreign Scan on hotel_bookings1  (cost=100.00..143.87 rows=4 width=50)
(11 rows)

# continent filter restricts the scan to shard1 (eliminated shard2)
explain select * from flight_bookings join hotel_bookings on flight_bookings.id = hotel_bookings.flight_id join users on flight_bookings.user_id = users.id where flight_bookings.to_continent = 'Asia' and hotel_bookings.continent = 'Asia';
                                             QUERY PLAN
----------------------------------------------------------------------------------------------------
 Merge Join  (cost=1582.79..2244.21 rows=40517 width=269)
   Merge Cond: (hotel_bookings1.flight_id = flight_bookings1.id)
   ->  Merge Append  (cost=291.62..341.45 rows=1533 width=39)
         Sort Key: hotel_bookings1.flight_id
         ->  Foreign Scan on hotel_bookings1  (cost=291.61..326.11 rows=1533 width=39)
   ->  Sort  (cost=1291.17..1304.38 rows=5286 width=185)
         Sort Key: flight_bookings1.id
         ->  Hash Join  (cost=419.00..964.28 rows=5286 width=185)
               Hash Cond: (flight_bookings1.user_id = users.id)
               ->  Append  (cost=100.00..572.60 rows=5286 width=53)
                     ->  Foreign Scan on flight_bookings1  (cost=100.00..546.17 rows=5286 width=53)
               ->  Hash  (cost=194.00..194.00 rows=10000 width=41)
                     ->  Seq Scan on users  (cost=0.00..194.00 rows=10000 width=41)
(13 rows)

# on shard1, no predicates but on join results in all partition scans
explain select * from flight_bookings1 join hotel_bookings1 on flight_bookings1.id = hotel_bookings1.flight_id;
                                         QUERY PLAN
---------------------------------------------------------------------------------------------
 Hash Join  (cost=171.04..659.85 rows=4547 width=99)
   Hash Cond: (flight_bookings1_q1.id = hotel_bookings1_q1.flight_id)
   ->  Append  (cost=0.00..331.21 rows=12814 width=53)
         ->  Seq Scan on flight_bookings1_q1  (cost=0.00..65.83 rows=3183 width=53)
         ->  Seq Scan on flight_bookings1_q2  (cost=0.00..65.73 rows=3173 width=53)
         ->  Seq Scan on flight_bookings1_q3  (cost=0.00..67.21 rows=3221 width=53)
         ->  Seq Scan on flight_bookings1_q4  (cost=0.00..67.35 rows=3235 width=53)
         ->  Seq Scan on flight_bookings1_default  (cost=0.00..1.02 rows=2 width=66)
   ->  Hash  (cost=114.20..114.20 rows=4547 width=46)
         ->  Append  (cost=0.00..114.20 rows=4547 width=46)
               ->  Seq Scan on hotel_bookings1_q1  (cost=0.00..18.31 rows=931 width=39)
               ->  Seq Scan on hotel_bookings1_q2  (cost=0.00..18.54 rows=954 width=39)
               ->  Seq Scan on hotel_bookings1_q3  (cost=0.00..19.12 rows=1012 width=39)
               ->  Seq Scan on hotel_bookings1_q4  (cost=0.00..18.30 rows=930 width=39)
               ->  Seq Scan on hotel_bookings1_default  (cost=0.00..17.20 rows=720 width=84)
(15 rows)

# on shard1, with predicates using booked_at on table flight_bookings1, scan restricted to sub partition (q1 and default) but all partitions for hotel_bookings1
gc_bi_shard_1=> explain select * from flight_bookings1 join hotel_bookings1 on flight_bookings1.id = hotel_bookings1.flight_id and flight_bookings1.booked_at <= '2017-01-02'::timestamp;
                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
 Hash Join  (cost=75.42..207.09 rows=41 width=99)
   Hash Cond: (hotel_bookings1_q1.flight_id = flight_bookings1_q1.id)
   ->  Append  (cost=0.00..114.20 rows=4547 width=46)
         ->  Seq Scan on hotel_bookings1_q1  (cost=0.00..18.31 rows=931 width=39)
         ->  Seq Scan on hotel_bookings1_q2  (cost=0.00..18.54 rows=954 width=39)
         ->  Seq Scan on hotel_bookings1_q3  (cost=0.00..19.12 rows=1012 width=39)
         ->  Seq Scan on hotel_bookings1_q4  (cost=0.00..18.30 rows=930 width=39)
         ->  Seq Scan on hotel_bookings1_default  (cost=0.00..17.20 rows=720 width=84)
   ->  Hash  (cost=74.99..74.99 rows=35 width=53)
         ->  Append  (cost=0.00..74.99 rows=35 width=53)
               ->  Seq Scan on flight_bookings1_q1  (cost=0.00..73.79 rows=34 width=53)
                     Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
               ->  Seq Scan on flight_bookings1_default  (cost=0.00..1.02 rows=1 width=66)
                     Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)

# on shard1, with predicates using booked_at, scan restricted to sub partition (q1 and default) of both tables
gc_bi_shard_1=> explain select * from flight_bookings1 join hotel_bookings1 on flight_bookings1.id = hotel_bookings1.flight_id and flight_bookings1.booked_at <= '2017-01-02'::timestamp and hotel_bookings1.booked_at <= '2017-01-02'::timestamp;
                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
 Hash Join  (cost=75.42..117.63 rows=41 width=135)
   Hash Cond: (hotel_bookings1_q1.flight_id = flight_bookings1_q1.id)
   ->  Append  (cost=0.00..40.87 rows=247 width=83)
         ->  Seq Scan on hotel_bookings1_q1  (cost=0.00..20.64 rows=7 width=39)
               Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
         ->  Seq Scan on hotel_bookings1_default  (cost=0.00..19.00 rows=240 width=84)
               Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
   ->  Hash  (cost=74.99..74.99 rows=35 width=53)
         ->  Append  (cost=0.00..74.99 rows=35 width=53)
               ->  Seq Scan on flight_bookings1_q1  (cost=0.00..73.79 rows=34 width=53)
                     Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
               ->  Seq Scan on flight_bookings1_default  (cost=0.00..1.02 rows=1 width=66)
                     Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
(13 rows)

# on shard1 when using predicates using the shard key, both default partition and matching partition scanned
explain select * from hotel_bookings1 where booked_at <= '2017-01-02'::timestamp;
                                    QUERY PLAN
-----------------------------------------------------------------------------------
 Append  (cost=0.00..40.87 rows=247 width=83)
   ->  Seq Scan on hotel_bookings1_q1  (cost=0.00..20.64 rows=7 width=39)
         Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
   ->  Seq Scan on hotel_bookings1_default  (cost=0.00..19.00 rows=240 width=84)
         Filter: (booked_at <= '2017-01-02 00:00:00'::timestamp without time zone)
(5 rows)
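
The default partition shows up in the plans above because booked_at <= '2017-01-02' has no lower bound, so rows dated before 2017-01-01 could only live in the default partition. A possible follow-up check, sketched below, is to bound the range on both sides and see whether the default partition drops out of the plan. This has not been run against the demo data, and the lower bound is a value picked purely for illustration.

```sql
-- Assumption: run on shard1 against the hotel_bookings1 table defined above.
-- Both ends of the range fall inside hotel_bookings1_q1
-- ('2017-01-01' to '2017-04-01'), so the planner may be able to skip
-- hotel_bookings1_default in addition to q2, q3 and q4.
explain select * from hotel_bookings1
where booked_at >= '2017-01-01'::timestamp
  and booked_at <= '2017-01-02'::timestamp;
```

If hotel_bookings1_default still appears in the resulting plan, the open-ended lower bound is not the only reason it is being scanned.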