The attached patch addresses items #2 and #3 as listed by Bruce here:
http://momjian.us/cgi-bin/pgsql/joe
I think it is consistent with the discussions we had at PGCon last week.
Any objections to me committing this for 8.4?
On a side note, should I try to address items #1 & #4 for 8.4 as well?
Perhaps #4 yes since it is arguably a bug fix, but no to #1?
Joe
Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.77
diff -c -r1.77 dblink.c
*** dblink.c 1 Jan 2009 17:23:31 -0000 1.77
--- dblink.c 25 May 2009 22:57:22 -0000
***************
*** 46,51 ****
--- 46,52 ----
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/spi.h"
+ #include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
***************
*** 77,83 ****
/*
* Internal declarations
*/
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn * rconn);
--- 78,84 ----
/*
* Internal declarations
*/
! static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn * rconn);
***************
*** 93,101 ****
static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
static Oid get_relid_from_relname(text *relname_text);
static char *generate_relation_name(Oid relid);
! static void dblink_connstr_check(const char *connstr);
! static void dblink_security_check(PGconn *conn, remoteConn *rconn);
static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
/* Global */
static remoteConn *pconn = NULL;
--- 94,103 ----
static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
static Oid get_relid_from_relname(text *relname_text);
static char *generate_relation_name(Oid relid);
! static void dblink_connstr_check(const char *connstr, bool is_fdw);
! static void dblink_security_check(PGconn *conn, remoteConn *rconn, bool is_fdw);
static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
+ static char *get_connect_string(const char *servername);
/* Global */
static remoteConn *pconn = NULL;
***************
*** 165,172 ****
} \
else \
{ \
! connstr = conname_or_str; \
! dblink_connstr_check(connstr); \
conn = PQconnectdb(connstr); \
if (PQstatus(conn) == CONNECTION_BAD) \
{ \
--- 167,180 ----
} \
else \
{ \
! bool is_fdw = true; \
! connstr = get_connect_string(conname_or_str); \
! if (connstr == NULL) \
! { \
! is_fdw = false; \
! connstr = conname_or_str; \
! } \
! dblink_connstr_check(connstr, is_fdw); \
conn = PQconnectdb(connstr); \
if (PQstatus(conn) == CONNECTION_BAD) \
{ \
***************
*** 177,183 ****
errmsg("could not establish connection"), \
errdetail("%s", msg))); \
} \
! dblink_security_check(conn, rconn); \
freeconn = true; \
} \
} while (0)
--- 185,191 ----
errmsg("could not establish connection"), \
errdetail("%s", msg))); \
} \
! dblink_security_check(conn, rconn, is_fdw); \
freeconn = true; \
} \
} while (0)
***************
*** 210,237 ****
Datum
dblink_connect(PG_FUNCTION_ARGS)
{
char *connstr = NULL;
char *connname = NULL;
char *msg;
PGconn *conn = NULL;
remoteConn *rconn = NULL;
DBLINK_INIT;
if (PG_NARGS() == 2)
{
! connstr = text_to_cstring(PG_GETARG_TEXT_PP(1));
connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
}
else if (PG_NARGS() == 1)
! connstr = text_to_cstring(PG_GETARG_TEXT_PP(0));
if (connname)
rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
sizeof(remoteConn));
/* check password in connection string if not superuser */
! dblink_connstr_check(connstr);
conn = PQconnectdb(connstr);
if (PQstatus(conn) == CONNECTION_BAD)
--- 218,255 ----
Datum
dblink_connect(PG_FUNCTION_ARGS)
{
+ char *conname_or_str = NULL;
char *connstr = NULL;
char *connname = NULL;
char *msg;
PGconn *conn = NULL;
remoteConn *rconn = NULL;
+ bool is_fdw = true;
DBLINK_INIT;
if (PG_NARGS() == 2)
{
! conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
}
else if (PG_NARGS() == 1)
! conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
if (connname)
rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
sizeof(remoteConn));
+ /* first check for valid foreign data server */
+ connstr = get_connect_string(conname_or_str);
+ if (connstr == NULL)
+ {
+ is_fdw = false;
+ connstr = conname_or_str;
+ }
+
/* check password in connection string if not superuser */
! dblink_connstr_check(connstr, is_fdw);
conn = PQconnectdb(connstr);
if (PQstatus(conn) == CONNECTION_BAD)
***************
*** 248,254 ****
}
/* check password actually used if not superuser */
! dblink_security_check(conn, rconn);
if (connname)
{
--- 266,272 ----
}
/* check password actually used if not superuser */
! dblink_security_check(conn, rconn, is_fdw);
if (connname)
{
***************
*** 689,713 ****
Datum
dblink_record(PG_FUNCTION_ARGS)
{
! return dblink_record_internal(fcinfo, false, false);
}
PG_FUNCTION_INFO_V1(dblink_send_query);
Datum
dblink_send_query(PG_FUNCTION_ARGS)
{
! return dblink_record_internal(fcinfo, true, false);
}
PG_FUNCTION_INFO_V1(dblink_get_result);
Datum
dblink_get_result(PG_FUNCTION_ARGS)
{
! return dblink_record_internal(fcinfo, true, true);
}
static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
{
FuncCallContext *funcctx;
TupleDesc tupdesc = NULL;
--- 707,753 ----
Datum
dblink_record(PG_FUNCTION_ARGS)
{
! return dblink_record_internal(fcinfo, false);
}
PG_FUNCTION_INFO_V1(dblink_send_query);
Datum
dblink_send_query(PG_FUNCTION_ARGS)
{
! PGconn *conn = NULL;
! char *connstr = NULL;
! char *sql = NULL;
! remoteConn *rconn = NULL;
! char *msg;
! bool freeconn = false;
! int retval;
!
! if (PG_NARGS() == 2)
! {
! DBLINK_GET_CONN;
! sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
! }
! else
! /* shouldn't happen: only the two-argument form is handled above */
! elog(ERROR, "wrong number of arguments");
!
! /* async query send: any PQsendQuery result other than 1 is surfaced as a NOTICE and the raw result is returned either way. NOTE(review): freeconn (presumably set by DBLINK_GET_CONN) is never consulted here, so a connection opened by the macro would not be closed -- confirm. */
! retval = PQsendQuery(conn, sql);
! if (retval != 1)
! elog(NOTICE, "%s", PQerrorMessage(conn));
!
! PG_RETURN_INT32(retval);
}
PG_FUNCTION_INFO_V1(dblink_get_result);
Datum
dblink_get_result(PG_FUNCTION_ARGS)
{
! return dblink_record_internal(fcinfo, true);
}
static Datum
! dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
{
FuncCallContext *funcctx;
TupleDesc tupdesc = NULL;
***************
*** 775,788 ****
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
}
! else if (is_async && do_get)
{
/* get async result */
if (PG_NARGS() == 2)
{
/* text,bool */
DBLINK_GET_CONN;
! fail = PG_GETARG_BOOL(2);
}
else if (PG_NARGS() == 1)
{
--- 815,828 ----
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
}
! else /* is_async */
{
/* get async result */
if (PG_NARGS() == 2)
{
/* text,bool */
DBLINK_GET_CONN;
! fail = PG_GETARG_BOOL(1);
}
else if (PG_NARGS() == 1)
{
***************
*** 793,929 ****
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
}
- else
- {
- /* send async query */
- if (PG_NARGS() == 2)
- {
- DBLINK_GET_CONN;
- sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
- }
- else
- /* shouldn't happen */
- elog(ERROR, "wrong number of arguments");
- }
if (!conn)
DBLINK_CONN_NOT_AVAIL;
! if (!is_async || (is_async && do_get))
{
! /* synchronous query, or async result retrieval */
! if (!is_async)
! res = PQexec(conn, sql);
! else
{
- res = PQgetResult(conn);
- /* NULL means we're all done with the async results */
- if (!res)
- {
- MemoryContextSwitchTo(oldcontext);
- SRF_RETURN_DONE(funcctx);
- }
- }
-
- if (!res ||
- (PQresultStatus(res) != PGRES_COMMAND_OK &&
- PQresultStatus(res) != PGRES_TUPLES_OK))
- {
- dblink_res_error(conname, res, "could not execute query", fail);
- if (freeconn)
- PQfinish(conn);
MemoryContextSwitchTo(oldcontext);
SRF_RETURN_DONE(funcctx);
}
! if (PQresultStatus(res) == PGRES_COMMAND_OK)
! {
! is_sql_cmd = true;
!
! /* need a tuple descriptor representing one TEXT column */
! tupdesc = CreateTemplateTupleDesc(1, false);
! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
! TEXTOID, -1, 0);
!
! /*
! * and save a copy of the command status string to return as
! * our result tuple
! */
! sql_cmd_status = PQcmdStatus(res);
! funcctx->max_calls = 1;
! }
! else
! funcctx->max_calls = PQntuples(res);
!
! /* got results, keep track of them */
! funcctx->user_fctx = res;
!
! /* if needed, close the connection to the database and cleanup */
if (freeconn)
PQfinish(conn);
! if (!is_sql_cmd)
! {
! /* get a tuple descriptor for our result type */
! switch (get_call_result_type(fcinfo, NULL, &tupdesc))
! {
! case TYPEFUNC_COMPOSITE:
! /* success */
! break;
! case TYPEFUNC_RECORD:
! /* failed to determine actual type of RECORD */
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("function returning record called in context "
! "that cannot accept type record")));
! break;
! default:
! /* result type isn't composite */
! elog(ERROR, "return type must be a row type");
! break;
! }
! /* make sure we have a persistent copy of the tupdesc */
! tupdesc = CreateTupleDescCopy(tupdesc);
! }
/*
! * check result and tuple descriptor have the same number of
! * columns
*/
! if (PQnfields(res) != tupdesc->natts)
! ereport(ERROR,
! (errcode(ERRCODE_DATATYPE_MISMATCH),
! errmsg("remote query result rowtype does not match "
! "the specified FROM clause rowtype")));
! /* fast track when no results */
! if (funcctx->max_calls < 1)
{
! if (res)
! PQclear(res);
! MemoryContextSwitchTo(oldcontext);
! SRF_RETURN_DONE(funcctx);
}
! /* store needed metadata for subsequent calls */
! attinmeta = TupleDescGetAttInMetadata(tupdesc);
! funcctx->attinmeta = attinmeta;
!
! MemoryContextSwitchTo(oldcontext);
}
! else
{
! /* async query send */
MemoryContextSwitchTo(oldcontext);
! PG_RETURN_INT32(PQsendQuery(conn, sql));
}
- }
! if (is_async && !do_get)
! {
! /* async query send -- should not happen */
! elog(ERROR, "async query send called more than once");
}
--- 833,942 ----
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
}
if (!conn)
DBLINK_CONN_NOT_AVAIL;
! /* synchronous query, or async result retrieval */
! if (!is_async)
! res = PQexec(conn, sql);
! else
{
! res = PQgetResult(conn);
! /* NULL means we're all done with the async results */
! if (!res)
{
MemoryContextSwitchTo(oldcontext);
SRF_RETURN_DONE(funcctx);
}
+ }
! if (!res ||
! (PQresultStatus(res) != PGRES_COMMAND_OK &&
! PQresultStatus(res) != PGRES_TUPLES_OK))
! {
! dblink_res_error(conname, res, "could not execute query", fail);
if (freeconn)
PQfinish(conn);
+ MemoryContextSwitchTo(oldcontext);
+ SRF_RETURN_DONE(funcctx);
+ }
! if (PQresultStatus(res) == PGRES_COMMAND_OK)
! {
! is_sql_cmd = true;
! /* need a tuple descriptor representing one TEXT column */
! tupdesc = CreateTemplateTupleDesc(1, false);
! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
! TEXTOID, -1, 0);
/*
! * and save a copy of the command status string to return as
! * our result tuple
*/
! sql_cmd_status = PQcmdStatus(res);
! funcctx->max_calls = 1;
! }
! else
! funcctx->max_calls = PQntuples(res);
! /* got results, keep track of them */
! funcctx->user_fctx = res;
!
! /* if needed, close the connection to the database and cleanup */
! if (freeconn)
! PQfinish(conn);
!
! if (!is_sql_cmd)
! {
! /* get a tuple descriptor for our result type */
! switch (get_call_result_type(fcinfo, NULL, &tupdesc))
{
! case TYPEFUNC_COMPOSITE:
! /* success */
! break;
! case TYPEFUNC_RECORD:
! /* failed to determine actual type of RECORD */
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("function returning record called in context "
! "that cannot accept type record")));
! break;
! default:
! /* result type isn't composite */
! elog(ERROR, "return type must be a row type");
! break;
}
! /* make sure we have a persistent copy of the tupdesc */
! tupdesc = CreateTupleDescCopy(tupdesc);
}
!
! /*
! * check result and tuple descriptor have the same number of
! * columns
! */
! if (PQnfields(res) != tupdesc->natts)
! ereport(ERROR,
! (errcode(ERRCODE_DATATYPE_MISMATCH),
! errmsg("remote query result rowtype does not match "
! "the specified FROM clause rowtype")));
!
! /* fast track when no results */
! if (funcctx->max_calls < 1)
{
! if (res)
! PQclear(res);
MemoryContextSwitchTo(oldcontext);
! SRF_RETURN_DONE(funcctx);
}
! /* store needed metadata for subsequent calls */
! attinmeta = TupleDescGetAttInMetadata(tupdesc);
! funcctx->attinmeta = attinmeta;
!
! MemoryContextSwitchTo(oldcontext);
}
***************
*** 2249,2257 ****
}
static void
! dblink_security_check(PGconn *conn, remoteConn *rconn)
{
! if (!superuser())
{
if (!PQconnectionUsedPassword(conn))
{
--- 2262,2270 ----
}
static void
! dblink_security_check(PGconn *conn, remoteConn *rconn, bool is_fdw)
{
! if (!superuser() && !is_fdw)
{
if (!PQconnectionUsedPassword(conn))
{
***************
*** 2275,2283 ****
* to be accessible to non-superusers.
*/
static void
! dblink_connstr_check(const char *connstr)
{
! if (!superuser())
{
PQconninfoOption *options;
PQconninfoOption *option;
--- 2288,2296 ----
* to be accessible to non-superusers.
*/
static void
! dblink_connstr_check(const char *connstr, bool is_fdw)
{
! if (!superuser() && !is_fdw)
{
PQconninfoOption *options;
PQconninfoOption *option;
***************
*** 2358,2360 ****
--- 2371,2431 ----
errcontext("Error occurred on dblink connection named \"%s\": %s.",
dblink_context_conname, dblink_context_msg)));
}
+
+ /*
+ * Obtain a libpq connection string for the foreign server named servername, appending name='value' pairs from the FDW options, then the server options, then the options of the user mapping for GetUserId(); returns NULL when the server lookup yields nothing.
+ * NOTE(review): option values are placed inside single quotes without escaping, so a value containing ' or \ would presumably produce a malformed or altered connection string -- consider escaping before appending. */
+ static char *
+ get_connect_string(const char *servername)
+ {
+ ForeignServer *foreign_server;
+ UserMapping *user_mapping;
+ ListCell *cell;
+ StringInfo buf = makeStringInfo();
+ ForeignDataWrapper *fdw;
+ AclResult aclresult;
+
+ /* look up the foreign server by name; if the lookup yields NULL, this function returns NULL (see the else branch at the end) */
+ foreign_server = GetForeignServerByName(servername, true);
+
+ if (foreign_server)
+ {
+ Oid serverid = foreign_server->serverid;
+ Oid fdwid = foreign_server->fdwid;
+ Oid userid = GetUserId();
+
+ user_mapping = GetUserMapping(userid, serverid);
+ fdw = GetForeignDataWrapper(fdwid);
+
+ /* Check permissions, user must have usage on the server. NOTE(review): the user mapping and FDW are fetched above, before this check, so a lookup failure there may be reported ahead of a permission failure -- confirm the intended order. */
+ aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
+
+ foreach (cell, fdw->options)
+ {
+ DefElem *def = lfirst(cell);
+
+ appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+ }
+
+ foreach (cell, foreign_server->options)
+ {
+ DefElem *def = lfirst(cell);
+
+ appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+ }
+
+ foreach (cell, user_mapping->options)
+ {
+
+ DefElem *def = lfirst(cell);
+
+ appendStringInfo(buf, "%s='%s' ", def->defname, strVal(def->arg));
+ }
+
+ return pstrdup(buf->data);
+ }
+ else
+ return NULL;
+ }
Index: expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.24
diff -c -r1.24 dblink.out
*** expected/dblink.out 3 Jul 2008 03:56:57 -0000 1.24
--- expected/dblink.out 25 May 2009 23:14:03 -0000
***************
*** 784,786 ****
--- 784,819 ----
OK
(1 row)
+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+ CREATE FOREIGN DATA WRAPPER postgresql;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ SELECT dblink_connect_u('myconn', 'fdtest');
+ dblink_connect_u
+ ------------------
+ OK
+ (1 row)
+
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+ a | b | c
+ ----+---+---------------
+ 0 | a | {a0,b0,c0}
+ 1 | b | {a1,b1,c1}
+ 2 | c | {a2,b2,c2}
+ 3 | d | {a3,b3,c3}
+ 4 | e | {a4,b4,c4}
+ 5 | f | {a5,b5,c5}
+ 6 | g | {a6,b6,c6}
+ 7 | h | {a7,b7,c7}
+ 8 | i | {a8,b8,c8}
+ 9 | j | {a9,b9,c9}
+ 10 | k | {a10,b10,c10}
+ (11 rows)
+
+ \c - :ORIGINAL_USER
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;
Index: sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.20
diff -c -r1.20 dblink.sql
*** sql/dblink.sql 6 Apr 2008 16:54:48 -0000 1.20
--- sql/dblink.sql 25 May 2009 23:13:49 -0000
***************
*** 364,366 ****
--- 364,383 ----
SELECT dblink_cancel_query('dtest1');
SELECT dblink_error_message('dtest1');
SELECT dblink_disconnect('dtest1');
+
+ -- test foreign data wrapper functionality
+ CREATE USER dblink_regression_test;
+
+ CREATE FOREIGN DATA WRAPPER postgresql;
+ CREATE SERVER fdtest FOREIGN DATA WRAPPER postgresql OPTIONS (dbname 'contrib_regression');
+ CREATE USER MAPPING FOR public SERVER fdtest;
+ GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO dblink_regression_test;
+
+ \set ORIGINAL_USER :USER
+ \c - dblink_regression_test
+ SELECT dblink_connect_u('myconn', 'fdtest');
+ SELECT * FROM dblink('myconn','SELECT * FROM foo') AS t(a int, b text, c text[]);
+
+ \c - :ORIGINAL_USER
+ REVOKE EXECUTE ON FUNCTION dblink_connect_u(text, text) FROM dblink_regression_test;
+ DROP USER dblink_regression_test;
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers