At Thu, 20 Jul 2017 18:15:42 -0400, Tom Lane <t...@sss.pgh.pa.us> wrote in <18927.1500588...@sss.pgh.pa.us>
> This seems like overkill. We can test it reasonably easily within the
> existing framework, as shown in the attached patch. I'm also fairly

Your test checks for a disconnection caused within a single session. I
thought its inter-process characteristics were important (since I had
forgotten about that in the previous version). However, it is reasonable
enough if we can rely on the fact that it reliably works through the
invalidation mechanism. In short, I agree with the test in your patch.

> concerned that what you're showing here would be unstable in the buildfarm
> as a result of race conditions between the multiple sessions.

Sure. That is what I meant by 'fragile'.

> I made some cosmetic updates to the code patch, as well.

Thank you for keeping the hashvalue stuff and revising the comment.

By the way, I had mistakenly left the following code in the previous patch.

+ /* hashvalue == 0 means a cache reset, must clear all state */
+ if (hashvalue == 0)
+     entry->invalidated = true;
+ else if ((cacheid == FOREIGNSERVEROID &&
+           entry->server_hashvalue == hashvalue) ||
+          (cacheid == USERMAPPINGOID &&
+           entry->mapping_hashvalue == hashvalue))
+     entry->invalidated = true;

The redundancy existed because the else block had used a switch-case
just before. That structure no longer makes sense, so I'd like to change
this part as follows.

+ /* hashvalue == 0 means a cache reset, must clear all state */
+ if ((hashvalue == 0) ||
+     ((cacheid == FOREIGNSERVEROID &&
+       entry->server_hashvalue == hashvalue) ||
+      (cacheid == USERMAPPINGOID &&
+       entry->mapping_hashvalue == hashvalue)))
+     entry->invalidated = true;

The attached patch differs only in this point.

> I think this is actually a bug fix, and should not wait for the next
> commitfest.

Agreed.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
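The two forms of the invalidation test quoted above should be logically equivalent. One quick way to convince oneself is to compare them exhaustively over a small input domain. What follows is a standalone C sketch and is not part of the patch. `MockEntry` is a hypothetical cut-down stand-in for `ConnCacheEntry`, and the `FOREIGNSERVEROID`/`USERMAPPINGOID` values are placeholders, not the real syscache identifiers from `utils/syscache.h`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Placeholder cache ids (hypothetical); the real ones live in utils/syscache.h. */
enum { FOREIGNSERVEROID = 1, USERMAPPINGOID = 2 };

/* Hypothetical stand-in for ConnCacheEntry: only the fields the test reads. */
typedef struct MockEntry
{
    uint32_t server_hashvalue;
    uint32_t mapping_hashvalue;
} MockEntry;

/* The redundant if / else-if form left in the previous patch. */
static bool
needs_inval_old(const MockEntry *e, int cacheid, uint32_t hashvalue)
{
    if (hashvalue == 0)
        return true;
    else if ((cacheid == FOREIGNSERVEROID && e->server_hashvalue == hashvalue) ||
             (cacheid == USERMAPPINGOID && e->mapping_hashvalue == hashvalue))
        return true;
    return false;
}

/* The merged single-condition form proposed in this mail. */
static bool
needs_inval_new(const MockEntry *e, int cacheid, uint32_t hashvalue)
{
    return (hashvalue == 0) ||
        ((cacheid == FOREIGNSERVEROID && e->server_hashvalue == hashvalue) ||
         (cacheid == USERMAPPINGOID && e->mapping_hashvalue == hashvalue));
}

/*
 * Compare both forms over every combination of cache id, stored hashes and
 * incoming hash drawn from {0, 1, 2, 3}; returns the number of disagreements.
 */
static int
count_mismatches(void)
{
    static const uint32_t hashes[] = {0, 1, 2, 3};
    const int n = (int) (sizeof(hashes) / sizeof(hashes[0]));
    int mismatches = 0;

    for (int cacheid = FOREIGNSERVEROID; cacheid <= USERMAPPINGOID; cacheid++)
        for (int s = 0; s < n; s++)
            for (int m = 0; m < n; m++)
                for (int h = 0; h < n; h++)
                {
                    MockEntry e = {hashes[s], hashes[m]};

                    if (needs_inval_old(&e, cacheid, hashes[h]) !=
                        needs_inval_new(&e, cacheid, hashes[h]))
                        mismatches++;
                }
    return mismatches;
}
```

`count_mismatches()` should return 0. This only checks the boolean logic. It says nothing about how `GetSysCacheHashValue1` computes the hash values themselves.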
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "pgstat.h"
  #include "storage/latch.h"
  #include "utils/hsearch.h"
+ #include "utils/inval.h"
  #include "utils/memutils.h"
  #include "utils/syscache.h"

***************
*** 48,58 **** typedef struct ConnCacheEntry
--- 49,63 ----
  {
      ConnCacheKey key;           /* hash key (must be first) */
      PGconn     *conn;           /* connection to foreign server, or NULL */
+     /* Remaining fields are invalid when conn is NULL: */
      int         xact_depth;     /* 0 = no xact open, 1 = main xact open, 2 =
                                   * one level of subxact open, etc */
      bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      bool        have_error;     /* have any subxacts aborted in this xact? */
      bool        changing_xact_state;    /* xact state change in process */
+     bool        invalidated;    /* true if reconnect is pending */
+     uint32      server_hashvalue;   /* hash value of foreign server OID */
+     uint32      mapping_hashvalue;  /* hash value of user mapping OID */
  } ConnCacheEntry;

  /*
***************
*** 69,74 **** static bool xact_got_connection = false;
--- 74,80 ----

  /* prototypes of private functions */
  static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
  static void check_conn_params(const char **keywords, const char **values);
  static void configure_remote_session(PGconn *conn);
  static void do_sql_command(PGconn *conn, const char *sql);
***************
*** 78,83 **** static void pgfdw_subxact_callback(SubXactEvent event,
--- 84,90 ----
                         SubTransactionId mySubid,
                         SubTransactionId parentSubid,
                         void *arg);
+ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
  static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
  static bool pgfdw_cancel_query(PGconn *conn);
  static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
***************
*** 130,135 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 137,146 ----
           */
          RegisterXactCallback(pgfdw_xact_callback, NULL);
          RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                       pgfdw_inval_callback, (Datum) 0);
+         CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                       pgfdw_inval_callback, (Datum) 0);
      }

      /* Set flag that we did GetConnection during the current transaction */
***************
*** 144,161 **** GetConnection(UserMapping *user, bool will_prep_stmt)
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /* initialize new hashtable entry (key is already filled in) */
          entry->conn = NULL;
-         entry->xact_depth = 0;
-         entry->have_prep_stmt = false;
-         entry->have_error = false;
-         entry->changing_xact_state = false;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead. Broken connection will be detected when the
       * connection is actually used.
--- 155,182 ----
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /*
!          * We need only clear "conn" here; remaining fields will be filled
!          * later when "conn" is set.
!          */
          entry->conn = NULL;
      }

      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);

      /*
+      * If the connection needs to be remade due to invalidation, disconnect as
+      * soon as we're out of all transactions.
+      */
+     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+     {
+         elog(DEBUG3, "closing connection %p for option changes to take effect",
+              entry->conn);
+         disconnect_pg_server(entry);
+     }
+
+     /*
       * We don't check the health of cached connection here, because it would
       * require some overhead. Broken connection will be detected when the
       * connection is actually used.
***************
*** 164,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
      /*
       * If cache entry doesn't have a connection, we have to establish a new
       * connection. (If connect_pg_server throws an error, the cache entry
!      * will be left in a valid empty state.)
       */
      if (entry->conn == NULL)
      {
          ForeignServer *server = GetForeignServer(user->serverid);

!         entry->xact_depth = 0;  /* just to be sure */
          entry->have_prep_stmt = false;
          entry->have_error = false;
          entry->conn = connect_pg_server(server, user);

          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
--- 185,208 ----
      /*
       * If cache entry doesn't have a connection, we have to establish a new
       * connection. (If connect_pg_server throws an error, the cache entry
!      * will remain in a valid empty state, ie conn == NULL.)
       */
      if (entry->conn == NULL)
      {
          ForeignServer *server = GetForeignServer(user->serverid);

!         /* Reset all transient state fields, to be sure all are clean */
!         entry->xact_depth = 0;
          entry->have_prep_stmt = false;
          entry->have_error = false;
+         entry->changing_xact_state = false;
+         entry->invalidated = false;
+         entry->server_hashvalue =
+             GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+         entry->mapping_hashvalue =
+             GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
+
+         /* Now try to make the connection */
          entry->conn = connect_pg_server(server, user);

          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
***************
*** 277,282 **** connect_pg_server(ForeignServer *server, UserMapping *user)
--- 307,325 ----
  }

  /*
+  * Disconnect any open connection for a connection cache entry.
+  */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+     if (entry->conn != NULL)
+     {
+         PQfinish(entry->conn);
+         entry->conn = NULL;
+     }
+ }
+
+ /*
   * For non-superusers, insist that the connstr specify a password. This
   * prevents a password from being picked up from .pgpass, a service file,
   * the environment, etc. We don't want the postgres user's passwords
***************
*** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg)
                  entry->changing_xact_state)
              {
                  elog(DEBUG3, "discarding connection %p", entry->conn);
!                 PQfinish(entry->conn);
!                 entry->conn = NULL;
!                 entry->changing_xact_state = false;
              }
          }

--- 820,826 ----
                  entry->changing_xact_state)
              {
                  elog(DEBUG3, "discarding connection %p", entry->conn);
!                 disconnect_pg_server(entry);
              }
          }

***************
*** 897,902 **** pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
--- 938,984 ----
  }

  /*
+  * Connection invalidation callback function
+  *
+  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
+  * mark connections depending on that entry as needing to be remade.
+  * We can't immediately destroy them, since they might be in the midst of
+  * a transaction, but we'll remake them at the next opportunity.
+  *
+  * Although most cache invalidation callbacks blow away all the related stuff
+  * regardless of the given hashvalue, connections are expensive enough that
+  * it's worth trying to avoid that.
+  *
+  * NB: We could avoid unnecessary disconnection more strictly by examining
+  * individual option values, but it seems too much effort for the gain.
+  */
+ static void
+ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+
+     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+
+     /* ConnectionHash must exist already, if we're registered */
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         /* Ignore invalid entries */
+         if (entry->conn == NULL)
+             continue;
+
+         /* hashvalue == 0 means a cache reset, must clear all state */
+         if (hashvalue == 0 ||
+             ((cacheid == FOREIGNSERVEROID &&
+               entry->server_hashvalue == hashvalue) ||
+              (cacheid == USERMAPPINGOID &&
+               entry->mapping_hashvalue == hashvalue)))
+             entry->invalidated = true;
+     }
+ }
+
+ /*
   * Raise an error if the given connection cache entry is marked as being
   * in the middle of an xact state change. This should be called at which no
   * such change is expected to be in progress; if one is found to be in
***************
*** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
      Form_pg_user_mapping umform;
      ForeignServer *server;

!     if (!entry->changing_xact_state)
          return;

      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
--- 995,1008 ----
      Form_pg_user_mapping umform;
      ForeignServer *server;

!     /* nothing to do for inactive entries and entries of sane state */
!     if (entry->conn == NULL || !entry->changing_xact_state)
          return;

+     /* make sure this entry is inactive */
+     disconnect_pg_server(entry);
+
+     /* find server name to be shown in the message below */
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
*** a/contrib/postgres_fdw/expected/postgres_fdw.out
--- b/contrib/postgres_fdw/expected/postgres_fdw.out
***************
*** 191,196 **** ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
--- 191,233 ----
   public | ft_pg_type | loopback | (schema_name 'pg_catalog', table_name 'pg_type') |
  (6 rows)

+ -- Test that alteration of server options causes reconnection
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work
+   c3   |              c4
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+
+ ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
+ ERROR:  could not connect to server "loopback"
+ DETAIL:  FATAL:  database "no such database" does not exist
+ DO $d$
+     BEGIN
+         EXECUTE $$ALTER SERVER loopback
+             OPTIONS (SET dbname '$$||current_database()||$$')$$;
+     END;
+ $d$;
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
+   c3   |              c4
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+
+ -- Test that alteration of user mapping options causes reconnection
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (ADD user 'no such user');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
+ ERROR:  could not connect to server "loopback"
+ DETAIL:  FATAL:  role "no such user" does not exist
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (DROP user);
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
+   c3   |              c4
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+
  -- Now we should be able to run ANALYZE.
  -- To exercise multiple code paths, we use local stats on ft1
  -- and remote-estimate mode on ft2.
*** a/contrib/postgres_fdw/sql/postgres_fdw.sql
--- b/contrib/postgres_fdw/sql/postgres_fdw.sql
***************
*** 195,200 **** ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
--- 195,220 ----
  ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
  \det+

+ -- Test that alteration of server options causes reconnection
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work
+ ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
+ DO $d$
+     BEGIN
+         EXECUTE $$ALTER SERVER loopback
+             OPTIONS (SET dbname '$$||current_database()||$$')$$;
+     END;
+ $d$;
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
+
+ -- Test that alteration of user mapping options causes reconnection
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (ADD user 'no such user');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (DROP user);
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
+
  -- Now we should be able to run ANALYZE.
  -- To exercise multiple code paths, we use local stats on ft1
  -- and remote-estimate mode on ft2.
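The patch never closes a connection from inside the invalidation callback. It only sets `invalidated`, and `GetConnection()` drops the connection once `xact_depth` is back to 0. The standalone C sketch below models just that deferred-reconnect state machine and is not taken from the patch. `MockConnEntry`, `mock_inval()` and `mock_get_connection()` are hypothetical names. An integer id stands in for the `PGconn` pointer, and no libpq or backend calls are made.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical stand-in for ConnCacheEntry. "conn" is an integer id,
 * with 0 meaning "not connected" (the real code uses a PGconn * / NULL).
 */
typedef struct MockConnEntry
{
    int  conn;          /* 0 = no connection */
    int  xact_depth;    /* 0 = no remote transaction open */
    bool invalidated;   /* set by the invalidation callback */
} MockConnEntry;

/* Hypothetical id source standing in for connect_pg_server(). */
static int next_conn_id = 1;

/* Models pgfdw_inval_callback(): flag live entries, never disconnect here. */
static void
mock_inval(MockConnEntry *e)
{
    if (e->conn != 0)
        e->invalidated = true;
}

/*
 * Models the GetConnection() logic in the patch: drop an invalidated
 * connection only when no transaction is open, then connect if needed.
 */
static int
mock_get_connection(MockConnEntry *e)
{
    if (e->conn != 0 && e->invalidated && e->xact_depth == 0)
        e->conn = 0;                    /* disconnect_pg_server() */

    if (e->conn == 0)
    {
        /* reset transient state before "connecting", as the patch does */
        e->xact_depth = 0;
        e->invalidated = false;
        e->conn = next_conn_id++;       /* connect_pg_server() */
    }
    return e->conn;
}
```

If an entry is flagged while a transaction is open, it keeps the same connection until the transaction ends. After that, the next lookup gets a fresh connection with `invalidated` cleared. Deferring the disconnect this way avoids tearing down a connection that still has a remote transaction in progress.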
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers