Hello, this is a revised and rebased version of the patch.
a. The old term `Add Tuple Function' is changed to `Store
Handler'. The reason it is not `storage' is simply the length of the
symbols.
b. I couldn't find a suitable place to put PGgetAsCString(). It is
removed, and storeHandler()@dblink.c touches PGresAttValue
directly in this new patch. The definition of PGresAttValue stays
in libpq-fe.h and is provided with a comment.
c. Error handling in dblink.c is refined. I think it preserves the
previous behavior for column number mismatches and type
conversion exceptions.
d. The documentation is revised.
> It jumped from 332K tuples/sec to 450K, a 35% gain, and had a
> lower memory footprint too. Test methodology and those results
> are at
> http://archives.postgresql.org/pgsql-hackers/2011-12/msg00008.php
Disappointingly, re-measuring showed that the gain has become
lower than that.
On CentOS 6.2, with all other conditions the same as in the previous
test, overall performance became higher; the loss with the
libpq patch alone was 1.8%, and the gain with the full patch fell
to 5.6%. The reduction in memory usage, however, was unchanged.
Original : 3.96s 100.0%
w/libpq patch : 4.03s 101.8%
w/libpq+dblink patch : 3.74s 94.4%
The attachments are listed below.
libpq_altstore_20120117.patch
- Allow alternative storage for libpq.
dblink_perf_20120117.patch
- Modify dblink to use alternative storage mechanism.
libpq_altstore_doc_20120117.patch
- Document for libpq_altstore. Shows in "31.19. Alternative result storage"
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..83525e1 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams 157
PQping 158
PQpingParams 159
PQlibVersion 160
+PQregisterStoreHandler 161
+PQgetStoreHandlerParam 162
+PQsetStoreHandlerErrMes 163
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index d454538..5559f0b 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)
conn->allow_ssl_try = true;
conn->wait_ssl_try = false;
#endif
+ conn->storeHandler = NULL;
/*
* We try to send at least 8K at a time, which is the usual size of pipe
@@ -5076,3 +5077,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
return prev;
}
+
+void
+PQregisterStoreHandler(PGconn *conn, StoreHandler func, void *param)
+{
+ conn->storeHandler = func;
+ conn->storeHandlerParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..96e5974 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -67,6 +67,10 @@ static int PQsendDescribe(PGconn *conn, char desc_type,
const char *desc_target);
static int check_field_number(const PGresult *res, int field_num);
+static void *pqDefaultStoreHandler(PGresult *res, PQStoreFunc func,
+ int id, size_t len);
+static void *pqAddTuple(PGresult *res, PGresAttValue *tup);
+
/* ----------------
* Space management for PGresult.
@@ -160,6 +164,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
result->curBlock = NULL;
result->curOffset = 0;
result->spaceLeft = 0;
+ result->storeHandler = pqDefaultStoreHandler;
+ result->storeHandlerParam = NULL;
+ result->storeHandlerErrMes = NULL;
if (conn)
{
@@ -194,6 +201,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
}
result->nEvents = conn->nEvents;
}
+
+ if (conn->storeHandler)
+ {
+ result->storeHandler = conn->storeHandler;
+ result->storeHandlerParam = conn->storeHandlerParam;
+ }
}
else
{
@@ -487,6 +500,33 @@ PQresultAlloc(PGresult *res, size_t nBytes)
return pqResultAlloc(res, nBytes, TRUE);
}
+void *
+pqDefaultStoreHandler(PGresult *res, PQStoreFunc func, int id, size_t len)
+{
+ void *p;
+
+ switch (func)
+ {
+ case PQSF_ALLOC_TEXT:
+ return pqResultAlloc(res, len, TRUE);
+
+ case PQSF_ALLOC_BINARY:
+ p = pqResultAlloc(res, len, FALSE);
+
+ if (id == -1)
+ res->storeHandlerParam = p;
+
+ return p;
+
+ case PQSF_ADD_TUPLE:
+ return pqAddTuple(res, res->storeHandlerParam);
+
+ default:
+ /* Ignore */
+ break;
+ }
+ return NULL;
+}
/*
* pqResultAlloc -
* Allocate subsidiary storage for a PGresult.
@@ -830,9 +870,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
/*
* pqAddTuple
* add a row pointer to the PGresult structure, growing it if necessary
- * Returns TRUE if OK, FALSE if not enough memory to add the row
+ * Returns tup if OK, NULL if not enough memory to add the row.
*/
-int
+static void *
pqAddTuple(PGresult *res, PGresAttValue *tup)
{
if (res->ntups >= res->tupArrSize)
@@ -858,13 +898,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
newTuples = (PGresAttValue **)
realloc(res->tuples, newSize * sizeof(PGresAttValue *));
if (!newTuples)
- return FALSE; /* malloc or realloc failed */
+ return NULL; /* malloc or realloc failed */
res->tupArrSize = newSize;
res->tuples = newTuples;
}
res->tuples[res->ntups] = tup;
res->ntups++;
- return TRUE;
+ return tup;
}
/*
@@ -2822,6 +2862,35 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
return 0;
}
+/* PQgetStoreHandlerParam
+ * Get the pointer to the contextual parameter from PGresult which is
+ * registered to PGconn by PQregisterStoreHandler
+ */
+void *
+PQgetStoreHandlerParam(const PGresult *res)
+{
+ if (!res)
+ return NULL;
+ return res->storeHandlerParam;
+}
+
+/* PQsetStoreHandlerErrMes
+ * Set the error message to pass back to the caller of StoreHandler.
+ *
+ * mes must be a malloc'ed memory block and it will be released by
+ * the caller of StoreHandler. You can replace the previous message
+ * by alternative mes, or clear it with NULL. The previous one will
+ * be freed internally.
+ */
+void
+PQsetStoreHandlerErrMes(PGresult *res, char *mes)
+{
+ /* Free existing message */
+ if (res->storeHandlerErrMes)
+ free(res->storeHandlerErrMes);
+ res->storeHandlerErrMes = mes;
+}
+
/* PQnparams:
* returns the number of input parameters of a prepared statement.
*/
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..205502b 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary)
if (conn->curTuple == NULL)
{
conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+ result->storeHandler(result, PQSF_ALLOC_BINARY, -1,
+ nfields * sizeof(PGresAttValue));
if (conn->curTuple == NULL)
- goto outOfMemory;
+ goto addTupleError;
MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
/*
@@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary)
{
bitmap = (char *) malloc(nbytes);
if (!bitmap)
- goto outOfMemory;
+ goto addTupleError;
}
if (pqGetnchar(bitmap, nbytes, conn))
@@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary)
vlen = 0;
if (tup[i].value == NULL)
{
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
+ PQStoreFunc func =
+ (binary ? PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT);
+ tup[i].value =
+ (char *) result->storeHandler(result, func, i, vlen + 1);
if (tup[i].value == NULL)
- goto outOfMemory;
+ goto addTupleError;
}
tup[i].len = vlen;
/* read in the value */
@@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary)
}
/* Success! Store the completed tuple in the result */
- if (!pqAddTuple(result, tup))
- goto outOfMemory;
+ if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0))
+ goto addTupleError;
+
/* and reset for a new message */
conn->curTuple = NULL;
@@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary)
free(bitmap);
return 0;
-outOfMemory:
+addTupleError:
/* Replace partially constructed result with an error result */
/*
@@ -829,8 +834,21 @@ outOfMemory:
* there's not enough memory to concatenate messages...
*/
pqClearAsyncResult(conn);
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("out of memory for query result\n"));
+ resetPQExpBuffer(&conn->errorMessage);
+
+ /*
+ * If an error message was passed from storeHandler, set it into
+ * PGconn; assume out of memory if not.
+ */
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext(result->storeHandlerErrMes ?
+ result->storeHandlerErrMes :
+ "out of memory for query result\n"));
+ if (result->storeHandlerErrMes)
+ {
+ free(result->storeHandlerErrMes);
+ result->storeHandlerErrMes = NULL;
+ }
/*
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..117c38a 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength)
if (conn->curTuple == NULL)
{
conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+ result->storeHandler(result, PQSF_ALLOC_BINARY, -1,
+ nfields * sizeof(PGresAttValue));
if (conn->curTuple == NULL)
- goto outOfMemory;
+ goto addTupleError;
MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
}
tup = conn->curTuple;
@@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength)
vlen = 0;
if (tup[i].value == NULL)
{
- bool isbinary = (result->attDescs[i].format != 0);
-
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
+ PQStoreFunc func = (result->attDescs[i].format != 0 ?
+ PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT);
+ tup[i].value =
+ (char *) result->storeHandler(result, func, i, vlen + 1);
if (tup[i].value == NULL)
- goto outOfMemory;
+ goto addTupleError;
}
tup[i].len = vlen;
/* read in the value */
@@ -689,22 +691,36 @@ getAnotherTuple(PGconn *conn, int msgLength)
}
/* Success! Store the completed tuple in the result */
- if (!pqAddTuple(result, tup))
- goto outOfMemory;
+ if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0))
+ goto addTupleError;
+
/* and reset for a new message */
conn->curTuple = NULL;
return 0;
-outOfMemory:
+addTupleError:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("out of memory for query result\n"));
+ resetPQExpBuffer(&conn->errorMessage);
+
+ /*
+ * If an error message was passed from storeHandler, set it into
+ * PGconn; assume out of memory if not.
+ */
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext(result->storeHandlerErrMes ?
+ result->storeHandlerErrMes :
+ "out of memory for query result\n"));
+ if (result->storeHandlerErrMes)
+ {
+ free(result->storeHandlerErrMes);
+ result->storeHandlerErrMes = NULL;
+ }
pqSaveErrorResult(conn);
/* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..6d86fa0 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -116,6 +116,16 @@ typedef enum
PQPING_NO_ATTEMPT /* connection not attempted (bad params) */
} PGPing;
+/* PQStoreFunc is the enum for one of the parameters of storeHandler
+ * that decides what to do. See the typedef StoreHandler for
+ * details */
+typedef enum
+{
+ PQSF_ALLOC_TEXT, /* Requested non-aligned memory for text value */
+ PQSF_ALLOC_BINARY, /* Requested aligned memory for binary value */
+ PQSF_ADD_TUPLE /* Requested to add tuple data into store */
+} PQStoreFunc;
+
/* PGconn encapsulates a connection to the backend.
* The contents of this struct are not supposed to be known to applications.
*/
@@ -149,6 +159,15 @@ typedef struct pgNotify
struct pgNotify *next; /* list link */
} PGnotify;
+/* PGresAttValue represents a value of one tuple field in string form.
+ NULL is represented as len < 0. Otherwise value points to a null
+ terminated C string with the length of len. */
+typedef struct pgresAttValue
+{
+ int len; /* length in bytes of the value */
+ char *value; /* actual value, plus terminating zero byte */
+} PGresAttValue;
+
/* Function types for notice-handling callbacks */
typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -416,6 +435,52 @@ extern PGPing PQping(const char *conninfo);
extern PGPing PQpingParams(const char *const * keywords,
const char *const * values, int expand_dbname);
+/*
+ * Typedef for alternative result store handler.
+ *
+ * This function pointer is used for alternative result store handler
+ * callback in PGresult and PGconn.
+ *
+ * StoreHandler is called for three functions designated by the enum
+ * PQStoreFunc.
+ *
+ * id is the identifier for allocated memory block. The caller sets -1
+ * for PGresAttValue array, and 0 to number of cols - 1 for each
+ * column.
+ *
+ * PQSF_ALLOC_TEXT requests a memory block of size bytes for a text
+ * value, which may not be aligned to the word boundary.
+ *
+ * PQSF_ALLOC_BINARY requests the size bytes memory block for a binary
+ * value which is aligned to the word boundary.
+ *
+ * PQSF_ADD_TUPLE requests to add tuple data into the result store,
+ * and free the memory blocks allocated by this function if necessary.
+ * id and size are to be ignored for this function.
+ *
+ * This function must return non-NULL value for success and must
+ * return NULL for failure and may set error message by
+ * PQsetStoreHandlerErrMes. It is assumed by caller as out of memory
+ * when the error message is NULL on failure. This function is assumed
+ * not to throw any exception.
+ */
+typedef void *(*StoreHandler)(PGresult *res, PQStoreFunc func,
+ int id, size_t size);
+
+/*
+ * Register alternative result store function to PGconn.
+ *
+ * By registering this function, pg_result disables its own result
+ * store and calls it to append rows one by one.
+ *
+ * func is tuple store function. See the typedef StoreHandler.
+ *
+ * storeHandlerParam is the contextual variable that can be obtained with
+ * PQgetStoreHandlerParam in StoreHandler.
+ */
+extern void PQregisterStoreHandler(PGconn *conn, StoreHandler func,
+ void *storeHandlerParam);
+
/* Force the write buffer to be written (or at least try) */
extern int PQflush(PGconn *conn);
@@ -454,6 +519,8 @@ extern char *PQcmdTuples(PGresult *res);
extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
extern int PQgetlength(const PGresult *res, int tup_num, int field_num);
extern int PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern void *PQgetStoreHandlerParam(const PGresult *res);
+extern void PQsetStoreHandlerErrMes(PGresult *res, char *mes);
extern int PQnparams(const PGresult *res);
extern Oid PQparamtype(const PGresult *res, int param_num);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d967d60..e28e712 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,12 +134,6 @@ typedef struct pgresParamDesc
#define NULL_LEN (-1) /* pg_result len for NULL value */
-typedef struct pgresAttValue
-{
- int len; /* length in bytes of the value */
- char *value; /* actual value, plus terminating zero byte */
-} PGresAttValue;
-
/* Typedef for message-field list entries */
typedef struct pgMessageField
{
@@ -209,6 +203,11 @@ struct pg_result
PGresult_data *curBlock; /* most recently allocated block */
int curOffset; /* start offset of free space in block */
int spaceLeft; /* number of free bytes remaining in block */
+
+ StoreHandler storeHandler; /* Result store handler. See
+ * StoreHandler for details. */
+ void *storeHandlerParam; /* Contextual parameter for storeHandler */
+ char *storeHandlerErrMes; /* Error message from storeHandler */
};
/* PGAsyncStatusType defines the state of the query-execution state machine */
@@ -443,6 +442,13 @@ struct pg_conn
/* Buffer for receiving various parts of messages */
PQExpBufferData workBuffer; /* expansible string */
+
+ /* Tuple store handler. The two fields below are copied to a newly
+ * created PGresult if storeHandler is not NULL. The default
+ * function is used if it is NULL. */
+ StoreHandler storeHandler; /* Result store handler. See
+ * StoreHandler for details. */
+ void *storeHandlerParam; /* Contextual parameter for storeHandler */
};
/* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -507,7 +513,6 @@ extern void
pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
/* This lets gcc check the format string for consistency. */
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
-extern int pqAddTuple(PGresult *res, PGresAttValue *tup);
extern void pqSaveMessageField(PGresult *res, char code,
const char *value);
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..a8685a9 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,24 @@ typedef struct remoteConn
bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
+typedef struct storeInfo
+{
+ Tuplestorestate *tuplestore;
+ int nattrs;
+ AttInMetadata *attinmeta;
+ MemoryContext oldcontext;
+ char *attrvalbuf;
+ void **valbuf;
+ size_t *valbufsize;
+ bool error_occurred;
+ bool nummismatch;
+ ErrorData *edata;
+} storeInfo;
+
/*
* Internal declarations
*/
static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +103,10 @@ static char *escape_param_str(const char *from);
static void validate_pkattnums(Relation rel,
int2vector *pkattnums_arg, int32 pknumatts_arg,
int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *storeHandler(PGresult *res, PQStoreFunc func, int id, size_t size);
+
/* Global */
static remoteConn *pconn = NULL;
@@ -503,6 +520,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
char *curname = NULL;
int howmany = 0;
bool fail = true; /* default to backward compatible */
+ storeInfo storeinfo;
DBLINK_INIT;
@@ -559,15 +577,36 @@ dblink_fetch(PG_FUNCTION_ARGS)
appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
+ * Result is stored into storeinfo.tuplestore instead of
+ * res->result returned by PQexec below
+ */
+ initStoreInfo(&storeinfo, fcinfo);
+ PQregisterStoreHandler(conn, storeHandler, &storeinfo);
+
+ /*
* Try to execute the query. Note that since libpq uses malloc, the
* PGresult will be long-lived even though we are still in a short-lived
* memory context.
*/
res = PQexec(conn, buf.data);
+ finishStoreInfo(&storeinfo);
+
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
+ /* finishStoreInfo saves the fields referred to below. */
+ if (storeinfo.nummismatch)
+ {
+ /* This is only for backward compatibility */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+ }
+ else if (storeinfo.edata)
+ ReThrowError(storeinfo.edata);
+
dblink_res_error(conname, res, "could not fetch from cursor", fail);
return (Datum) 0;
}
@@ -579,8 +618,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));
}
+ PQclear(res);
- materializeResult(fcinfo, res);
return (Datum) 0;
}
@@ -640,6 +679,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible */
bool freeconn = false;
+ storeInfo storeinfo;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -715,164 +755,213 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
rsinfo->setResult = NULL;
rsinfo->setDesc = NULL;
+
+ /*
+ * Result is stored into storeinfo.tuplestore instead of
+ * res->result returned by PQexec/PQgetResult below
+ */
+ initStoreInfo(&storeinfo, fcinfo);
+ PQregisterStoreHandler(conn, storeHandler, &storeinfo);
+
/* synchronous query, or async result retrieval */
if (!is_async)
res = PQexec(conn, sql);
else
- {
res = PQgetResult(conn);
- /* NULL means we're all done with the async results */
- if (!res)
- return (Datum) 0;
- }
- /* if needed, close the connection to the database and cleanup */
- if (freeconn)
- PQfinish(conn);
+ finishStoreInfo(&storeinfo);
- if (!res ||
- (PQresultStatus(res) != PGRES_COMMAND_OK &&
- PQresultStatus(res) != PGRES_TUPLES_OK))
+ /* NULL res from async get means we're all done with the results */
+ if (res || !is_async)
{
- dblink_res_error(conname, res, "could not execute query", fail);
- return (Datum) 0;
+ if (freeconn)
+ PQfinish(conn);
+
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
+ {
+ /* finishStoreInfo saves the fields referred to below. */
+ if (storeinfo.nummismatch)
+ {
+ /* This is only for backward compatibility */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+ }
+ else if (storeinfo.edata)
+ ReThrowError(storeinfo.edata);
+
+ dblink_res_error(conname, res, "could not execute query", fail);
+ return (Datum) 0;
+ }
}
+ PQclear(res);
- materializeResult(fcinfo, res);
return (Datum) 0;
}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
- Assert(rsinfo->returnMode == SFRM_Materialize);
-
- PG_TRY();
+ TupleDesc tupdesc;
+ int i;
+
+ switch (get_call_result_type(fcinfo, NULL, &tupdesc))
{
- TupleDesc tupdesc;
- bool is_sql_cmd = false;
- int ntuples;
- int nfields;
+ case TYPEFUNC_COMPOSITE:
+ /* success */
+ break;
+ case TYPEFUNC_RECORD:
+ /* failed to determine actual type of RECORD */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record")));
+ break;
+ default:
+ /* result type isn't composite */
+ elog(ERROR, "return type must be a row type");
+ break;
+ }
+
+ sinfo->oldcontext = MemoryContextSwitchTo(
+ rsinfo->econtext->ecxt_per_query_memory);
+
+ /* make sure we have a persistent copy of the tupdesc */
+ tupdesc = CreateTupleDescCopy(tupdesc);
+
+ sinfo->error_occurred = FALSE;
+ sinfo->nummismatch = FALSE;
+ sinfo->edata = NULL;
+ sinfo->nattrs = tupdesc->natts;
+ sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+ sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *));
+ sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t));
+ for (i = 0 ; i < sinfo->nattrs ; i++)
+ {
+ sinfo->valbuf[i] = NULL;
+ sinfo->valbufsize[i] = 0;
+ }
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- is_sql_cmd = true;
-
- /*
- * need a tuple descriptor representing one TEXT column to return
- * the command status string as our result tuple
- */
- tupdesc = CreateTemplateTupleDesc(1, false);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
- TEXTOID, -1, 0);
- ntuples = 1;
- nfields = 1;
- }
- else
- {
- Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+ /* Preallocate memory of the same size as a PGresAttValue array for values. */
+ sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue));
- is_sql_cmd = false;
+ rsinfo->setResult = sinfo->tuplestore;
+ rsinfo->setDesc = tupdesc;
+}
- /* get a tuple descriptor for our result type */
- switch (get_call_result_type(fcinfo, NULL, &tupdesc))
- {
- case TYPEFUNC_COMPOSITE:
- /* success */
- break;
- case TYPEFUNC_RECORD:
- /* failed to determine actual type of RECORD */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("function returning record called in context "
- "that cannot accept type record")));
- break;
- default:
- /* result type isn't composite */
- elog(ERROR, "return type must be a row type");
- break;
- }
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+ int i;
- /* make sure we have a persistent copy of the tupdesc */
- tupdesc = CreateTupleDescCopy(tupdesc);
- ntuples = PQntuples(res);
- nfields = PQnfields(res);
+ for (i = 0 ; i < sinfo->nattrs ; i++)
+ {
+ if (sinfo->valbuf[i])
+ {
+ free(sinfo->valbuf[i]);
+ sinfo->valbuf[i] = NULL;
}
+ }
+ if (sinfo->attrvalbuf)
+ free(sinfo->attrvalbuf);
+ sinfo->attrvalbuf = NULL;
+ MemoryContextSwitchTo(sinfo->oldcontext);
+}
- /*
- * check result and tuple descriptor have the same number of columns
- */
- if (nfields != tupdesc->natts)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("remote query result rowtype does not match "
- "the specified FROM clause rowtype")));
+static void *
+storeHandler(PGresult *res, PQStoreFunc func, int id, size_t size)
+{
+ storeInfo *sinfo = (storeInfo *)PQgetStoreHandlerParam(res);
+ HeapTuple tuple;
+ int fields = PQnfields(res);
+ int i;
+ PGresAttValue *attval;
+ char **cstrs;
- if (ntuples > 0)
- {
- AttInMetadata *attinmeta;
- Tuplestorestate *tupstore;
- MemoryContext oldcontext;
- int row;
- char **values;
-
- attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
- oldcontext = MemoryContextSwitchTo(
- rsinfo->econtext->ecxt_per_query_memory);
- tupstore = tuplestore_begin_heap(true, false, work_mem);
- rsinfo->setResult = tupstore;
- rsinfo->setDesc = tupdesc;
- MemoryContextSwitchTo(oldcontext);
+ if (sinfo->error_occurred)
+ return NULL;
+
+ switch (func)
+ {
+ case PQSF_ALLOC_TEXT:
+ case PQSF_ALLOC_BINARY:
+ if (id == -1)
+ return sinfo->attrvalbuf;
- values = (char **) palloc(nfields * sizeof(char *));
+ if (id < 0 || id >= sinfo->nattrs)
+ return NULL;
- /* put all tuples into the tuplestore */
- for (row = 0; row < ntuples; row++)
+ if (sinfo->valbufsize[id] < size)
{
- HeapTuple tuple;
+ if (sinfo->valbuf[id] == NULL)
+ sinfo->valbuf[id] = malloc(size);
+ else
+ sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size);
+ sinfo->valbufsize[id] = size;
+ }
+ return sinfo->valbuf[id];
- if (!is_sql_cmd)
- {
- int i;
+ case PQSF_ADD_TUPLE:
+ break; /* Go through */
+ default:
+ /* Ignore */
+ break;
+ }
- for (i = 0; i < nfields; i++)
- {
- if (PQgetisnull(res, row, i))
- values[i] = NULL;
- else
- values[i] = PQgetvalue(res, row, i);
- }
- }
- else
- {
- values[0] = PQcmdStatus(res);
- }
+ if (sinfo->nattrs != fields)
+ {
+ sinfo->error_occurred = TRUE;
+ sinfo->nummismatch = TRUE;
+ finishStoreInfo(sinfo);
- /* build the tuple and put it into the tuplestore. */
- tuple = BuildTupleFromCStrings(attinmeta, values);
- tuplestore_puttuple(tupstore, tuple);
- }
+ /* This error will be processed in
+ * dblink_record_internal(). So do not set error message
+ * here. */
+ return NULL;
+ }
- /* clean up and return the tuplestore */
- tuplestore_donestoring(tupstore);
- }
+ /* Rewrite PGresAttValue[] to char *[] in place. Column i comes from
+ * attval[i]; storing cstrs[i] only touches entries already read, as
+ * sizeof(char *) <= sizeof(PGresAttValue) (asserted below). */
+ Assert(sizeof(char*) <= sizeof(PGresAttValue));
- PQclear(res);
+ attval = (PGresAttValue *)sinfo->attrvalbuf;
+ cstrs = (char **)sinfo->attrvalbuf;
+ for(i = 0 ; i < fields ; i++)
+ {
+ if (attval[i].len < 0)
+ cstrs[i] = NULL;
+ else
+ cstrs[i] = attval[i].value;
+ }
+
+ PG_TRY();
+ {
+ tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+ tuplestore_puttuple(sinfo->tuplestore, tuple);
}
PG_CATCH();
{
- /* be sure to release the libpq result */
- PQclear(res);
- PG_RE_THROW();
+ MemoryContext context;
+ /*
+ * Store exception for later ReThrow and cancel the exception.
+ */
+ sinfo->error_occurred = TRUE;
+ context = MemoryContextSwitchTo(sinfo->oldcontext);
+ sinfo->edata = CopyErrorData();
+ MemoryContextSwitchTo(context);
+ FlushErrorState();
+
+ return NULL;
}
PG_END_TRY();
+
+ return sinfo->attrvalbuf;
}
/*
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..8803999 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,293 @@ int PQisthreadsafe();
</sect1>
+ <sect1 id="libpq-alterstorage">
+ <title>Alternative result storage</title>
+
+ <indexterm zone="libpq-alterstorage">
+ <primary>PGresult</primary>
+ <secondary>PGconn</secondary>
+ </indexterm>
+
+ <para>
+ In standard usage, users get the result of command
+ execution from a <structname>PGresult</structname> acquired
+ with <function>PQgetResult</function>
+ from a <structname>PGconn</structname>. The memory areas for
+ the PGresult are allocated with malloc() internally within calls of
+ command execution functions such as <function>PQexec</function>
+ and <function>PQgetResult</function>. If you have difficulty
+ handling the result records in the form of PGresult, you can instruct
+ PGconn to store them into your own storage instead of PGresult.
+ </para>
+
+ <variablelist>
+ <varlistentry id="libpq-registerstorehandler">
+ <term>
+ <function>PQregisterStoreHandler</function>
+ <indexterm>
+ <primary>PQregisterStoreHandler</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Sets a callback function to allocate memory for each tuple and
+ column values, and add the complete tuple into the alternative
+ result storage.
+<synopsis>
+void PQregisterStoreHandler(PGconn *conn,
+ StoreHandler func,
+ void *param);
+</synopsis>
+ </para>
+
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>conn</parameter></term>
+ <listitem>
+ <para>
+ The connection object to set the storage handler
+ function. PGresult created from this connection calls this
+ function to store the result instead of storing into its
+ internal storage.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>func</parameter></term>
+ <listitem>
+ <para>
+ Storage handler function to set. NULL means to use the
+ default storage.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>param</parameter></term>
+ <listitem>
+ <para>
+ A pointer to a contextual parameter passed
+ to <parameter>func</parameter>. You can get this pointer
+ in <type>StoreHandler</type>
+ by calling <function>PQgetStoreHandlerParam</function>.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-storehandler">
+ <term>
+ <type>StoreHandler</type>
+ <indexterm>
+ <primary>StoreHandler</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ The type for the storage handler callback function.
+<synopsis>
+typedef enum
+{
+ PQSF_ALLOC_TEXT,
+ PQSF_ALLOC_BINARY,
+ PQSF_ADD_TUPLE
+} PQStoreFunc;
+
+void *(*StoreHandler)(PGresult *res,
+ PQStoreFunc func,
+ int id,
+ size_t size);
+</synopsis>
+ </para>
+
+ <para>
+ Generally this function must return NULL for failure and should
+ set the error message
+ with <function>PQsetStoreHandlerErrMes</function> if the cause
+ is other than out of memory. This function must not throw any
+ exception. This function is called in the following sequence.
+
+ <itemizedlist spacing="compact">
+ <listitem>
+ <simpara>Call with <parameter>func</parameter>
+ = <firstterm>PQSF_ALLOC_BINARY</firstterm>
+ and <parameter>id</parameter> = -1 to request the memory
+ for a tuple to be used as an array
+ of <type>PGresAttValue</type>. </simpara>
+ </listitem>
+ <listitem>
+ <simpara>Call with <parameter>func</parameter>
+ = <firstterm>PQSF_ALLOC_TEXT</firstterm>
+ or <firstterm>PQSF_ALLOC_BINARY</firstterm>
+ and <parameter>id</parameter> ranging from zero to the number of
+ columns - 1 to request the memory for each column value in the
+ current tuple.</simpara>
+ </listitem>
+ <listitem>
+ <simpara>Call with <parameter>func</parameter>
+ = <firstterm>PQSF_ADD_TUPLE</firstterm> to request the
+ constructed tuple to be stored.</simpara>
+ </listitem>
+ </itemizedlist>
+ </para>
+ <para>
+ Calling <type>StoreHandler</type>
+ with <parameter>func</parameter> =
+ <firstterm>PQSF_ALLOC_TEXT</firstterm> requests a
+ memory block of at least <parameter>size</parameter> bytes,
+ which need not be aligned to a word boundary.
+ <parameter>id</parameter> is a zero or positive number that
+ identifies the usage of the requested memory block, that is, the
+ position of the column for which the memory block is used.
+ </para>
+ <para>
+ When <parameter>func</parameter>
+ = <firstterm>PQSF_ALLOC_BINARY</firstterm>, this function is
+ told to return a memory block of at
+ least <parameter>size</parameter> bytes which is aligned to a
+ word boundary.
+ <parameter>id</parameter> is the identifier that distinguishes the
+ usage of the requested memory block. -1 means that it is used as an
+ array of <type>PGresAttValue</type> to store the tuple. Zero or
+ positive numbers have the same meaning as for
+ <firstterm>PQSF_ALLOC_TEXT</firstterm>.
+ </para>
+ <para>When <parameter>func</parameter>
+ = <firstterm>PQSF_ADD_TUPLE</firstterm>, this function is
+ told to store the <type>PGresAttValue</type> structure
+ constructed by the caller into your storage. The pointer to the
+ tuple structure is not passed, so you should remember the
+ pointer to the memory block passed back to the caller on
+ <parameter>func</parameter>
+ = <firstterm>PQSF_ALLOC_BINARY</firstterm>
+ with <parameter>id</parameter> = -1. This function must return
+ any non-NULL value for success. You must properly release the
+ memory blocks passed to the caller in this function if needed.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>res</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the <type>PGresult</type> object.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>func</parameter></term>
+ <listitem>
+ <para>
+ An <type>enum</type> value telling the function to perform.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>param</parameter></term>
+ <listitem>
+ <para>
+ A pointer to contextual parameter passed to func.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-pqgetstorehandlerparam">
+ <term>
+ <function>PQgetStoreHandlerParam</function>
+ <indexterm>
+ <primary>PQgetStoreHandlerParam</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Get the pointer passed to <function>PQregisterStoreHandler</function>
+ as <parameter>param</parameter>.
+<synopsis>
+void *PQgetStoreHandlerParam(PGresult *res)
+</synopsis>
+ </para>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>res</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the <type>PGresult</type> object.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-pqsetstorehandlererrmes">
+ <term>
+ <function>PQsetStoreHandlerErrMes</function>
+ <indexterm>
+ <primary>PQsetStoreHandlerErrMes</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Set the message for an error that occurred
+ in <type>StoreHandler</type>. If this message is not set, the
+ caller assumes the error to be out of memory.
+<synopsis>
+void PQsetStoreHandlerErrMes(PGresult *res, char *mes)
+</synopsis>
+ </para>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>res</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the <type>PGresult</type> object
+ passed to <type>StoreHandler</type>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>mes</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the memory block containing the error
+ message, which is allocated
+ by <function>malloc()</function>. The memory block
+ will be freed with <function>free()</function> in the
+ caller of
+ <type>StoreHandler</type> only if it returns NULL.
+ </para>
+ <para>
+ If <parameter>res</parameter> already has a message previously
+ set, it is freed and then the given message is set. Set NULL
+ to cancel the custom message.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </sect1>
+
+
<sect1 id="libpq-build">
<title>Building <application>libpq</application> Programs</title>
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers