I'm sorry to have coded a silly bug.
The previous patch has a bug in realloc size calculation.
And separation of the 'connname patch' was incomplete in regtest.
It is fixed in this patch.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..4de28ef 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
+typedef struct storeInfo
+{
+ Tuplestorestate *tuplestore;
+ int nattrs;
+ MemoryContext oldcontext;
+ AttInMetadata *attinmeta;
+ char* valbuf;
+ int valbuflen;
+ char **cstrs;
+ bool error_occurred;
+ bool nummismatch;
+} storeInfo;
+
/*
* Internal declarations
*/
static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
static void validate_pkattnums(Relation rel,
int2vector *pkattnums_arg, int32 pknumatts_arg,
int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, PGrowValue *columns, void *param);
+
/* Global */
static remoteConn *pconn = NULL;
@@ -111,6 +127,9 @@ typedef struct remoteConnHashEnt
/* initial number of connection hashes */
#define NUMCONN 16
+/* Initial block size for value buffer in storeHandler */
+#define INITBUFLEN 64
+
/* general utility */
#define xpfree(var_) \
do { \
@@ -503,6 +522,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
char *curname = NULL;
int howmany = 0;
bool fail = true; /* default to backward compatible */
+ storeInfo storeinfo;
DBLINK_INIT;
@@ -559,15 +579,51 @@ dblink_fetch(PG_FUNCTION_ARGS)
appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
+ * Result is stored into storeinfo.tuplestore instead of
+ * res->result retuned by PQexec below
+ */
+ initStoreInfo(&storeinfo, fcinfo);
+ PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+ /*
* Try to execute the query. Note that since libpq uses malloc, the
* PGresult will be long-lived even though we are still in a short-lived
* memory context.
*/
- res = PQexec(conn, buf.data);
+ PG_TRY();
+ {
+ res = PQexec(conn, buf.data);
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+
+ finishStoreInfo(&storeinfo);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Skip remaining results when storeHandler raises exception. */
+ PQskipResult(conn, TRUE);
+ ReThrowError(edata);
+ }
+ PG_END_TRY();
+
+ finishStoreInfo(&storeinfo);
+
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
+ /* finishStoreInfo saves the fields referred to below. */
+ if (storeinfo.nummismatch)
+ {
+ /* This is only for backward compatibility */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+ }
+
dblink_res_error(conname, res, "could not fetch from cursor", fail);
return (Datum) 0;
}
@@ -579,8 +635,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));
}
+ PQclear(res);
- materializeResult(fcinfo, res);
return (Datum) 0;
}
@@ -640,6 +696,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible */
bool freeconn = false;
+ storeInfo storeinfo;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -660,6 +717,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
{
/* text,text,bool */
DBLINK_GET_CONN;
+ conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
fail = PG_GETARG_BOOL(2);
}
@@ -715,164 +773,234 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
rsinfo->setResult = NULL;
rsinfo->setDesc = NULL;
- /* synchronous query, or async result retrieval */
- if (!is_async)
- res = PQexec(conn, sql);
- else
+
+ /*
+ * Result is stored into storeinfo.tuplestore instead of
+ * res->result retuned by PQexec/PQgetResult below
+ */
+ initStoreInfo(&storeinfo, fcinfo);
+ PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+ PG_TRY();
{
- res = PQgetResult(conn);
- /* NULL means we're all done with the async results */
- if (!res)
- return (Datum) 0;
+ /* synchronous query, or async result retrieval */
+ if (!is_async)
+ res = PQexec(conn, sql);
+ else
+ res = PQgetResult(conn);
}
+ PG_CATCH();
+ {
+ ErrorData *edata;
- /* if needed, close the connection to the database and cleanup */
- if (freeconn)
- PQfinish(conn);
+ finishStoreInfo(&storeinfo);
+ edata = CopyErrorData();
+ FlushErrorState();
- if (!res ||
- (PQresultStatus(res) != PGRES_COMMAND_OK &&
- PQresultStatus(res) != PGRES_TUPLES_OK))
+ /* Skip remaining results when storeHandler raises exception. */
+ PQskipResult(conn, TRUE);
+ ReThrowError(edata);
+ }
+ PG_END_TRY();
+
+ finishStoreInfo(&storeinfo);
+
+ /* NULL res from async get means we're all done with the results */
+ if (res || !is_async)
{
- dblink_res_error(conname, res, "could not execute query", fail);
- return (Datum) 0;
+ if (freeconn)
+ PQfinish(conn);
+
+ /*
+ * exclude mismatch of the numbers of the colums here so as to
+ * behave as before.
+ */
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK &&
+ !storeinfo.nummismatch))
+ {
+ dblink_res_error(conname, res, "could not execute query", fail);
+ return (Datum) 0;
+ }
+
+ /* Set command return status when the query was a command. */
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ char *values[1];
+ HeapTuple tuple;
+ AttInMetadata *attinmeta;
+ ReturnSetInfo *rcinfo = (ReturnSetInfo*)fcinfo->resultinfo;
+
+ values[0] = PQcmdStatus(res);
+ attinmeta = TupleDescGetAttInMetadata(rcinfo->setDesc);
+ tuple = BuildTupleFromCStrings(attinmeta, values);
+ tuplestore_puttuple(rcinfo->setResult, tuple);
+ }
+ else if (get_call_result_type(fcinfo, NULL, NULL) == TYPEFUNC_RECORD)
+ {
+ /* failed to determine actual type of RECORD */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record")));
+ }
+
+ /* finishStoreInfo saves the fields referred to below. */
+ if (storeinfo.nummismatch)
+ {
+ /* This is only for backward compatibility */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+ }
}
- materializeResult(fcinfo, res);
+ if (res)
+ PQclear(res);
+
return (Datum) 0;
}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
- Assert(rsinfo->returnMode == SFRM_Materialize);
-
- PG_TRY();
+ sinfo->oldcontext = MemoryContextSwitchTo(
+ rsinfo->econtext->ecxt_per_query_memory);
+
+ switch (get_call_result_type(fcinfo, NULL, &tupdesc))
{
- TupleDesc tupdesc;
- bool is_sql_cmd = false;
- int ntuples;
- int nfields;
-
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- is_sql_cmd = true;
-
- /*
- * need a tuple descriptor representing one TEXT column to return
- * the command status string as our result tuple
- */
+ case TYPEFUNC_COMPOSITE:
+ tupdesc = CreateTupleDescCopy(tupdesc);
+ sinfo->nattrs = tupdesc->natts;
+ break;
+ case TYPEFUNC_RECORD:
tupdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0);
- ntuples = 1;
- nfields = 1;
- }
- else
- {
- Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+ sinfo->nattrs = 1;
+ break;
+ default:
+ /* result type isn't composite */
+ elog(ERROR, "return type must be a row type");
+ break;
+ }
- is_sql_cmd = false;
+ /* make sure we have a persistent copy of the tupdesc */
- /* get a tuple descriptor for our result type */
- switch (get_call_result_type(fcinfo, NULL, &tupdesc))
- {
- case TYPEFUNC_COMPOSITE:
- /* success */
- break;
- case TYPEFUNC_RECORD:
- /* failed to determine actual type of RECORD */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("function returning record called in context "
- "that cannot accept type record")));
- break;
- default:
- /* result type isn't composite */
- elog(ERROR, "return type must be a row type");
- break;
- }
+ sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ sinfo->error_occurred = FALSE;
+ sinfo->nummismatch = FALSE;
+ sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+ sinfo->valbuflen = INITBUFLEN;
+ sinfo->valbuf = (char *)palloc(sinfo->valbuflen);
+ sinfo->cstrs = (char **)palloc(sinfo->nattrs * sizeof(char *));
- /* make sure we have a persistent copy of the tupdesc */
- tupdesc = CreateTupleDescCopy(tupdesc);
- ntuples = PQntuples(res);
- nfields = PQnfields(res);
- }
+ rsinfo->setResult = sinfo->tuplestore;
+ rsinfo->setDesc = tupdesc;
+}
- /*
- * check result and tuple descriptor have the same number of columns
- */
- if (nfields != tupdesc->natts)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("remote query result rowtype does not match "
- "the specified FROM clause rowtype")));
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+ if (sinfo->valbuf)
+ {
+ pfree(sinfo->valbuf);
+ sinfo->valbuf = NULL;
+ }
- if (ntuples > 0)
- {
- AttInMetadata *attinmeta;
- Tuplestorestate *tupstore;
- MemoryContext oldcontext;
- int row;
- char **values;
-
- attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
- oldcontext = MemoryContextSwitchTo(
- rsinfo->econtext->ecxt_per_query_memory);
- tupstore = tuplestore_begin_heap(true, false, work_mem);
- rsinfo->setResult = tupstore;
- rsinfo->setDesc = tupdesc;
- MemoryContextSwitchTo(oldcontext);
+ if (sinfo->cstrs)
+ {
+ pfree(sinfo->cstrs);
+ sinfo->cstrs = NULL;
+ }
- values = (char **) palloc(nfields * sizeof(char *));
+ MemoryContextSwitchTo(sinfo->oldcontext);
+}
- /* put all tuples into the tuplestore */
- for (row = 0; row < ntuples; row++)
- {
- HeapTuple tuple;
+/* Prototype of this function is PQrowProcessor */
+static int
+storeHandler(PGresult *res, PGrowValue *columns, void *param)
+{
+ storeInfo *sinfo = (storeInfo *)param;
+ HeapTuple tuple;
+ int newbuflen;
+ int fields = PQnfields(res);
+ int i;
+ char **cstrs = sinfo->cstrs;
+ char *pbuf;
+
+ if (sinfo->error_occurred)
+ return -1;
+
+ if (sinfo->nattrs != fields)
+ {
+ sinfo->error_occurred = TRUE;
+ sinfo->nummismatch = TRUE;
+ finishStoreInfo(sinfo);
- if (!is_sql_cmd)
- {
- int i;
+ /* This error will be processed in dblink_record_internal() */
+ return -1;
+ }
- for (i = 0; i < nfields; i++)
- {
- if (PQgetisnull(res, row, i))
- values[i] = NULL;
- else
- values[i] = PQgetvalue(res, row, i);
- }
- }
- else
- {
- values[0] = PQcmdStatus(res);
- }
+ /*
+ * value input functions assumes that the input string is
+ * terminated by zero. We should make the values to be so.
+ */
- /* build the tuple and put it into the tuplestore. */
- tuple = BuildTupleFromCStrings(attinmeta, values);
- tuplestore_puttuple(tupstore, tuple);
- }
+ /*
+ * The length of the buffer for each field is value length + 1 for
+ * zero-termination
+ */
+ newbuflen = fields;
+ for(i = 0 ; i < fields ; i++)
+ newbuflen += columns[i].len;
+
+ if (newbuflen > sinfo->valbuflen)
+ {
+ int tmplen = sinfo->valbuflen * 2;
+ /*
+ * Try to (re)allocate in bigger steps to avoid flood of allocations
+ * on weird data.
+ */
+ while (newbuflen > tmplen && tmplen >= 0)
+ tmplen *= 2;
- /* clean up and return the tuplestore */
- tuplestore_donestoring(tupstore);
- }
+ /* Check if the integer was wrap-rounded. */
+ if (tmplen < 0)
+ elog(ERROR, "Buffer size for one row exceeds integer limit");
- PQclear(res);
+ sinfo->valbuf = (char *)repalloc(sinfo->valbuf, tmplen);
+ sinfo->valbuflen = tmplen;
}
- PG_CATCH();
+
+ pbuf = sinfo->valbuf;
+ for(i = 0 ; i < fields ; i++)
{
- /* be sure to release the libpq result */
- PQclear(res);
- PG_RE_THROW();
+ int len = columns[i].len;
+ if (len < 0)
+ cstrs[i] = NULL;
+ else
+ {
+ cstrs[i] = pbuf;
+ memcpy(pbuf, columns[i].value, len);
+ pbuf += len;
+ *pbuf++ = '\0';
+ }
}
- PG_END_TRY();
+
+ /*
+ * These functions may throw exception. It will be caught in
+ * dblink_record_internal()
+ */
+ tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+ tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+ return 1;
}
/*
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 4de28ef..05d7e98 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -733,6 +733,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
else
{
DBLINK_GET_CONN;
+ conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
}
}
@@ -763,6 +764,8 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
else
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
+
+ conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
}
if (!conn)
diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index 511dd5e..2dcba15 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -371,7 +371,7 @@ SELECT *
FROM dblink('myconn','SELECT * FROM foobar',false) AS t(a int, b text, c text[])
WHERE t.a > 7;
NOTICE: relation "foobar" does not exist
-CONTEXT: Error occurred on dblink connection named "unnamed": could not execute query.
+CONTEXT: Error occurred on dblink connection named "myconn": could not execute query.
a | b | c
---+---+---
(0 rows)
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers