On Thu, Feb 02, 2012 at 04:51:37PM +0900, Kyotaro HORIGUCHI wrote:
> Hello, This is new version of dblink.c
>
> - Memory is properly freed when realloc returns NULL in storeHandler().
>
> - The bug that free() in finishStoreInfo() will be fed with
> garbage pointer when malloc for sinfo->valbuflen fails is
> fixed.
Thanks, merged. I also did some minor coding style cleanups.
Tagging it Ready For Committer. I don't see any notable
issues with the patch anymore.
There is some potential for experimenting with more aggressive
optimizations on dblink side, but I'd like to get a nod from
a committer for libpq changes first.
--
marko
*** a/src/interfaces/libpq/exports.txt
--- b/src/interfaces/libpq/exports.txt
***************
*** 160,162 **** PQconnectStartParams 157
--- 160,164 ----
PQping 158
PQpingParams 159
PQlibVersion 160
+ PQsetRowProcessor 161
+ PQsetRowProcessorErrMsg 162
*** a/src/interfaces/libpq/fe-connect.c
--- b/src/interfaces/libpq/fe-connect.c
***************
*** 2693,2698 **** makeEmptyPGconn(void)
--- 2693,2701 ----
conn->wait_ssl_try = false;
#endif
+ /* set default row processor */
+ PQsetRowProcessor(conn, NULL, NULL);
+
/*
* We try to send at least 8K at a time, which is the usual size of pipe
* buffers on Unix systems. That way, when we are sending a large amount
***************
*** 2711,2718 **** makeEmptyPGconn(void)
--- 2714,2726 ----
initPQExpBuffer(&conn->errorMessage);
initPQExpBuffer(&conn->workBuffer);
+ /* set up initial row buffer */
+ conn->rowBufLen = 32;
+ conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue));
+
if (conn->inBuffer == NULL ||
conn->outBuffer == NULL ||
+ conn->rowBuf == NULL ||
PQExpBufferBroken(&conn->errorMessage) ||
PQExpBufferBroken(&conn->workBuffer))
{
***************
*** 2812,2817 **** freePGconn(PGconn *conn)
--- 2820,2827 ----
free(conn->inBuffer);
if (conn->outBuffer)
free(conn->outBuffer);
+ if (conn->rowBuf)
+ free(conn->rowBuf);
termPQExpBuffer(&conn->errorMessage);
termPQExpBuffer(&conn->workBuffer);
***************
*** 5076,5078 **** PQregisterThreadLock(pgthreadlock_t newhandler)
--- 5086,5089 ----
return prev;
}
+
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 66,71 **** static PGresult *PQexecFinish(PGconn *conn);
--- 66,72 ----
static int PQsendDescribe(PGconn *conn, char desc_type,
const char *desc_target);
static int check_field_number(const PGresult *res, int field_num);
+ static int pqAddRow(PGresult *res, void *param, PGrowValue *columns);
/* ----------------
***************
*** 160,165 **** PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
--- 161,167 ----
result->curBlock = NULL;
result->curOffset = 0;
result->spaceLeft = 0;
+ result->rowProcessorErrMsg = NULL;
if (conn)
{
***************
*** 701,707 **** pqClearAsyncResult(PGconn *conn)
if (conn->result)
PQclear(conn->result);
conn->result = NULL;
- conn->curTuple = NULL;
}
/*
--- 703,708 ----
***************
*** 756,762 **** pqPrepareAsyncResult(PGconn *conn)
*/
res = conn->result;
conn->result = NULL; /* handing over ownership to caller */
- conn->curTuple = NULL; /* just in case */
if (!res)
res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
else
--- 757,762 ----
***************
*** 828,833 **** pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
--- 828,900 ----
}
/*
+ * PQsetRowProcessor
+ * Set function that copies column data out from network buffer.
+ */
+ void
+ PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+ {
+ conn->rowProcessor = (func ? func : pqAddRow);
+ conn->rowProcessorParam = param;
+ }
+
+ /*
+ * PQsetRowProcessorErrMsg
+ * Set the error message to pass back to the caller of the row processor.
+ *
+ * You can replace the previous message with an alternative msg, or clear
+ * it by passing NULL.
+ */
+ void
+ PQsetRowProcessorErrMsg(PGresult *res, char *msg)
+ {
+ if (msg)
+ res->rowProcessorErrMsg = pqResultStrdup(res, msg);
+ else
+ res->rowProcessorErrMsg = NULL;
+ }
+
+ /*
+ * pqAddRow
+ * add a row to the PGresult structure, growing it if necessary
+ * Returns TRUE if OK, FALSE if not enough memory to add the row.
+ */
+ static int
+ pqAddRow(PGresult *res, void *param, PGrowValue *columns)
+ {
+ PGresAttValue *tup;
+ int nfields = res->numAttributes;
+ int i;
+
+ tup = (PGresAttValue *)
+ pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+ if (tup == NULL)
+ return FALSE;
+
+ for (i = 0 ; i < nfields ; i++)
+ {
+ tup[i].len = columns[i].len;
+ if (tup[i].len == NULL_LEN)
+ {
+ tup[i].value = res->null_field;
+ }
+ else
+ {
+ bool isbinary = (res->attDescs[i].format != 0);
+ tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+ if (tup[i].value == NULL)
+ return FALSE;
+
+ memcpy(tup[i].value, columns[i].value, tup[i].len);
+ /* We have to terminate this ourselves */
+ tup[i].value[tup[i].len] = '\0';
+ }
+ }
+
+ return pqAddTuple(res, tup);
+ }
+
+ /*
* pqAddTuple
* add a row pointer to the PGresult structure, growing it if necessary
* Returns TRUE if OK, FALSE if not enough memory to add the row
***************
*** 1223,1229 **** PQsendQueryStart(PGconn *conn)
/* initialize async result-accumulation state */
conn->result = NULL;
- conn->curTuple = NULL;
/* ready to send command message */
return true;
--- 1290,1295 ----
*** a/src/interfaces/libpq/fe-misc.c
--- b/src/interfaces/libpq/fe-misc.c
***************
*** 219,224 **** pqGetnchar(char *s, size_t len, PGconn *conn)
--- 219,243 ----
}
/*
+ * pqSkipnchar:
+ * skip len bytes in input buffer.
+ */
+ int
+ pqSkipnchar(size_t len, PGconn *conn)
+ {
+ if (len > (size_t) (conn->inEnd - conn->inCursor))
+ return EOF;
+
+ conn->inCursor += len;
+
+ if (conn->Pfdebug)
+ fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+ (unsigned long) len);
+
+ return 0;
+ }
+
+ /*
* pqPutnchar:
* write exactly len bytes to the current message
*/
*** a/src/interfaces/libpq/fe-protocol2.c
--- b/src/interfaces/libpq/fe-protocol2.c
***************
*** 703,721 **** failure:
/*
* parseInput subroutine to read a 'B' or 'D' (row data) message.
! * We add another tuple to the existing PGresult structure.
* Returns: 0 if completed message, EOF if error or not enough data yet.
*
* Note that if we run out of data, we have to suspend and reprocess
! * the message after more data is received. We keep a partially constructed
! * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
*/
static int
getAnotherTuple(PGconn *conn, bool binary)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
! PGresAttValue *tup;
/* the backend sends us a bitmap of which attributes are null */
char std_bitmap[64]; /* used unless it doesn't fit */
--- 703,720 ----
/*
* parseInput subroutine to read a 'B' or 'D' (row data) message.
! * It fills rowbuf with column pointers and then calls row processor.
* Returns: 0 if completed message, EOF if error or not enough data yet.
*
* Note that if we run out of data, we have to suspend and reprocess
! * the message after more data is received.
*/
static int
getAnotherTuple(PGconn *conn, bool binary)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
! PGrowValue *rowbuf;
/* the backend sends us a bitmap of which attributes are null */
char std_bitmap[64]; /* used unless it doesn't fit */
***************
*** 727,754 **** getAnotherTuple(PGconn *conn, bool binary)
int bitcnt; /* number of bits examined in current byte */
int vlen; /* length of the current field value */
result->binary = binary;
! /* Allocate tuple space if first time for this data message */
! if (conn->curTuple == NULL)
{
! conn->curTuple = (PGresAttValue *)
! pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
! if (conn->curTuple == NULL)
! goto outOfMemory;
! MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
!
! /*
! * If it's binary, fix the column format indicators. We assume the
! * backend will consistently send either B or D, not a mix.
! */
! if (binary)
! {
! for (i = 0; i < nfields; i++)
! result->attDescs[i].format = 1;
! }
}
- tup = conn->curTuple;
/* Get the null-value bitmap */
nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
--- 726,752 ----
int bitcnt; /* number of bits examined in current byte */
int vlen; /* length of the current field value */
+ /* resize row buffer if needed */
+ if (nfields > conn->rowBufLen)
+ {
+ rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+ if (!rowbuf)
+ goto rowProcessError;
+ conn->rowBuf = rowbuf;
+ conn->rowBufLen = nfields;
+ }
+ else
+ {
+ rowbuf = conn->rowBuf;
+ }
+
result->binary = binary;
! if (binary)
{
! for (i = 0; i < nfields; i++)
! result->attDescs[i].format = 1;
}
/* Get the null-value bitmap */
nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
***************
*** 757,763 **** getAnotherTuple(PGconn *conn, bool binary)
{
bitmap = (char *) malloc(nbytes);
if (!bitmap)
! goto outOfMemory;
}
if (pqGetnchar(bitmap, nbytes, conn))
--- 755,761 ----
{
bitmap = (char *) malloc(nbytes);
if (!bitmap)
! goto rowProcessError;
}
if (pqGetnchar(bitmap, nbytes, conn))
***************
*** 771,804 **** getAnotherTuple(PGconn *conn, bool binary)
for (i = 0; i < nfields; i++)
{
if (!(bmap & 0200))
! {
! /* if the field value is absent, make it a null string */
! tup[i].value = result->null_field;
! tup[i].len = NULL_LEN;
! }
else
{
- /* get the value length (the first four bytes are for length) */
- if (pqGetInt(&vlen, 4, conn))
- goto EOFexit;
if (!binary)
vlen = vlen - 4;
if (vlen < 0)
vlen = 0;
- if (tup[i].value == NULL)
- {
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
- if (tup[i].value == NULL)
- goto outOfMemory;
- }
- tup[i].len = vlen;
- /* read in the value */
- if (vlen > 0)
- if (pqGetnchar((char *) (tup[i].value), vlen, conn))
- goto EOFexit;
- /* we have to terminate this ourselves */
- tup[i].value[vlen] = '\0';
}
/* advance the bitmap stuff */
bitcnt++;
if (bitcnt == BITS_PER_BYTE)
--- 769,797 ----
for (i = 0; i < nfields; i++)
{
if (!(bmap & 0200))
! vlen = NULL_LEN;
! else if (pqGetInt(&vlen, 4, conn))
! goto EOFexit;
else
{
if (!binary)
vlen = vlen - 4;
if (vlen < 0)
vlen = 0;
}
+
+ /*
+ * rowbuf[i].value always points to the address just past the
+ * length field, even if the value is NULL, to allow safe
+ * size estimates and data copy.
+ */
+ rowbuf[i].value = conn->inBuffer + conn->inCursor;
+ rowbuf[i].len = vlen;
+
+ /* Skip the value */
+ if (vlen > 0 && pqSkipnchar(vlen, conn))
+ goto EOFexit;
+
/* advance the bitmap stuff */
bitcnt++;
if (bitcnt == BITS_PER_BYTE)
***************
*** 811,827 **** getAnotherTuple(PGconn *conn, bool binary)
bmap <<= 1;
}
! /* Success! Store the completed tuple in the result */
! if (!pqAddTuple(result, tup))
! goto outOfMemory;
! /* and reset for a new message */
! conn->curTuple = NULL;
if (bitmap != std_bitmap)
free(bitmap);
return 0;
! outOfMemory:
/* Replace partially constructed result with an error result */
/*
--- 804,820 ----
bmap <<= 1;
}
! /* Success! Pass the completed row values to rowProcessor */
! if (!conn->rowProcessor(result, conn->rowProcessorParam, rowbuf))
! goto rowProcessError;
if (bitmap != std_bitmap)
free(bitmap);
+
return 0;
! rowProcessError:
!
/* Replace partially constructed result with an error result */
/*
***************
*** 829,838 **** outOfMemory:
* there's not enough memory to concatenate messages...
*/
pqClearAsyncResult(conn);
! printfPQExpBuffer(&conn->errorMessage,
! libpq_gettext("out of memory for query result\n"));
/*
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
* do to recover...
*/
--- 822,838 ----
* there's not enough memory to concatenate messages...
*/
pqClearAsyncResult(conn);
! resetPQExpBuffer(&conn->errorMessage);
/*
+ * If the row processor supplied an error message, set it into
+ * PGconn; otherwise assume out of memory.
+ */
+ appendPQExpBufferStr(&conn->errorMessage,
+ result->rowProcessorErrMsg ?
+ result->rowProcessorErrMsg :
+ libpq_gettext("out of memory for query result\n"));
+ /*
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
* do to recover...
*/
*** a/src/interfaces/libpq/fe-protocol3.c
--- b/src/interfaces/libpq/fe-protocol3.c
***************
*** 613,646 **** failure:
/*
* parseInput subroutine to read a 'D' (row data) message.
! * We add another tuple to the existing PGresult structure.
* Returns: 0 if completed message, EOF if error or not enough data yet.
*
* Note that if we run out of data, we have to suspend and reprocess
! * the message after more data is received. We keep a partially constructed
! * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
*/
static int
getAnotherTuple(PGconn *conn, int msgLength)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
! PGresAttValue *tup;
int tupnfields; /* # fields from tuple */
int vlen; /* length of the current field value */
int i;
- /* Allocate tuple space if first time for this data message */
- if (conn->curTuple == NULL)
- {
- conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- if (conn->curTuple == NULL)
- goto outOfMemory;
- MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
- }
- tup = conn->curTuple;
-
/* Get the field count and make sure it's what we expect */
if (pqGetInt(&tupnfields, 2, conn))
return EOF;
--- 613,634 ----
/*
* parseInput subroutine to read a 'D' (row data) message.
! * It fills rowbuf with column pointers and then calls row processor.
* Returns: 0 if completed message, EOF if error or not enough data yet.
*
* Note that if we run out of data, we have to suspend and reprocess
! * the message after more data is received.
*/
static int
getAnotherTuple(PGconn *conn, int msgLength)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
! PGrowValue *rowbuf;
int tupnfields; /* # fields from tuple */
int vlen; /* length of the current field value */
int i;
/* Get the field count and make sure it's what we expect */
if (pqGetInt(&tupnfields, 2, conn))
return EOF;
***************
*** 656,661 **** getAnotherTuple(PGconn *conn, int msgLength)
--- 644,663 ----
return 0;
}
+ /* resize row buffer if needed */
+ if (nfields > conn->rowBufLen)
+ {
+ rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+ if (!rowbuf)
+ goto rowProcessError;
+ conn->rowBuf = rowbuf;
+ conn->rowBufLen = nfields;
+ }
+ else
+ {
+ rowbuf = conn->rowBuf;
+ }
+
/* Scan the fields */
for (i = 0; i < nfields; i++)
{
***************
*** 663,710 **** getAnotherTuple(PGconn *conn, int msgLength)
if (pqGetInt(&vlen, 4, conn))
return EOF;
if (vlen == -1)
! {
! /* null field */
! tup[i].value = result->null_field;
! tup[i].len = NULL_LEN;
! continue;
! }
! if (vlen < 0)
vlen = 0;
- if (tup[i].value == NULL)
- {
- bool isbinary = (result->attDescs[i].format != 0);
! tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
! if (tup[i].value == NULL)
! goto outOfMemory;
! }
! tup[i].len = vlen;
! /* read in the value */
! if (vlen > 0)
! if (pqGetnchar((char *) (tup[i].value), vlen, conn))
! return EOF;
! /* we have to terminate this ourselves */
! tup[i].value[vlen] = '\0';
}
! /* Success! Store the completed tuple in the result */
! if (!pqAddTuple(result, tup))
! goto outOfMemory;
! /* and reset for a new message */
! conn->curTuple = NULL;
return 0;
! outOfMemory:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
! printfPQExpBuffer(&conn->errorMessage,
! libpq_gettext("out of memory for query result\n"));
pqSaveErrorResult(conn);
/* Discard the failed message by pretending we read it */
--- 665,710 ----
if (pqGetInt(&vlen, 4, conn))
return EOF;
if (vlen == -1)
! vlen = NULL_LEN;
! else if (vlen < 0)
vlen = 0;
! /*
! * rowbuf[i].value always points to the address just past the
! * length field, even if the value is NULL, to allow safe
! * size estimates and data copy.
! */
! rowbuf[i].value = conn->inBuffer + conn->inCursor;
! rowbuf[i].len = vlen;
!
! /* Skip to the next length field */
! if (vlen > 0 && pqSkipnchar(vlen, conn))
! return EOF;
}
! /* Success! Pass the completed row values to rowProcessor */
! if (!conn->rowProcessor(result, conn->rowProcessorParam, rowbuf))
! goto rowProcessError;
return 0;
! rowProcessError:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
! resetPQExpBuffer(&conn->errorMessage);
!
! /*
! * If the row processor supplied an error message, set it into
! * PGconn; otherwise assume out of memory.
! */
! appendPQExpBufferStr(&conn->errorMessage,
! result->rowProcessorErrMsg ?
! result->rowProcessorErrMsg :
! libpq_gettext("out of memory for query result\n"));
pqSaveErrorResult(conn);
/* Discard the failed message by pretending we read it */
*** a/src/interfaces/libpq/libpq-fe.h
--- b/src/interfaces/libpq/libpq-fe.h
***************
*** 149,154 **** typedef struct pgNotify
--- 149,165 ----
struct pgNotify *next; /* list link */
} PGnotify;
+ /* PGrowValue points to a column value in the network buffer.
+ * The value is a string of length len without null termination.
+ * NULL is represented as len < 0; value then points to the place
+ * where the value would have been.
+ */
+ typedef struct pgRowValue
+ {
+ int len; /* length in bytes of the value */
+ char *value; /* actual value, without null termination */
+ } PGrowValue;
+
/* Function types for notice-handling callbacks */
typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
typedef void (*PQnoticeProcessor) (void *arg, const char *message);
***************
*** 416,421 **** extern PGPing PQping(const char *conninfo);
--- 427,463 ----
extern PGPing PQpingParams(const char *const * keywords,
const char *const * values, int expand_dbname);
+ /*
+ * Typedef for alternative row processor.
+ *
+ * Columns array will contain PQnfields() entries, each one
+ * pointing to particular column data in network buffer.
+ * This function is supposed to copy data out from there
+ * and store somewhere. NULL is signified with len<0.
+ *
+ * This function must return 1 for success and 0 for failure. On
+ * failure it may set an error message with PQsetRowProcessorErrMsg;
+ * if no message is set, the caller assumes the failure was caused by
+ * running out of memory. This function must not throw any
+ * exception.
+ */
+ typedef int (*PQrowProcessor)(PGresult *res, void *param,
+ PGrowValue *columns);
+
+ /*
+ * Set alternative row data processor for PGconn.
+ *
+ * Once a row processor is registered, libpq no longer stores rows in
+ * the PGresult itself; it calls the processor for each row instead.
+ *
+ * func is the row processor function. See the typedef PQrowProcessor.
+ *
+ * rowProcessorParam is the contextual variable that is passed to
+ * the row processor.
+ */
+ extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func,
+ void *rowProcessorParam);
+
/* Force the write buffer to be written (or at least try) */
extern int PQflush(PGconn *conn);
***************
*** 454,459 **** extern char *PQcmdTuples(PGresult *res);
--- 496,502 ----
extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
extern int PQgetlength(const PGresult *res, int tup_num, int field_num);
extern int PQgetisnull(const PGresult *res, int tup_num, int field_num);
+ extern void PQsetRowProcessorErrMsg(PGresult *res, char *msg);
extern int PQnparams(const PGresult *res);
extern Oid PQparamtype(const PGresult *res, int param_num);
*** a/src/interfaces/libpq/libpq-int.h
--- b/src/interfaces/libpq/libpq-int.h
***************
*** 209,214 **** struct pg_result
--- 209,217 ----
PGresult_data *curBlock; /* most recently allocated block */
int curOffset; /* start offset of free space in block */
int spaceLeft; /* number of free bytes remaining in block */
+
+ /* temp storage for message from row processor callback */
+ char *rowProcessorErrMsg;
};
/* PGAsyncStatusType defines the state of the query-execution state machine */
***************
*** 398,404 **** struct pg_conn
/* Status for asynchronous result construction */
PGresult *result; /* result being constructed */
- PGresAttValue *curTuple; /* tuple currently being read */
#ifdef USE_SSL
bool allow_ssl_try; /* Allowed to try SSL negotiation */
--- 401,406 ----
***************
*** 443,448 **** struct pg_conn
--- 445,458 ----
/* Buffer for receiving various parts of messages */
PQExpBufferData workBuffer; /* expansible string */
+
+ /*
+ * Read column data from network buffer.
+ */
+ PQrowProcessor rowProcessor;/* Function pointer */
+ void *rowProcessorParam; /* Contextual parameter for rowProcessor */
+ PGrowValue *rowBuf; /* Buffer for passing values to rowProcessor */
+ int rowBufLen; /* Number of columns allocated in rowBuf */
};
/* PGcancel stores all data necessary to cancel a connection. A copy of this
***************
*** 560,565 **** extern int pqGets(PQExpBuffer buf, PGconn *conn);
--- 570,576 ----
extern int pqGets_append(PQExpBuffer buf, PGconn *conn);
extern int pqPuts(const char *s, PGconn *conn);
extern int pqGetnchar(char *s, size_t len, PGconn *conn);
+ extern int pqSkipnchar(size_t len, PGconn *conn);
extern int pqPutnchar(const char *s, size_t len, PGconn *conn);
extern int pqGetInt(int *result, size_t bytes, PGconn *conn);
extern int pqPutInt(int value, size_t bytes, PGconn *conn);
*** a/doc/src/sgml/libpq.sgml
--- b/doc/src/sgml/libpq.sgml
***************
*** 7233,7238 **** int PQisthreadsafe();
--- 7233,7443 ----
</sect1>
+ <sect1 id="libpq-altrowprocessor">
+ <title>Alternative row processor</title>
+
+ <indexterm zone="libpq-altrowprocessor">
+ <primary>PGresult</primary>
+ <secondary>PGconn</secondary>
+ </indexterm>
+
+ <para>
+ In standard usage, rows are stored into a <type>PGresult</type>
+ until the full result set is received. The completely-filled
+ <type>PGresult</type> is then passed to the user. This behaviour can
+ be changed by registering an alternative row processor function,
+ which will see each row's data as soon as it is received
+ from the network. It has the option of processing the data
+ immediately, or storing it into a custom container.
+ </para>
+
+ <para>
+ Note: because the row processor sees rows as they arrive, it cannot
+ know whether the SQL statement will actually finish successfully on
+ the server. So some care must be taken to get proper
+ transactionality.
+ </para>
+
+ <variablelist>
+ <varlistentry id="libpq-pqsetrowprocessor">
+ <term>
+ <function>PQsetRowProcessor</function>
+ <indexterm>
+ <primary>PQsetRowProcessor</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Sets a callback function to process each row.
+ <synopsis>
+ void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+ </synopsis>
+ </para>
+
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>conn</parameter></term>
+ <listitem>
+ <para>
+ The connection object to set the row processor function.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>func</parameter></term>
+ <listitem>
+ <para>
+ Row processor function to set. NULL means to use the
+ default processor.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>param</parameter></term>
+ <listitem>
+ <para>
+ A pointer to contextual parameter passed
+ to <parameter>func</parameter>.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-pqrowprocessor">
+ <term>
+ <type>PQrowProcessor</type>
+ <indexterm>
+ <primary>PQrowProcessor</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ The type for the row processor callback function.
+ <synopsis>
+ int (*PQrowProcessor)(PGresult *res, void *param, PGrowValue *columns);
+
+ typedef struct
+ {
+ int len; /* length in bytes of the value */
+ char *value; /* actual value, without null termination */
+ } PGrowValue;
+ </synopsis>
+ </para>
+
+ <para>
+ The <parameter>columns</parameter> array will have PQnfields()
+ elements, each one pointing to column value in network buffer.
+ </para>
+
+ <para>
+ This function must process or copy row values away from network
+ buffer before it returns, as next row might overwrite them.
+ </para>
+
+ <para>
+ This function must return 1 for success, and 0 for failure.
+ On failure this function should set the error message
+ with <function>PQsetRowProcessorErrMsg</function> if the cause
+ is other than out of memory. This function must not throw any
+ exception.
+ </para>
+ <variablelist>
+ <varlistentry>
+
+ <term><parameter>res</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the <type>PGresult</type> object.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+
+ <term><parameter>param</parameter></term>
+ <listitem>
+ <para>
+ Extra parameter that was given to <function>PQsetRowProcessor</function>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+
+ <term><parameter>columns</parameter></term>
+ <listitem>
+ <para>
+ Column values of the row to process. Column values
+ are located in network buffer, the processor must
+ copy them out from there.
+ </para>
+ <para>
+ Column values are not null-terminated, so processor cannot
+ use C string functions on them directly.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-pqsetrowprocessorerrmsg">
+ <term>
+ <function>PQsetRowProcessorErrMsg</function>
+ <indexterm>
+ <primary>PQsetRowProcessorErrMsg</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the message for an error that occurred
+ in <type>PQrowProcessor</type>. If this message is not set, the
+ caller assumes the error to be out of memory.
+ <synopsis>
+ void PQsetRowProcessorErrMsg(PGresult *res, char *msg);
+ </synopsis>
+ </para>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>res</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the <type>PGresult</type> object
+ passed to <type>PQrowProcessor</type>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>msg</parameter></term>
+ <listitem>
+ <para>
+ Error message. This will be copied internally so there is
+ no need to care about its lifetime.
+ </para>
+ <para>
+ If <parameter>res</parameter> already has a message previously
+ set, it will be overwritten. Set NULL to cancel the custom
+ message.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </sect1>
+
+
<sect1 id="libpq-build">
<title>Building <application>libpq</application> Programs</title>
*** a/contrib/dblink/dblink.c
--- b/contrib/dblink/dblink.c
***************
*** 63,73 **** typedef struct remoteConn
bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
/*
* Internal declarations
*/
static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
- static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
--- 63,85 ----
bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
+ typedef struct storeInfo
+ {
+ Tuplestorestate *tuplestore;
+ int nattrs;
+ MemoryContext oldcontext;
+ AttInMetadata *attinmeta;
+ char** valbuf;
+ int *valbuflen;
+ bool error_occurred;
+ bool nummismatch;
+ ErrorData *edata;
+ } storeInfo;
+
/*
* Internal declarations
*/
static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
***************
*** 90,95 **** static char *escape_param_str(const char *from);
--- 102,111 ----
static void validate_pkattnums(Relation rel,
int2vector *pkattnums_arg, int32 pknumatts_arg,
int **pkattnums, int *pknumatts);
+ static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+ static void finishStoreInfo(storeInfo *sinfo);
+ static int storeHandler(PGresult *res, void *param, PGrowValue *columns);
+
/* Global */
static remoteConn *pconn = NULL;
***************
*** 503,508 **** dblink_fetch(PG_FUNCTION_ARGS)
--- 519,525 ----
char *curname = NULL;
int howmany = 0;
bool fail = true; /* default to backward compatible */
+ storeInfo storeinfo;
DBLINK_INIT;
***************
*** 559,573 **** dblink_fetch(PG_FUNCTION_ARGS)
--- 576,611 ----
appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
+ * Result is stored into storeinfo.tuplestore instead of the
+ * PGresult returned by PQexec below
+ */
+ initStoreInfo(&storeinfo, fcinfo);
+ PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+ /*
* Try to execute the query. Note that since libpq uses malloc, the
* PGresult will be long-lived even though we are still in a short-lived
* memory context.
*/
res = PQexec(conn, buf.data);
+ finishStoreInfo(&storeinfo);
+
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
+ /* finishStoreInfo saves the fields referred to below. */
+ if (storeinfo.nummismatch)
+ {
+ /* This is only for backward compatibility */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+ }
+ else if (storeinfo.edata)
+ ReThrowError(storeinfo.edata);
+
dblink_res_error(conname, res, "could not fetch from cursor", fail);
return (Datum) 0;
}
***************
*** 579,586 **** dblink_fetch(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));
}
- materializeResult(fcinfo, res);
return (Datum) 0;
}
--- 617,624 ----
(errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));
}
+ PQclear(res);
return (Datum) 0;
}
***************
*** 640,645 **** dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
--- 678,684 ----
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible */
bool freeconn = false;
+ storeInfo storeinfo;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
***************
*** 715,878 **** dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
rsinfo->setResult = NULL;
rsinfo->setDesc = NULL;
/* synchronous query, or async result retrieval */
if (!is_async)
res = PQexec(conn, sql);
else
- {
res = PQgetResult(conn);
- /* NULL means we're all done with the async results */
- if (!res)
- return (Datum) 0;
- }
! /* if needed, close the connection to the database and cleanup */
! if (freeconn)
! PQfinish(conn);
! if (!res ||
! (PQresultStatus(res) != PGRES_COMMAND_OK &&
! PQresultStatus(res) != PGRES_TUPLES_OK))
{
! dblink_res_error(conname, res, "could not execute query", fail);
! return (Datum) 0;
}
- materializeResult(fcinfo, res);
return (Datum) 0;
}
- /*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
static void
! materializeResult(FunctionCallInfo fcinfo, PGresult *res)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
! Assert(rsinfo->returnMode == SFRM_Materialize);
! PG_TRY();
{
! TupleDesc tupdesc;
! bool is_sql_cmd = false;
! int ntuples;
! int nfields;
! if (PQresultStatus(res) == PGRES_COMMAND_OK)
! {
! is_sql_cmd = true;
! /*
! * need a tuple descriptor representing one TEXT column to return
! * the command status string as our result tuple
! */
! tupdesc = CreateTemplateTupleDesc(1, false);
! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
! TEXTOID, -1, 0);
! ntuples = 1;
! nfields = 1;
! }
! else
! {
! Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
! is_sql_cmd = false;
! /* get a tuple descriptor for our result type */
! switch (get_call_result_type(fcinfo, NULL, &tupdesc))
! {
! case TYPEFUNC_COMPOSITE:
! /* success */
! break;
! case TYPEFUNC_RECORD:
! /* failed to determine actual type of RECORD */
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("function returning record called in context "
! "that cannot accept type record")));
! break;
! default:
! /* result type isn't composite */
! elog(ERROR, "return type must be a row type");
! break;
! }
! /* make sure we have a persistent copy of the tupdesc */
! tupdesc = CreateTupleDescCopy(tupdesc);
! ntuples = PQntuples(res);
! nfields = PQnfields(res);
}
! /*
! * check result and tuple descriptor have the same number of columns
! */
! if (nfields != tupdesc->natts)
! ereport(ERROR,
! (errcode(ERRCODE_DATATYPE_MISMATCH),
! errmsg("remote query result rowtype does not match "
! "the specified FROM clause rowtype")));
! if (ntuples > 0)
! {
! AttInMetadata *attinmeta;
! Tuplestorestate *tupstore;
! MemoryContext oldcontext;
! int row;
! char **values;
!
! attinmeta = TupleDescGetAttInMetadata(tupdesc);
!
! oldcontext = MemoryContextSwitchTo(
! rsinfo->econtext->ecxt_per_query_memory);
! tupstore = tuplestore_begin_heap(true, false, work_mem);
! rsinfo->setResult = tupstore;
! rsinfo->setDesc = tupdesc;
! MemoryContextSwitchTo(oldcontext);
! values = (char **) palloc(nfields * sizeof(char *));
! /* put all tuples into the tuplestore */
! for (row = 0; row < ntuples; row++)
! {
! HeapTuple tuple;
! if (!is_sql_cmd)
! {
! int i;
! for (i = 0; i < nfields; i++)
! {
! if (PQgetisnull(res, row, i))
! values[i] = NULL;
! else
! values[i] = PQgetvalue(res, row, i);
! }
! }
else
! {
! values[0] = PQcmdStatus(res);
! }
!
! /* build the tuple and put it into the tuplestore. */
! tuple = BuildTupleFromCStrings(attinmeta, values);
! tuplestore_puttuple(tupstore, tuple);
}
! /* clean up and return the tuplestore */
! tuplestore_donestoring(tupstore);
}
! PQclear(res);
}
PG_CATCH();
{
! /* be sure to release the libpq result */
! PQclear(res);
! PG_RE_THROW();
}
PG_END_TRY();
}
/*
--- 754,993 ----
rsinfo->setResult = NULL;
rsinfo->setDesc = NULL;
+
+ /*
+ * Result is stored into storeinfo.tuplestore instead of
+ * res->result returned by PQexec/PQgetResult below
+ */
+ initStoreInfo(&storeinfo, fcinfo);
+ PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
/* synchronous query, or async result retrieval */
if (!is_async)
res = PQexec(conn, sql);
else
res = PQgetResult(conn);
! finishStoreInfo(&storeinfo);
! /* NULL res from async get means we're all done with the results */
! if (res || !is_async)
{
! if (freeconn)
! PQfinish(conn);
!
! if (!res ||
! (PQresultStatus(res) != PGRES_COMMAND_OK &&
! PQresultStatus(res) != PGRES_TUPLES_OK))
! {
! /* finishStoreInfo saves the fields referred to below. */
! if (storeinfo.nummismatch)
! {
! /* This is only for backward compatibility */
! ereport(ERROR,
! (errcode(ERRCODE_DATATYPE_MISMATCH),
! errmsg("remote query result rowtype does not match "
! "the specified FROM clause rowtype")));
! }
! else if (storeinfo.edata)
! ReThrowError(storeinfo.edata);
!
! dblink_res_error(conname, res, "could not execute query", fail);
! return (Datum) 0;
! }
}
+ PQclear(res);
return (Datum) 0;
}
static void
! initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ int i;
! switch (get_call_result_type(fcinfo, NULL, &tupdesc))
! {
! case TYPEFUNC_COMPOSITE:
! /* success */
! break;
! case TYPEFUNC_RECORD:
! /* failed to determine actual type of RECORD */
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("function returning record called in context "
! "that cannot accept type record")));
! break;
! default:
! /* result type isn't composite */
! elog(ERROR, "return type must be a row type");
! break;
! }
! sinfo->oldcontext = MemoryContextSwitchTo(
! rsinfo->econtext->ecxt_per_query_memory);
!
! /* make sure we have a persistent copy of the tupdesc */
! tupdesc = CreateTupleDescCopy(tupdesc);
!
! sinfo->error_occurred = FALSE;
! sinfo->nummismatch = FALSE;
! sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
! sinfo->edata = NULL;
! sinfo->nattrs = tupdesc->natts;
! sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
! sinfo->valbuf = NULL;
! sinfo->valbuflen = NULL;
!
! /* Preallocate the array of C-string buffers for values, one per attribute. */
! sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*));
! if (sinfo->valbuf)
! sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
! if (sinfo->valbuflen == NULL)
{
! if (sinfo->valbuf)
! free(sinfo->valbuf);
! ereport(ERROR,
! (errcode(ERRCODE_OUT_OF_MEMORY),
! errmsg("out of memory")));
! }
! for (i = 0 ; i < sinfo->nattrs ; i++)
! {
! sinfo->valbuf[i] = NULL;
! sinfo->valbuflen[i] = -1;
! }
! rsinfo->setResult = sinfo->tuplestore;
! rsinfo->setDesc = tupdesc;
! }
! static void
! finishStoreInfo(storeInfo *sinfo)
! {
! int i;
! if (sinfo->valbuf)
! {
! for (i = 0 ; i < sinfo->nattrs ; i++)
! {
! if (sinfo->valbuf[i])
! free(sinfo->valbuf[i]);
}
+ free(sinfo->valbuf);
+ sinfo->valbuf = NULL;
+ }
! if (sinfo->valbuflen)
! {
! free(sinfo->valbuflen);
! sinfo->valbuflen = NULL;
! }
! MemoryContextSwitchTo(sinfo->oldcontext);
! }
! static int
! storeHandler(PGresult *res, void *param, PGrowValue *columns)
! {
! storeInfo *sinfo = (storeInfo *)param;
! HeapTuple tuple;
! int fields = PQnfields(res);
! int i;
! char *cstrs[PQnfields(res)];
! if (sinfo->error_occurred)
! return FALSE;
! if (sinfo->nattrs != fields)
! {
! sinfo->error_occurred = TRUE;
! sinfo->nummismatch = TRUE;
! finishStoreInfo(sinfo);
!
! /* This error will be reported in
! * dblink_record_internal(), so do not set an error message
! * here. */
! return FALSE;
! }
! /*
! * Value input functions assume that the input string is
! * zero-terminated, so copy each value and terminate it here.
! */
! for(i = 0 ; i < fields ; i++)
! {
! int len = columns[i].len;
! if (len < 0)
! cstrs[i] = NULL;
! else
! {
! char *tmp = sinfo->valbuf[i];
! int tmplen = sinfo->valbuflen[i];
! /*
! * Call malloc and realloc separately so that this works
! * even on systems whose realloc() does not accept NULL
! * as the old memory block.
! *
! * Also try to (re)allocate in bigger steps to
! * avoid a flood of allocations on weird data.
! */
! if (tmp == NULL)
! {
! tmplen = len + 1;
! if (tmplen < 64)
! tmplen = 64;
! tmp = (char *)malloc(tmplen);
! }
! else if (tmplen < len + 1)
! {
! if (len + 1 > tmplen * 2)
! tmplen = len + 1;
else
! tmplen = tmplen * 2;
! tmp = (char *)realloc(tmp, tmplen);
}
! /*
! * If (re)allocation fails, the old block is still referenced by
! * sinfo->valbuf[i] and will be freed in finishStoreInfo().
! */
! if (tmp == NULL)
! return FALSE;
!
! sinfo->valbuf[i] = tmp;
! sinfo->valbuflen[i] = tmplen;
!
! cstrs[i] = sinfo->valbuf[i];
! memcpy(cstrs[i], columns[i].value, len);
! cstrs[i][len] = '\0';
}
+ }
! PG_TRY();
! {
! tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
! tuplestore_puttuple(sinfo->tuplestore, tuple);
}
PG_CATCH();
{
! MemoryContext context;
! /*
! * Store exception for later ReThrow and cancel the exception.
! */
! sinfo->error_occurred = TRUE;
! context = MemoryContextSwitchTo(sinfo->oldcontext);
! sinfo->edata = CopyErrorData();
! MemoryContextSwitchTo(context);
! FlushErrorState();
! return FALSE;
}
PG_END_TRY();
+
+ return TRUE;
}
/*
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers