Thank you for the comments.

At Mon, 17 Jul 2017 16:09:04 -0400, Tom Lane <t...@sss.pgh.pa.us> wrote in 
<9897.1500322...@sss.pgh.pa.us>
> Kyotaro HORIGUCHI <horiguchi.kyot...@lab.ntt.co.jp> writes:
> > This is the revased and revised version of the previous patch.
> 
> A few more comments:
> 
> * Per the spec for CacheRegisterSyscacheCallback, a zero hash value means
> to flush all associated state.  This isn't handling that case correctly.

Right, fixed.

> Even when you are given a specific hash value, I think exiting after
> finding one match is incorrect: there could be multiple connections
> to the same server with different user mappings, or vice versa.

Sure. I'm confused that key hash value nails an entry in "the
connection cache". Thank you for pointing out that.

> * I'm not really sure that it's worth the trouble to pay attention
> to the hash value at all.  Very few other syscache callbacks do,
> and the pg_server/pg_user_mapping catalogs don't seem likely to
> have higher than average traffic.

Agreed to the points. But there is another point that reconection
is far intensive than re-looking up of a system catalog or maybe
even than replanning. For now I choosed to avoid a possibility of
causing massive number of simultaneous reconnection.

> * Personally I'd be inclined to register the callbacks at the same
> time the hashtables are created, which is a pattern we use elsewhere
> ... in fact, postgres_fdw itself does it that way for the transaction
> related callbacks, so it seems a bit weird that you are going in a
> different direction for these callbacks.  That way avoids the need to
> depend on a _PG_init function and means that the callbacks don't have to
> worry about the hashtables not being valid.

Sure, seems more reasonable than it is now. Changed the way of
registring a callback in the attached.

>  Also, it seems a bit
> pointless to have separate layers of postgresMappingSysCallback and
> InvalidateConnectionForMapping functions.

It used to be one function but it seemed a bit wierd that the
function is called from two places (two catalogs) then branchs
according to the caller. I don't have a firm opinion on this so
changed.

As the result the changes in postgres_fdw.c has been disappeard.

> * How about some regression test cases?  You couldn't really exercise
> cross-session invalidation easily, but I don't think you need to.

Ha Ha. You got me. I will add some test cases for this in the
next version. Thanks.


Ashutosh, I'll register this to the next CF after providing a
regression, thanks.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "pgstat.h"
  #include "storage/latch.h"
  #include "utils/hsearch.h"
+ #include "utils/inval.h"
  #include "utils/memutils.h"
  #include "utils/syscache.h"
  
***************
*** 53,58 **** typedef struct ConnCacheEntry
--- 54,62 ----
  	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
  	bool		have_error;		/* have any subxacts aborted in this xact? */
  	bool		changing_xact_state;	/* xact state change in process */
+ 	bool		invalidated;	/* true if reconnect is requried */
+ 	uint32		server_hashvalue;	/* hash value of foreign server oid */
+ 	uint32		mapping_hashvalue;  /* hash value of user mapping oid */
  } ConnCacheEntry;
  
  /*
***************
*** 69,74 **** static bool xact_got_connection = false;
--- 73,79 ----
  
  /* prototypes of private functions */
  static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
  static void check_conn_params(const char **keywords, const char **values);
  static void configure_remote_session(PGconn *conn);
  static void do_sql_command(PGconn *conn, const char *sql);
***************
*** 78,83 **** static void pgfdw_subxact_callback(SubXactEvent event,
--- 83,89 ----
  					   SubTransactionId mySubid,
  					   SubTransactionId parentSubid,
  					   void *arg);
+ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
  static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
  static bool pgfdw_cancel_query(PGconn *conn);
  static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
***************
*** 130,135 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 136,145 ----
  		 */
  		RegisterXactCallback(pgfdw_xact_callback, NULL);
  		RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+ 		CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+ 									  pgfdw_inval_callback, (Datum)0);
+ 		CacheRegisterSyscacheCallback(USERMAPPINGOID,
+ 									  pgfdw_inval_callback, (Datum)0);
  	}
  
  	/* Set flag that we did GetConnection during the current transaction */
***************
*** 144,160 **** GetConnection(UserMapping *user, bool will_prep_stmt)
  	entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
  	if (!found)
  	{
! 		/* initialize new hashtable entry (key is already filled in) */
  		entry->conn = NULL;
- 		entry->xact_depth = 0;
- 		entry->have_prep_stmt = false;
- 		entry->have_error = false;
- 		entry->changing_xact_state = false;
  	}
  
  	/* Reject further use of connections which failed abort cleanup. */
  	pgfdw_reject_incomplete_xact_state_change(entry);
  
  	/*
  	 * We don't check the health of cached connection here, because it would
  	 * require some overhead.  Broken connection will be detected when the
--- 154,182 ----
  	entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
  	if (!found)
  	{
! 		/*
! 		 * key is already filled in, flags well be initialized at the time of
! 		 * making a new connection, so just clear conn here.
! 		 */
  		entry->conn = NULL;
  	}
  
  	/* Reject further use of connections which failed abort cleanup. */
  	pgfdw_reject_incomplete_xact_state_change(entry);
  
