*** dblink.c	Mon Jun 29 12:31:46 2009
--- dblink.fixed.c	Mon Jun 29 12:28:49 2009
***************
*** 40,45 ****
--- 40,46 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/tupdesc.h"
+ #include "access/xact.h"
  #include "catalog/indexing.h"
  #include "catalog/namespace.h"
  #include "catalog/pg_index.h"
***************
*** 54,59 ****
--- 55,61 ----
  #include "nodes/nodes.h"
  #include "nodes/pg_list.h"
  #include "parser/parse_type.h"
+ #include "storage/ipc.h"
  #include "utils/acl.h"
  #include "utils/array.h"
  #include "utils/builtins.h"
*************** typedef struct remoteConn
*** 76,81 ****
--- 78,86 ----
  	bool		newXactForCursor;		/* Opened a transaction for a cursor */
  } remoteConn;
  
+ extern void _PG_init(void);
+ extern void _PG_fini(void);
+ 
  /*
   * Internal declarations
   */
*************** static void dblink_security_check(PGconn
*** 100,109 ****
--- 105,132 ----
  static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
  static char *get_connect_string(const char *servername);
  static char *escape_param_str(const char *from);
+ static PGresult *execute_query(PGconn *conn, const char *sql);
+ static PGresult *wait_for_result(PGconn *conn);
+ static void register_result(PGresult *res);
+ static void unregister_result(PGresult *res);
+ static void AtEOXact_dblink(XactEvent event, void *arg);
  
  /* Global */
  static remoteConn *pconn = NULL;
  static HTAB *remoteConnHash = NULL;
+ static List *results = NIL;
+ 
+ void
+ _PG_init(void)
+ {
+ 	RegisterXactCallback(AtEOXact_dblink, 0);
+ }
+ 
+ void
+ _PG_fini(void)
+ {
+ 	UnregisterXactCallback(AtEOXact_dblink, 0);
+ }
  
  /*
   *	Following is list that holds multiple remote connections.
*************** dblink_fetch(PG_FUNCTION_ARGS)
*** 580,586 ****
  		 * PGresult will be long-lived even though we are still in a
  		 * short-lived memory context.
  		 */
! 		res = PQexec(conn, buf.data);
  		if (!res ||
  			(PQresultStatus(res) != PGRES_COMMAND_OK &&
  			 PQresultStatus(res) != PGRES_TUPLES_OK))
--- 603,609 ----
  		 * PGresult will be long-lived even though we are still in a
  		 * short-lived memory context.
  		 */
! 		res = execute_query(conn, buf.data);
  		if (!res ||
  			(PQresultStatus(res) != PGRES_COMMAND_OK &&
  			 PQresultStatus(res) != PGRES_TUPLES_OK))
*************** dblink_fetch(PG_FUNCTION_ARGS)
*** 637,642 ****
--- 660,666 ----
  			PQclear(res);
  			SRF_RETURN_DONE(funcctx);
  		}
+ 		register_result(res);
  
  		/*
  		 * switch to memory context appropriate for multiple function calls,
*************** dblink_fetch(PG_FUNCTION_ARGS)
*** 695,701 ****
  	else
  	{
  		/* do when there is no more left */
! 		PQclear(res);
  		SRF_RETURN_DONE(funcctx);
  	}
  }
--- 719,725 ----
  	else
  	{
  		/* do when there is no more left */
! 		unregister_result(res);
  		SRF_RETURN_DONE(funcctx);
  	}
  }
