On 10/5/20 11:35 AM, Etsuro Fujita wrote:

Hi,

I found a small problem. If we have a mix of async and sync subplans, we hit an assertion failure on a busy connection. For example:
PLAN
====
Nested Loop (cost=100.00..174316.95 rows=975 width=8) (actual time=5.191..9.262 rows=9 loops=1)
Join Filter: (frgn.a = l.a)
Rows Removed by Join Filter: 8991
-> Append (cost=0.00..257.20 rows=11890 width=4) (actual time=0.419..2.773 rows=1000 loops=1)
Async subplans: 4
-> Async Foreign Scan on f_1 l_2 (cost=100.00..197.75
rows=2925 width=4) (actual time=0.381..0.585 rows=211 loops=1)
-> Async Foreign Scan on f_2 l_3 (cost=100.00..197.75
rows=2925 width=4) (actual time=0.005..0.206 rows=195 loops=1)
-> Async Foreign Scan on f_3 l_4 (cost=100.00..197.75
rows=2925 width=4) (actual time=0.003..0.282 rows=187 loops=1)
-> Async Foreign Scan on f_4 l_5 (cost=100.00..197.75
rows=2925 width=4) (actual time=0.003..0.316 rows=217 loops=1)
-> Seq Scan on l_0 l_1 (cost=0.00..2.90 rows=190 width=4)
(actual time=0.017..0.057 rows=190 loops=1)
-> Materialize (cost=100.00..170.94 rows=975 width=4) (actual
time=0.001..0.002 rows=9 loops=1000)
-> Foreign Scan on frgn (cost=100.00..166.06 rows=975
width=4) (actual time=0.766..0.768 rows=9 loops=1)
The reproduction script 'test1.sql' is attached. In it I force the problem to reproduce by setting enable_hashjoin and enable_mergejoin to off.
'asyncmix.patch' contains my solution to this problem.

--
regards,
Andrey Lepikhov
Postgres Professional
test1.sql
Description: application/sql
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 14824368cc..613d406982 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -455,7 +455,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
void *arg);
static void create_cursor(ForeignScanState *node);
static void request_more_data(ForeignScanState *node);
-static void fetch_received_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node, bool vacateconn);
static void vacate_connection(PgFdwState *fdwconn, bool clear_queue);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static PgFdwModifyState *create_foreign_modify(EState *estate,
@@ -1706,15 +1706,19 @@ postgresIterateForeignScan(ForeignScanState *node)
{
/*
* finish the running query before sending the next command for
- * this node
+ * this node.
+ * When the plan contains both asynchronous subplans and non-async
+ * subplans backend could request more data in async mode and want to
+ * get data in sync mode by the same connection. Here it must wait
+ * for async data before request another.
*/
- if (!fsstate->s.commonstate->busy)
- vacate_connection((PgFdwState *)fsstate, false);
+ if (fsstate->s.commonstate->busy)
+ vacate_connection(&fsstate->s, false);
request_more_data(node);
/* Fetch the result immediately. */
- fetch_received_data(node);
+ fetch_received_data(node, false);
}
else if (!fsstate->s.commonstate->busy)
{
@@ -1749,7 +1753,7 @@ postgresIterateForeignScan(ForeignScanState *node)
/* fetch the leader's data and enqueue it for the next request */
if (available)
{
- fetch_received_data(leader);
+ fetch_received_data(leader, false);
add_async_waiter(leader);
}
}
@@ -3729,7 +3733,7 @@ request_more_data(ForeignScanState *node)
* Fetches received data and automatically send requests of the next waiter.
*/
static void
-fetch_received_data(ForeignScanState *node)
+fetch_received_data(ForeignScanState *node, bool vacateconn)
{
PgFdwScanState *fsstate = GetPgFdwScanState(node);
PGresult *volatile res = NULL;
@@ -3817,7 +3821,8 @@ fetch_received_data(ForeignScanState *node)
waiter = move_to_next_waiter(node);
/* send the next request if any */
- if (waiter)
+ if (waiter && (!vacateconn ||
+ GetPgFdwScanState(node)->s.conn != GetPgFdwScanState(waiter)->s.conn))
request_more_data(waiter);
MemoryContextSwitchTo(oldcontext);
@@ -3843,7 +3848,7 @@ vacate_connection(PgFdwState *fdwstate, bool clear_queue)
* query
*/
leader = commonstate->leader;
- fetch_received_data(leader);
+ fetch_received_data(leader, true);
/* let the first waiter be the next leader of this connection */
move_to_next_waiter(leader);