+ 
+ 	/*
+ 	 * This connection is no longer valid. Disconnect such connections if no
+ 	 * transaction is running.
+ 	 */
+ 	if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+ 	{
+ 		/* reconneced immediately, so the messages is "reconnecting"  */
+ 		elog(DEBUG3, "closing connection %p for option changes to take effect",
+ 			 entry->conn);
+ 		disconnect_pg_server(entry);
+ 	}
+ 
  	/*
  	 * We don't check the health of cached connection here, because it would
  	 * require some overhead.  Broken connection will be detected when the
***************
*** 173,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 195,206 ----
  		entry->xact_depth = 0;	/* just to be sure */
  		entry->have_prep_stmt = false;
  		entry->have_error = false;
+ 		entry->invalidated = false;
+ 		entry->changing_xact_state = false;
+ 		entry->server_hashvalue =
+ 			GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+ 		entry->mapping_hashvalue =
+ 			GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
  		entry->conn = connect_pg_server(server, user);
  
  		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
***************
*** 276,281 **** connect_pg_server(ForeignServer *server, UserMapping *user)
--- 304,321 ----
  	return conn;
  }
  
+ /* disconnect the connection for a connection cache entry */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+ 	if (entry->conn != NULL)
+ 	{
+ 		PQfinish(entry->conn);
+ 		entry->conn = NULL;
+ 	}
+ }
+ 
+ 
  /*
   * For non-superusers, insist that the connstr specify a password.  This
   * prevents a password from being picked up from .pgpass, a service file,
***************
*** 429,434 **** ReleaseConnection(PGconn *conn)
--- 469,519 ----
  }
  
  /*
+  * Connection invalidation callback function
+  *
+  * Mark the connections to be disconnected if it depends on a foreign server
+  * or user mapping any options on which have been modified.
+  *
+  * Although most invalidate callbacks blow away all the related stuff
+  * regardless of the given hashvalue, if we blow away all existing connection
+  * from this server, it can cause a massive number of simultaneous
+  * reconnection to all the remotes. We restrict the connection to break to
+  * avoid such a mess.
+  *
+  * NB: We could avoid unnecessary disconnection more strictly by examining
+  * individual option values but it would be too-much for the gain.
+  */
+ static void
+ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+ 	HASH_SEQ_STATUS scan;
+ 	ConnCacheEntry *entry;
+ 
+ 	if (!ConnectionHash)
+ 		return;
+ 
+ 	hash_seq_init(&scan, ConnectionHash);
+ 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ 	{
+ 		if (entry->conn == NULL)
+ 			continue;
+ 
+ 		if (hashvalue == 0)
+ 			entry->invalidated = true;
+ 		else
+ 		{
+ 			Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+ 
+ 			if ((cacheid == FOREIGNSERVEROID &&
+ 				 entry->server_hashvalue == hashvalue) ||
+ 				(cacheid == USERMAPPINGOID &&
+ 				 entry->mapping_hashvalue == hashvalue))
+ 				entry->invalidated = true;
+ 		}
+ 	}
+ }
+ 
+ /*
   * Assign a "unique" number for a cursor.
   *
   * These really only need to be unique per connection within a transaction.
***************
*** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg)
  			entry->changing_xact_state)
  		{
  			elog(DEBUG3, "discarding connection %p", entry->conn);
! 			PQfinish(entry->conn);
! 			entry->conn = NULL;
! 			entry->changing_xact_state = false;
  		}
  	}
  
--- 862,868 ----
  			entry->changing_xact_state)
  		{
  			elog(DEBUG3, "discarding connection %p", entry->conn);
! 			disconnect_pg_server(entry);
  		}
  	}
  
***************
*** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
  	Form_pg_user_mapping umform;
  	ForeignServer *server;
  
! 	if (!entry->changing_xact_state)
  		return;
  
  	tup = SearchSysCache1(USERMAPPINGOID,
  						  ObjectIdGetDatum(entry->key));
  	if (!HeapTupleIsValid(tup))
--- 996,1009 ----
  	Form_pg_user_mapping umform;
  	ForeignServer *server;
  
! 	/* nothing to do for inactive entries and entries of sane state */
! 	if (entry->conn ==NULL || !entry->changing_xact_state)
  		return;
  
+ 	/* make sure this entry is inactive */
+ 	disconnect_pg_server(entry);
+ 
+ 	/* find server name to be shown in the message below */
  	tup = SearchSysCache1(USERMAPPINGOID,
  						  ObjectIdGetDatum(entry->key));
  	if (!HeapTupleIsValid(tup))
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to