*************** dblink_record_internal(FunctionCallInfo 
*** 839,848 ****
  
  		/* synchronous query, or async result retrieval */
  		if (!is_async)
! 			res = PQexec(conn, sql);
  		else
  		{
! 			res = PQgetResult(conn);
  			/* NULL means we're all done with the async results */
  			if (!res)
  			{
--- 863,872 ----
  
  		/* synchronous query, or async result retrieval */
  		if (!is_async)
! 			res = execute_query(conn, sql);
  		else
  		{
! 			res = wait_for_result(conn);
  			/* NULL means we're all done with the async results */
  			if (!res)
  			{
*************** dblink_record_internal(FunctionCallInfo 
*** 930,935 ****
--- 954,960 ----
  			MemoryContextSwitchTo(oldcontext);
  			SRF_RETURN_DONE(funcctx);
  		}
+ 		register_result(res);
  
  		/* store needed metadata for subsequent calls */
  		attinmeta = TupleDescGetAttInMetadata(tupdesc);
*************** dblink_record_internal(FunctionCallInfo 
*** 989,995 ****
  	else
  	{
  		/* do when there is no more left */
! 		PQclear(res);
  		SRF_RETURN_DONE(funcctx);
  	}
  }
--- 1014,1020 ----
  	else
  	{
  		/* do when there is no more left */
! 		unregister_result(res);
  		SRF_RETURN_DONE(funcctx);
  	}
  }
*************** dblink_exec(PG_FUNCTION_ARGS)
*** 1167,1173 ****
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = PQexec(conn, sql);
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
--- 1192,1198 ----
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = execute_query(conn, sql);
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
*************** escape_param_str(const char *str)
*** 2466,2469 ****
--- 2491,2584 ----
  	}
  
  	return buf->data;
+ }
+ 
+ static PGresult *
+ execute_query(PGconn *conn, const char *sql)
+ {
+ 	/* async query send */
+ 	if (PQsendQuery(conn, sql) != 1)
+ 		elog(NOTICE, "%s", PQerrorMessage(conn));
+ 
+ 	return wait_for_result(conn);
+ }
+ 
+ static void
+ cancel_query(int code, Datum arg)
+ {
+ 	PGconn	   *conn = (PGconn *) DatumGetPointer(arg);
+ 	PGcancel   *cancel;
+ 	char		errbuf[256];
+ 
+ 	cancel = PQgetCancel(conn);
+ 	PQcancel(cancel, errbuf, 256);
+ 	PQfreeCancel(cancel);
+ }
+ 
+ static PGresult *
+ wait_for_result(PGconn *conn)
+ {
+ 	PGresult *res = NULL;
+ 
+ 	PG_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(conn));
+ 	for (;;)
+ 	{
+ 		fd_set			rset;
+ 		int				sock;
+ 		struct timeval	tv;
+ 
+ 		CHECK_FOR_INTERRUPTS();
+ 
+ 		PQconsumeInput(conn);
+ 		if (!PQisBusy(conn))
+ 		{
+ 			res = PQgetResult(conn);
+ 			break;
+ 		}
+ 
+ 		sock = PQsocket(conn);
+ 		FD_ZERO(&rset);
+ 		FD_SET(sock, &rset);
+ 		tv.tv_sec = 1;
+ 		tv.tv_usec = 0;
+ 		if (select(sock + 1, &rset, NULL, NULL, &tv) < 0 && errno != EINTR)
+ 			break;
+ 	}
+ 	PG_END_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(conn));
+ 
+ 	return res;
+ }
+ 
+ static void
+ register_result(PGresult *res)
+ {
+ 	MemoryContext oldcontext;
+ 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ 	results = lappend(results, res);
+ 	MemoryContextSwitchTo(oldcontext);
+ }
+ 
+ /* unregister and free result */
+ static void
+ unregister_result(PGresult *res)
+ {
+ 	results = list_delete_ptr(results, res);
+ 	PQclear(res);
+ }
+ 
+ static void
+ AtEOXact_dblink(XactEvent event, void *arg)
+ {
+ 	if (results != NIL)
+ 	{
+ 		ListCell *cell;
+ 
+ 		foreach(cell, results)
+ 		{
+ 			PQclear((PGresult *) lfirst(cell));
+ 		}
+ 
+ 		list_free(results);
+ 		results = NIL;
+ 	}
  }
