Hello, this is a new version of the patch.
> > By the way, I would like to ask you one question. What is the
> > reason why void* should be head or tail of the parameter list?
>
> Aesthetical reasons:
I got it. Thank you.
> Last comment - if we drop the plan to make PQsetRowProcessorErrMsg()
> usable outside of handler, we can simplify internal usage as well:
> the PGresult->rowProcessorErrMsg can be dropped and let's use
> ->errMsg to transport the error message.
>
> The PGresult is long-lived structure and adding fields for such
> temporary usage feels wrong. There is no other libpq code between
> row processor and getAnotherTuple, so the use is safe.
I mostly agree with it. In addition, I think there is no reason to
treat out of memory as a special case, because the row processor is
now generic. The previous patch handled OOM differently from other
errors, but I couldn't see an obvious reason to do so.
- PGresult.rowProcessorErrMsg is removed; PGresult.errMsg is used
instead, set through the new interface function PQresultSetErrMsg().
- Row processors should now set a message for any error that
occurs within them. pqAddRow and dblink.c are modified to do so.
- getAnotherTuple() sets the error message `unknown error' for
the condition rp == 0 && ->errMsg == NULL.
- Always forward input cursor and do pqClearAsyncResult() and
pqSaveErrorResult() when rp == 0 in getAnotherTuple()
regardless whether ->errMsg is NULL or not in fe-protocol3.c.
- conn->inStart is already moved to the start point of the next
message when row processor is called. So forwarding inStart in
outOfMemory1 seems redundant. I removed it.
- printfPQExpBuffer() complains about a variable message. So use
resetPQExpBuffer() and appendPQExpBufferStr() instead.
=====
- dblink did not use conn->errorMessage before, and still does not:
all errors are displayed as `Error occurred on dblink connection...'.
- TODO: No NLS messages for error messages.
- Somehow `make check` fails on the base revision, so I have
not run it.
- I have no idea how to test protocol 2...
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..239edb8 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,7 @@ PQconnectStartParams 157
PQping 158
PQpingParams 159
PQlibVersion 160
+PQsetRowProcessor 161
+PQgetRowProcessor 162
+PQresultSetErrMsg 163
+PQskipResult 164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 27a9805..4605e49 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2693,6 +2693,9 @@ makeEmptyPGconn(void)
conn->wait_ssl_try = false;
#endif
+ /* set default row processor */
+ PQsetRowProcessor(conn, NULL, NULL);
+
/*
* We try to send at least 8K at a time, which is the usual size of pipe
* buffers on Unix systems. That way, when we are sending a large amount
@@ -2711,8 +2714,13 @@ makeEmptyPGconn(void)
initPQExpBuffer(&conn->errorMessage);
initPQExpBuffer(&conn->workBuffer);
+ /* set up initial row buffer */
+ conn->rowBufLen = 32;
+ conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue));
+
if (conn->inBuffer == NULL ||
conn->outBuffer == NULL ||
+ conn->rowBuf == NULL ||
PQExpBufferBroken(&conn->errorMessage) ||
PQExpBufferBroken(&conn->workBuffer))
{
@@ -2814,6 +2822,8 @@ freePGconn(PGconn *conn)
free(conn->inBuffer);
if (conn->outBuffer)
free(conn->outBuffer);
+ if (conn->rowBuf)
+ free(conn->rowBuf);
termPQExpBuffer(&conn->errorMessage);
termPQExpBuffer(&conn->workBuffer);
@@ -5078,3 +5088,4 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
return prev;
}
+
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..7fd3c9c 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -66,6 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
const char *desc_target);
static int check_field_number(const PGresult *res, int field_num);
+static int pqAddRow(PGresult *res, PGrowValue *columns, void *param);
/* ----------------
@@ -701,7 +702,6 @@ pqClearAsyncResult(PGconn *conn)
if (conn->result)
PQclear(conn->result);
conn->result = NULL;
- conn->curTuple = NULL;
}
/*
@@ -756,7 +756,6 @@ pqPrepareAsyncResult(PGconn *conn)
*/
res = conn->result;
conn->result = NULL; /* handing over ownership to caller */
- conn->curTuple = NULL; /* just in case */
if (!res)
res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
else
@@ -828,6 +827,93 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
}
/*
+ * PQsetRowProcessor
+ * Set function that copies column data out from network buffer.
+ */
+void
+PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+{
+ conn->rowProcessor = (func ? func : pqAddRow);
+ conn->rowProcessorParam = param;
+}
+
+/*
+ * PQgetRowProcessor
+ * Get current row processor of conn. set pointer to current parameter for
+ * row processor to param if not NULL.
+ */
+PQrowProcessor
+PQgetRowProcessor(PGconn *conn, void **param)
+{
+ if (param)
+ *param = conn->rowProcessorParam;
+
+ return conn->rowProcessor;
+}
+
+/*
+ * PQresultSetErrMsg
+ * Set the error message to PGresult.
+ *
+ * You can replace the previous message by alternative msg, or clear
+ * it with NULL.
+ */
+void
+PQresultSetErrMsg(PGresult *res, const char *msg)
+{
+ if (msg)
+ res->errMsg = pqResultStrdup(res, msg);
+ else
+ res->errMsg = NULL;
+}
+
+/*
+ * pqAddRow
+ * add a row to the PGresult structure, growing it if necessary
+ * Returns TRUE if OK, FALSE if not enough memory to add the row.
+ */
+static int
+pqAddRow(PGresult *res, PGrowValue *columns, void *param)
+{
+ PGresAttValue *tup;
+ int nfields = res->numAttributes;
+ int i;
+
+ tup = (PGresAttValue *)
+ pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+ if (tup == NULL)
+ {
+ PQresultSetErrMsg(res, "out of memory for query result\n");
+ return FALSE;
+ }
+
+ for (i = 0 ; i < nfields ; i++)
+ {
+ tup[i].len = columns[i].len;
+ if (tup[i].len == NULL_LEN)
+ {
+ tup[i].value = res->null_field;
+ }
+ else
+ {
+ bool isbinary = (res->attDescs[i].format != 0);
+ tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+ if (tup[i].value == NULL)
+ {
+ PQresultSetErrMsg(res, "out of memory for query result\n");
+ return FALSE;
+ }
+
+ memcpy(tup[i].value, columns[i].value, tup[i].len);
+ /* We have to terminate this ourselves */
+ tup[i].value[tup[i].len] = '\0';
+ }
+ }
+
+ return pqAddTuple(res, tup);
+}
+
+/*
* pqAddTuple
* add a row pointer to the PGresult structure, growing it if necessary
* Returns TRUE if OK, FALSE if not enough memory to add the row
@@ -1223,7 +1309,6 @@ PQsendQueryStart(PGconn *conn)
/* initialize async result-accumulation state */
conn->result = NULL;
- conn->curTuple = NULL;
/* ready to send command message */
return true;
@@ -1831,6 +1916,55 @@ PQexecFinish(PGconn *conn)
return lastResult;
}
+
+/*
+ * Do-nothing row processor for PQskipResult
+ */
+static int
+dummyRowProcessor(PGresult *res, PGrowValue *columns, void *param)
+{
+ return 1;
+}
+
+/*
+ * Exhaust remaining Data Rows in current conn.
+ *
+ * Exhaust current result if skipAll is false and all succeeding results if
+ * true.
+ */
+int
+PQskipResult(PGconn *conn, int skipAll)
+{
+ PQrowProcessor savedRowProcessor;
+ void * savedRowProcParam;
+ PGresult *res;
+ int ret = 0;
+
+ /* save the current row processor settings and set dummy processor */
+ savedRowProcessor = PQgetRowProcessor(conn, &savedRowProcParam);
+ PQsetRowProcessor(conn, dummyRowProcessor, NULL);
+
+ /*
+ * Throw away the remaining rows in current result, or all succeeding
+ * results if skipAll is not FALSE.
+ */
+ if (skipAll)
+ {
+ while ((res = PQgetResult(conn)) != NULL)
+ PQclear(res);
+ }
+ else if ((res = PQgetResult(conn)) != NULL)
+ {
+ PQclear(res);
+ ret = 1;
+ }
+
+ PQsetRowProcessor(conn, savedRowProcessor, savedRowProcParam);
+
+ return ret;
+}
+
+
/*
* PQdescribePrepared
* Obtain information about a previously prepared statement
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index ce0eac3..d11cb3c 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
}
/*
+ * pqSkipnchar:
+ * skip len bytes in input buffer.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+ if (len > (size_t) (conn->inEnd - conn->inCursor))
+ return EOF;
+
+ conn->inCursor += len;
+
+ if (conn->Pfdebug)
+ fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+ (unsigned long) len);
+
+ return 0;
+}
+
+/*
* pqPutnchar:
* write exactly len bytes to the current message
*/
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..3b0520d 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -569,6 +569,8 @@ pqParseInput2(PGconn *conn)
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, FALSE))
return;
+ /* getAnotherTuple moves inStart itself */
+ continue;
}
else
{
@@ -585,6 +587,8 @@ pqParseInput2(PGconn *conn)
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, TRUE))
return;
+ /* getAnotherTuple moves inStart itself */
+ continue;
}
else
{
@@ -703,52 +707,55 @@ failure:
/*
* parseInput subroutine to read a 'B' or 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
+ * It fills rowbuf with column pointers and then calls row processor.
* Returns: 0 if completed message, EOF if error or not enough data yet.
*
* Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received. We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received.
*/
static int
getAnotherTuple(PGconn *conn, bool binary)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
- PGresAttValue *tup;
+ PGrowValue *rowbuf;
/* the backend sends us a bitmap of which attributes are null */
char std_bitmap[64]; /* used unless it doesn't fit */
char *bitmap = std_bitmap;
int i;
+ int rp;
size_t nbytes; /* the number of bytes in bitmap */
char bmap; /* One byte of the bitmap */
int bitmap_index; /* Its index */
int bitcnt; /* number of bits examined in current byte */
int vlen; /* length of the current field value */
+ char *errmsg = libpq_gettext("unknown error\n");
- result->binary = binary;
-
- /* Allocate tuple space if first time for this data message */
- if (conn->curTuple == NULL)
+ /* resize row buffer if needed */
+ if (nfields > conn->rowBufLen)
{
- conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- if (conn->curTuple == NULL)
- goto outOfMemory;
- MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
- /*
- * If it's binary, fix the column format indicators. We assume the
- * backend will consistently send either B or D, not a mix.
- */
- if (binary)
+ rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+ if (!rowbuf)
{
- for (i = 0; i < nfields; i++)
- result->attDescs[i].format = 1;
+ errmsg = libpq_gettext("out of memory for query result\n");
+ goto error;
}
+ conn->rowBuf = rowbuf;
+ conn->rowBufLen = nfields;
+ }
+ else
+ {
+ rowbuf = conn->rowBuf;
+ }
+
+ result->binary = binary;
+
+ if (binary)
+ {
+ for (i = 0; i < nfields; i++)
+ result->attDescs[i].format = 1;
}
- tup = conn->curTuple;
/* Get the null-value bitmap */
nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@@ -757,7 +764,10 @@ getAnotherTuple(PGconn *conn, bool binary)
{
bitmap = (char *) malloc(nbytes);
if (!bitmap)
- goto outOfMemory;
+ {
+ errmsg = libpq_gettext("out of memory for query result\n");
+ goto error;
+ }
}
if (pqGetnchar(bitmap, nbytes, conn))
@@ -771,34 +781,29 @@ getAnotherTuple(PGconn *conn, bool binary)
for (i = 0; i < nfields; i++)
{
if (!(bmap & 0200))
- {
- /* if the field value is absent, make it a null string */
- tup[i].value = result->null_field;
- tup[i].len = NULL_LEN;
- }
+ vlen = NULL_LEN;
+ else if (pqGetInt(&vlen, 4, conn))
+ goto EOFexit;
else
{
- /* get the value length (the first four bytes are for length) */
- if (pqGetInt(&vlen, 4, conn))
- goto EOFexit;
if (!binary)
vlen = vlen - 4;
if (vlen < 0)
vlen = 0;
- if (tup[i].value == NULL)
- {
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
- if (tup[i].value == NULL)
- goto outOfMemory;
- }
- tup[i].len = vlen;
- /* read in the value */
- if (vlen > 0)
- if (pqGetnchar((char *) (tup[i].value), vlen, conn))
- goto EOFexit;
- /* we have to terminate this ourselves */
- tup[i].value[vlen] = '\0';
}
+
+ /*
+ * rowbuf[i].value always points to the next address of the
+ * length field even if the value is NULL, to allow safe
+ * size estimates and data copy.
+ */
+ rowbuf[i].value = conn->inBuffer + conn->inCursor;
+ rowbuf[i].len = vlen;
+
+ /* Skip the value */
+ if (vlen > 0 && pqSkipnchar(vlen, conn))
+ goto EOFexit;
+
/* advance the bitmap stuff */
bitcnt++;
if (bitcnt == BITS_PER_BYTE)
@@ -811,33 +816,53 @@ getAnotherTuple(PGconn *conn, bool binary)
bmap <<= 1;
}
- /* Success! Store the completed tuple in the result */
- if (!pqAddTuple(result, tup))
- goto outOfMemory;
- /* and reset for a new message */
- conn->curTuple = NULL;
-
if (bitmap != std_bitmap)
free(bitmap);
- return 0;
+ bitmap = NULL;
-outOfMemory:
- /* Replace partially constructed result with an error result */
+ /* tag the row as parsed */
+ conn->inStart = conn->inCursor;
+ /* Pass the completed row values to rowProcessor */
+ rp= conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
+ if (rp == 1)
+ return 0;
+ else if (rp == 2 && pqIsnonblocking(conn))
+ /* processor requested early exit */
+ return EOF;
+ else if (rp == 0)
+ {
+ errmsg = result->errMsg;
+ result->errMsg = NULL;
+ if (errmsg == NULL)
+ errmsg = libpq_gettext("unknown error in row processor\n");
+ goto error;
+ }
+
+ errmsg = libpq_gettext("invalid return value from row processor\n");
+
+error:
/*
* we do NOT use pqSaveErrorResult() here, because of the likelihood that
* there's not enough memory to concatenate messages...
*/
pqClearAsyncResult(conn);
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("out of memory for query result\n"));
-
+ resetPQExpBuffer(&conn->errorMessage);
+
+ /*
+ * Set the error message into PGconn: the one passed from the
+ * RowProcessor if any, otherwise the fallback selected above.
+ */
+ appendPQExpBufferStr(&conn->errorMessage, errmsg);
+
/*
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
* do to recover...
*/
conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
+
conn->asyncStatus = PGASYNC_READY;
+
/* Discard the failed message --- good idea? */
conn->inStart = conn->inEnd;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..c8202c2 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -327,6 +327,9 @@ pqParseInput3(PGconn *conn)
/* Read another tuple of a normal query response */
if (getAnotherTuple(conn, msgLength))
return;
+
+ /* getAnotherTuple() moves inStart itself */
+ continue;
}
else if (conn->result != NULL &&
conn->result->resultStatus == PGRES_FATAL_ERROR)
@@ -613,33 +616,23 @@ failure:
/*
* parseInput subroutine to read a 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
+ * It fills rowbuf with column pointers and then calls row processor.
* Returns: 0 if completed message, EOF if error or not enough data yet.
*
* Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received. We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received.
*/
static int
getAnotherTuple(PGconn *conn, int msgLength)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
- PGresAttValue *tup;
+ PGrowValue *rowbuf;
int tupnfields; /* # fields from tuple */
int vlen; /* length of the current field value */
int i;
-
- /* Allocate tuple space if first time for this data message */
- if (conn->curTuple == NULL)
- {
- conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- if (conn->curTuple == NULL)
- goto outOfMemory;
- MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
- }
- tup = conn->curTuple;
+ int rp;
+ char *errmsg = libpq_gettext("unknown error\n");
/* Get the field count and make sure it's what we expect */
if (pqGetInt(&tupnfields, 2, conn))
@@ -647,13 +640,22 @@ getAnotherTuple(PGconn *conn, int msgLength)
if (tupnfields != nfields)
{
- /* Replace partially constructed result with an error result */
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("unexpected field count in \"D\" message\n"));
- pqSaveErrorResult(conn);
- /* Discard the failed message by pretending we read it */
- conn->inCursor = conn->inStart + 5 + msgLength;
- return 0;
+ errmsg = libpq_gettext("unexpected field count in \"D\" message\n");
+ goto error_and_forward;
+ }
+
+ /* resize row buffer if needed */
+ rowbuf = conn->rowBuf;
+ if (nfields > conn->rowBufLen)
+ {
+ rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+ if (!rowbuf)
+ {
+ errmsg = libpq_gettext("out of memory for query result\n");
+ goto error_and_forward;
+ }
+ conn->rowBuf = rowbuf;
+ conn->rowBufLen = nfields;
}
/* Scan the fields */
@@ -661,54 +663,78 @@ getAnotherTuple(PGconn *conn, int msgLength)
{
/* get the value length */
if (pqGetInt(&vlen, 4, conn))
- return EOF;
- if (vlen == -1)
{
- /* null field */
- tup[i].value = result->null_field;
- tup[i].len = NULL_LEN;
- continue;
+ /*
+ * Forwarding inCursor does not make sense on a protocol error
+ */
+ errmsg = libpq_gettext("invalid row contents\n");
+ goto error;
}
- if (vlen < 0)
+ if (vlen == -1)
+ vlen = NULL_LEN;
+ else if (vlen < 0)
vlen = 0;
- if (tup[i].value == NULL)
- {
- bool isbinary = (result->attDescs[i].format != 0);
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
- if (tup[i].value == NULL)
- goto outOfMemory;
- }
- tup[i].len = vlen;
- /* read in the value */
- if (vlen > 0)
- if (pqGetnchar((char *) (tup[i].value), vlen, conn))
- return EOF;
- /* we have to terminate this ourselves */
- tup[i].value[vlen] = '\0';
+ /*
+ * rowbuf[i].value always points to the next address of the
+ * length field even if the value is NULL, to allow safe
+ * size estimates and data copy.
+ */
+ rowbuf[i].value = conn->inBuffer + conn->inCursor;
+ rowbuf[i].len = vlen;
+
+ /* Skip to the next length field */
+ if (vlen > 0 && pqSkipnchar(vlen, conn))
+ return EOF;
}
- /* Success! Store the completed tuple in the result */
- if (!pqAddTuple(result, tup))
- goto outOfMemory;
- /* and reset for a new message */
- conn->curTuple = NULL;
+ /* tag the row as parsed, check if correctly */
+ conn->inStart += 5 + msgLength;
+ if (conn->inCursor != conn->inStart)
+ {
+ errmsg = libpq_gettext("invalid row contents\n");
+ goto error;
+ }
- return 0;
+ /* Pass the completed row values to rowProcessor */
+ rp = conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
+ if (rp == 1)
+ {
+ /* everything is good */
+ return 0;
+ }
+ if (rp == 2 && pqIsnonblocking(conn))
+ {
+ /* processor requested early exit */
+ return EOF;
+ }
-outOfMemory:
+ /* there was some problem */
+ if (rp == 0)
+ {
+ errmsg = result->errMsg;
+ result->errMsg = NULL;
+ if (errmsg == NULL)
+ errmsg = libpq_gettext("unknown error in row processor\n");
+ goto error;
+ }
+
+ errmsg = libpq_gettext("invalid return value from row processor\n");
+ goto error;
+
+error_and_forward:
+ /* Discard the failed message by pretending we read it */
+ conn->inCursor = conn->inStart + 5 + msgLength;
+error:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("out of memory for query result\n"));
+ resetPQExpBuffer(&conn->errorMessage);
+ appendPQExpBufferStr(&conn->errorMessage, errmsg);
pqSaveErrorResult(conn);
-
- /* Discard the failed message by pretending we read it */
- conn->inCursor = conn->inStart + 5 + msgLength;
return 0;
}
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..810b04e 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -149,6 +149,17 @@ typedef struct pgNotify
struct pgNotify *next; /* list link */
} PGnotify;
+/* PGrowValue points to a column value in the network buffer.
+ * Value is a string without null termination and length len.
+ * NULL is represented as len < 0, value points then to place
+ * where value would have been.
+ */
+typedef struct pgRowValue
+{
+ int len; /* length in bytes of the value */
+ char *value; /* actual value, without null termination */
+} PGrowValue;
+
/* Function types for notice-handling callbacks */
typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -416,6 +427,38 @@ extern PGPing PQping(const char *conninfo);
extern PGPing PQpingParams(const char *const * keywords,
const char *const * values, int expand_dbname);
+/*
+ * Typedef for alternative row processor.
+ *
+ * Columns array will contain PQnfields() entries, each one
+ * pointing to particular column data in network buffer.
+ * This function is supposed to copy data out from there
+ * and store somewhere. NULL is signified with len<0.
+ *
+ * This function must return 1 for success and must return 0 for
+ * failure and may set error message by PQresultSetErrMsg. The caller
+ * reports an unknown error when the error message is not set on
+ * failure. This function is assumed not to throw any exception.
+ */
+typedef int (*PQrowProcessor)(PGresult *res, PGrowValue *columns,
+ void *param);
+
+/*
+ * Set alternative row data processor for PGconn.
+ *
+ * By registering this function, pg_result disables its own result
+ * store and calls it for rows one by one.
+ *
+ * func is row processor function. See the typedef RowProcessor.
+ *
+ * rowProcessorParam is the contextual variable that passed to
+ * RowProcessor.
+ */
+extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func,
+ void *rowProcessorParam);
+extern PQrowProcessor PQgetRowProcessor(PGconn *conn, void **param);
+extern int PQskipResult(PGconn *conn, int skipAll);
+
/* Force the write buffer to be written (or at least try) */
extern int PQflush(PGconn *conn);
@@ -454,6 +497,7 @@ extern char *PQcmdTuples(PGresult *res);
extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
extern int PQgetlength(const PGresult *res, int tup_num, int field_num);
extern int PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern void PQresultSetErrMsg(PGresult *res, const char *msg);
extern int PQnparams(const PGresult *res);
extern Oid PQparamtype(const PGresult *res, int param_num);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 987311e..9cabd20 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -398,7 +398,6 @@ struct pg_conn
/* Status for asynchronous result construction */
PGresult *result; /* result being constructed */
- PGresAttValue *curTuple; /* tuple currently being read */
#ifdef USE_SSL
bool allow_ssl_try; /* Allowed to try SSL negotiation */
@@ -443,6 +442,14 @@ struct pg_conn
/* Buffer for receiving various parts of messages */
PQExpBufferData workBuffer; /* expansible string */
+
+ /*
+ * Read column data from network buffer.
+ */
+ PQrowProcessor rowProcessor;/* Function pointer */
+ void *rowProcessorParam; /* Contextual parameter for rowProcessor */
+ PGrowValue *rowBuf; /* Buffer for passing values to rowProcessor */
+ int rowBufLen; /* Number of columns allocated in rowBuf */
};
/* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -560,6 +567,7 @@ extern int pqGets(PQExpBuffer buf, PGconn *conn);
extern int pqGets_append(PQExpBuffer buf, PGconn *conn);
extern int pqPuts(const char *s, PGconn *conn);
extern int pqGetnchar(char *s, size_t len, PGconn *conn);
+extern int pqSkipnchar(size_t len, PGconn *conn);
extern int pqPutnchar(const char *s, size_t len, PGconn *conn);
extern int pqGetInt(int *result, size_t bytes, PGconn *conn);
extern int pqPutInt(int value, size_t bytes, PGconn *conn);
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..1f91f98 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,329 @@ int PQisthreadsafe();
</sect1>
+ <sect1 id="libpq-altrowprocessor">
+ <title>Alternative row processor</title>
+
+ <indexterm zone="libpq-altrowprocessor">
+ <primary>PGresult</primary>
+ <secondary>PGconn</secondary>
+ </indexterm>
+
+ <para>
+ As the standard usage, rows are stored into <type>PGresult</type>
+ until full resultset is received. Then such completely-filled
+ <type>PGresult</type> is passed to user. This behavior can be
+ changed by registering alternative row processor function,
+ that will see each row data as soon as it is received
+ from network. It has the option of processing the data
+ immediately, or storing it into custom container.
+ </para>
+
+ <para>
+ Note - as row processor sees rows as they arrive, it cannot know
+ whether the SQL statement actually finishes successfully on server
+ or not. So some care must be taken to get proper
+ transactionality.
+ </para>
+
+ <variablelist>
+ <varlistentry id="libpq-pqsetrowprocessor">
+ <term>
+ <function>PQsetRowProcessor</function>
+ <indexterm>
+ <primary>PQsetRowProcessor</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Sets a callback function to process each row.
+<synopsis>
+void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+</synopsis>
+ </para>
+
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>conn</parameter></term>
+ <listitem>
+ <para>
+ The connection object to set the row processor function.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>func</parameter></term>
+ <listitem>
+ <para>
+ Storage handler function to set. NULL means to use the
+ default processor.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>param</parameter></term>
+ <listitem>
+ <para>
+ A pointer to contextual parameter passed
+ to <parameter>func</parameter>.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-pqrowprocessor">
+ <term>
+ <type>PQrowProcessor</type>
+ <indexterm>
+ <primary>PQrowProcessor</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ The type for the row processor callback function.
+<synopsis>
+int (*PQrowProcessor)(PGresult *res, PGrowValue *columns, void *param);
+
+typedef struct
+{
+ int len; /* length in bytes of the value, -1 if NULL */
+ char *value; /* actual value, without null termination */
+} PGrowValue;
+</synopsis>
+ </para>
+
+ <para>
+ The <parameter>columns</parameter> array will have PQnfields()
+ elements, each one pointing to column value in network buffer.
+ The <parameter>len</parameter> field will contain number of
+ bytes in value. If the field value is NULL then
+ <parameter>len</parameter> will be -1 and value will point
+ to position where the value would have been in buffer.
+ This allows estimating row size by pointer arithmetic.
+ </para>
+
+ <para>
+ This function must process or copy row values away from network
+ buffer before it returns, as next row might overwrite them.
+ </para>
+
+ <para>
+ This function must return 1 for success, and 0 for failure. On
+ failure this function should set the error message
+ with <function>PQresultSetErrMsg</function>. When non-blocking
+ API is in use, it can also return 2 for early exit
+ from <function>PQisBusy</function> function. The
+ supplied <parameter>res</parameter>
+ and <parameter>columns</parameter> values will stay valid so
+ row can be processed outside of callback. Caller is
+ responsible for tracking whether
+ the <parameter>PQisBusy</parameter> returned early from
+ callback or for other reasons. Usually this should happen via
+ setting cached values to NULL before
+ calling <function>PQisBusy</function>.
+ </para>
+
+ <para>
+ The function is allowed to exit via exception (setjmp/longjmp).
+ The connection and row are guaranteed to be in valid state.
+ The connection can later be closed via <function>PQfinish</function>.
+ Processing can also be continued without closing the connection,
+ call <function>PQgetResult</function> on synchronous mode,
+ <function>PQisBusy</function> on asynchronous connection. Then
+ processing will continue with new row, previous row that got
+ exception will be skipped. Or you can discard all remaining
+ rows by calling <function>PQskipResult</function> without
+ closing connection.
+ </para>
+
+ <variablelist>
+ <varlistentry>
+
+ <term><parameter>res</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the <type>PGresult</type> object.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+
+ <term><parameter>columns</parameter></term>
+ <listitem>
+ <para>
+ Column values of the row to process. Column values
+ are located in network buffer, the processor must
+ copy them out from there.
+ </para>
+ <para>
+ Column values are not null-terminated, so processor cannot
+ use C string functions on them directly.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+
+ <term><parameter>param</parameter></term>
+ <listitem>
+ <para>
+ Extra parameter that was given to <function>PQsetRowProcessor</function>.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-pqskipresult">
+ <term>
+ <function>PQskipResult</function>
+ <indexterm>
+ <primary>PQskipResult</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Discard all the remaining row data
+ after <function>PQexec</function>
+ or <function>PQgetResult</function> exits by the exception raised
+ in <type>RowProcessor</type> without closing connection.
+<synopsis>
+int PQskipResult(PGconn *conn, int skipAll)
+</synopsis>
+ </para>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>conn</parameter></term>
+ <listitem>
+ <para>
+ The connection object.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><parameter>skipAll</parameter></term>
+ <listitem>
+ <para>
+ Skip remaining rows in current result
+ if <parameter>skipAll</parameter> is false(0). Skip
+ remaining rows in current result and all rows in
+ succeeding results if true(non-zero).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-pqresultseterrmsg">
+ <term>
+ <function>PQresultSetErrMsg</function>
+ <indexterm>
+ <primary>PQresultSetErrMsg</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Set the message for the error occurred
+ in <type>PQrowProcessor</type>. If this message is not set, the
+ caller assumes the error to be `unknown' error.
+<synopsis>
+void PQresultSetErrMsg(PGresult *res, const char *msg)
+</synopsis>
+ </para>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>res</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the <type>PGresult</type> object
+ passed to <type>PQrowProcessor</type>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>msg</parameter></term>
+ <listitem>
+ <para>
+ Error message. This will be copied internally so there is
+ no need to care of the scope.
+ </para>
+ <para>
+ If <parameter>res</parameter> already has a message previously
+ set, it will be overwritten. Set NULL to cancel the custom
+ message.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-pqgetrowprcessor">
+ <term>
+ <function>PQgetRowProcessor</function>
+ <indexterm>
+ <primary>PQgetRowProcessor</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Get row processor and its context parameter currently set to
+ the connection.
+<synopsis>
+PQrowProcessor PQgetRowProcessor(PGconn *conn, void **param)
+</synopsis>
+ </para>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>conn</parameter></term>
+ <listitem>
+ <para>
+ The connection object.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><parameter>param</parameter></term>
+ <listitem>
+ <para>
+ Set the current row processor parameter of the
+ connection here if not NULL.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ </sect1>
+
+
<sect1 id="libpq-build">
<title>Building <application>libpq</application> Programs</title>
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..9ce1466 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
+typedef struct storeInfo
+{
+ Tuplestorestate *tuplestore;
+ int nattrs;
+ MemoryContext oldcontext;
+ AttInMetadata *attinmeta;
+ char** valbuf;
+ int *valbuflen;
+ char **cstrs;
+ bool error_occurred;
+ bool nummismatch;
+} storeInfo;
+
/*
* Internal declarations
*/
static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
static void validate_pkattnums(Relation rel,
int2vector *pkattnums_arg, int32 pknumatts_arg,
int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, PGrowValue *columns, void *param);
+
/* Global */
static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
char *curname = NULL;
int howmany = 0;
bool fail = true; /* default to backward compatible */
+ storeInfo storeinfo;
DBLINK_INIT;
@@ -559,15 +576,51 @@ dblink_fetch(PG_FUNCTION_ARGS)
appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
+ * Result is stored into storeinfo.tuplestore instead of
+ * res->result returned by PQexec below
+ */
+ initStoreInfo(&storeinfo, fcinfo);
+ PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+ /*
* Try to execute the query. Note that since libpq uses malloc, the
* PGresult will be long-lived even though we are still in a short-lived
* memory context.
*/
- res = PQexec(conn, buf.data);
+ PG_TRY();
+ {
+ res = PQexec(conn, buf.data);
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+
+ finishStoreInfo(&storeinfo);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Skip remaining results when storeHandler raises exception. */
+ PQskipResult(conn, FALSE);
+ ReThrowError(edata);
+ }
+ PG_END_TRY();
+
+ finishStoreInfo(&storeinfo);
+
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
+ /* finishStoreInfo saves the fields referred to below. */
+ if (storeinfo.nummismatch)
+ {
+ /* This is only for backward compatibility */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+ }
+
dblink_res_error(conname, res, "could not fetch from cursor", fail);
return (Datum) 0;
}
@@ -579,8 +632,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));
}
+ PQclear(res);
- materializeResult(fcinfo, res);
return (Datum) 0;
}
@@ -640,6 +693,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible */
bool freeconn = false;
+ storeInfo storeinfo;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -715,164 +769,259 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
rsinfo->setResult = NULL;
rsinfo->setDesc = NULL;
+
+ /*
+ * Result is stored into storeinfo.tuplestore instead of
+ * res->result returned by PQexec/PQgetResult below
+ */
+ initStoreInfo(&storeinfo, fcinfo);
+ PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
/* synchronous query, or async result retrieval */
- if (!is_async)
- res = PQexec(conn, sql);
- else
+ PG_TRY();
{
- res = PQgetResult(conn);
- /* NULL means we're all done with the async results */
- if (!res)
- return (Datum) 0;
+ if (!is_async)
+ res = PQexec(conn, sql);
+ else
+ res = PQgetResult(conn);
}
+ PG_CATCH();
+ {
+ ErrorData *edata;
- /* if needed, close the connection to the database and cleanup */
- if (freeconn)
- PQfinish(conn);
+ finishStoreInfo(&storeinfo);
+ edata = CopyErrorData();
+ FlushErrorState();
- if (!res ||
- (PQresultStatus(res) != PGRES_COMMAND_OK &&
- PQresultStatus(res) != PGRES_TUPLES_OK))
+ /* Skip remaining results when storeHandler raises exception. */
+ PQskipResult(conn, FALSE);
+ ReThrowError(edata);
+ }
+ PG_END_TRY();
+
+ finishStoreInfo(&storeinfo);
+
+ /* NULL res from async get means we're all done with the results */
+ if (res || !is_async)
{
- dblink_res_error(conname, res, "could not execute query", fail);
- return (Datum) 0;
+ if (freeconn)
+ PQfinish(conn);
+
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
+ {
+ /* finishStoreInfo saves the fields referred to below. */
+ if (storeinfo.nummismatch)
+ {
+ /* This is only for backward compatibility */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+ }
+
+ dblink_res_error(conname, res, "could not execute query", fail);
+ return (Datum) 0;
+ }
}
+ PQclear(res);
- materializeResult(fcinfo, res);
return (Datum) 0;
}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ int i;
+
+ switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+ {
+ case TYPEFUNC_COMPOSITE:
+ /* success */
+ break;
+ case TYPEFUNC_RECORD:
+ /* failed to determine actual type of RECORD */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record")));
+ break;
+ default:
+ /* result type isn't composite */
+ elog(ERROR, "return type must be a row type");
+ break;
+ }
- Assert(rsinfo->returnMode == SFRM_Materialize);
+ sinfo->oldcontext = MemoryContextSwitchTo(
+ rsinfo->econtext->ecxt_per_query_memory);
- PG_TRY();
+ /* make sure we have a persistent copy of the tupdesc */
+ tupdesc = CreateTupleDescCopy(tupdesc);
+
+ sinfo->error_occurred = FALSE;
+ sinfo->nummismatch = FALSE;
+ sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ sinfo->nattrs = tupdesc->natts;
+ sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+ sinfo->valbuf = NULL;
+ sinfo->valbuflen = NULL;
+
+ /* Preallocate per-column value buffers, their lengths, and the C string array. */
+ sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*));
+ if (sinfo->valbuf)
+ sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
+ if (sinfo->valbuflen)
+ sinfo->cstrs = (char **)malloc(sinfo->nattrs * sizeof(char*));
+
+ if (sinfo->cstrs == NULL)
{
- TupleDesc tupdesc;
- bool is_sql_cmd = false;
- int ntuples;
- int nfields;
+ if (sinfo->valbuf)
+ free(sinfo->valbuf);
+ if (sinfo->valbuflen)
+ free(sinfo->valbuflen);
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- is_sql_cmd = true;
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
- /*
- * need a tuple descriptor representing one TEXT column to return
- * the command status string as our result tuple
- */
- tupdesc = CreateTemplateTupleDesc(1, false);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
- TEXTOID, -1, 0);
- ntuples = 1;
- nfields = 1;
- }
- else
- {
- Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+ for (i = 0 ; i < sinfo->nattrs ; i++)
+ {
+ sinfo->valbuf[i] = NULL;
+ sinfo->valbuflen[i] = -1;
+ }
- is_sql_cmd = false;
+ rsinfo->setResult = sinfo->tuplestore;
+ rsinfo->setDesc = tupdesc;
+}
- /* get a tuple descriptor for our result type */
- switch (get_call_result_type(fcinfo, NULL, &tupdesc))
- {
- case TYPEFUNC_COMPOSITE:
- /* success */
- break;
- case TYPEFUNC_RECORD:
- /* failed to determine actual type of RECORD */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("function returning record called in context "
- "that cannot accept type record")));
- break;
- default:
- /* result type isn't composite */
- elog(ERROR, "return type must be a row type");
- break;
- }
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+ int i;
- /* make sure we have a persistent copy of the tupdesc */
- tupdesc = CreateTupleDescCopy(tupdesc);
- ntuples = PQntuples(res);
- nfields = PQnfields(res);
+ if (sinfo->valbuf)
+ {
+ for (i = 0 ; i < sinfo->nattrs ; i++)
+ {
+ if (sinfo->valbuf[i])
+ free(sinfo->valbuf[i]);
}
+ free(sinfo->valbuf);
+ sinfo->valbuf = NULL;
+ }
- /*
- * check result and tuple descriptor have the same number of columns
- */
- if (nfields != tupdesc->natts)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("remote query result rowtype does not match "
- "the specified FROM clause rowtype")));
+ if (sinfo->valbuflen)
+ {
+ free(sinfo->valbuflen);
+ sinfo->valbuflen = NULL;
+ }
- if (ntuples > 0)
- {
- AttInMetadata *attinmeta;
- Tuplestorestate *tupstore;
- MemoryContext oldcontext;
- int row;
- char **values;
-
- attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
- oldcontext = MemoryContextSwitchTo(
- rsinfo->econtext->ecxt_per_query_memory);
- tupstore = tuplestore_begin_heap(true, false, work_mem);
- rsinfo->setResult = tupstore;
- rsinfo->setDesc = tupdesc;
- MemoryContextSwitchTo(oldcontext);
+ if (sinfo->cstrs)
+ {
+ free(sinfo->cstrs);
+ sinfo->cstrs = NULL;
+ }
- values = (char **) palloc(nfields * sizeof(char *));
+ MemoryContextSwitchTo(sinfo->oldcontext);
+}
- /* put all tuples into the tuplestore */
- for (row = 0; row < ntuples; row++)
- {
- HeapTuple tuple;
+static int
+storeHandler(PGresult *res, PGrowValue *columns, void *param)
+{
+ storeInfo *sinfo = (storeInfo *)param;
+ HeapTuple tuple;
+ int fields = PQnfields(res);
+ int i;
+ char **cstrs = sinfo->cstrs;
- if (!is_sql_cmd)
- {
- int i;
+ if (sinfo->error_occurred)
+ {
+ PQresultSetErrMsg(res, "storeHandler is called after error\n");
+ return FALSE;
+ }
- for (i = 0; i < nfields; i++)
- {
- if (PQgetisnull(res, row, i))
- values[i] = NULL;
- else
- values[i] = PQgetvalue(res, row, i);
- }
- }
+ if (sinfo->nattrs != fields)
+ {
+ sinfo->error_occurred = TRUE;
+ sinfo->nummismatch = TRUE;
+ finishStoreInfo(sinfo);
+
+ /* The caller reports this mismatch itself by checking
+ * sinfo->nummismatch, so the message set below only
+ * tells libpq why the row was rejected. */
+
+ PQresultSetErrMsg(res, "unexpected field count in \"D\" message\n");
+ return FALSE;
+ }
+
+ /*
+ * Value input functions assume that the input string is
+ * zero-terminated, so make a terminated copy of each value.
+ */
+ for(i = 0 ; i < fields ; i++)
+ {
+ int len = columns[i].len;
+ if (len < 0)
+ cstrs[i] = NULL;
+ else
+ {
+ char *tmp = sinfo->valbuf[i];
+ int tmplen = sinfo->valbuflen[i];
+
+ /*
+ * Use separate calls to malloc and realloc so that this
+ * works even on systems where realloc() does not
+ * accept NULL as the old memory block.
+ *
+ * Also try to (re)allocate in bigger steps to
+ * avoid flood of allocations on weird data.
+ */
+ if (tmp == NULL)
+ {
+ tmplen = len + 1;
+ if (tmplen < 64)
+ tmplen = 64;
+ tmp = (char *)malloc(tmplen);
+ }
+ else if (tmplen < len + 1)
+ {
+ if (len + 1 > tmplen * 2)
+ tmplen = len + 1;
else
- {
- values[0] = PQcmdStatus(res);
- }
+ tmplen = tmplen * 2;
+ tmp = (char *)realloc(tmp, tmplen);
+ }
- /* build the tuple and put it into the tuplestore. */
- tuple = BuildTupleFromCStrings(attinmeta, values);
- tuplestore_puttuple(tupstore, tuple);
+ /*
+ * sinfo->valbuf[n] will be freed in finishStoreInfo()
+ * when realloc returns NULL.
+ */
+ if (tmp == NULL)
+ {
+ PQresultSetErrMsg(res, "out of memory for query result\n"); return FALSE;
}
- /* clean up and return the tuplestore */
- tuplestore_donestoring(tupstore);
- }
+ sinfo->valbuf[i] = tmp;
+ sinfo->valbuflen[i] = tmplen;
- PQclear(res);
- }
- PG_CATCH();
- {
- /* be sure to release the libpq result */
- PQclear(res);
- PG_RE_THROW();
+ cstrs[i] = sinfo->valbuf[i];
+ memcpy(cstrs[i], columns[i].value, len);
+ cstrs[i][len] = '\0';
+ }
}
- PG_END_TRY();
+
+ /*
+ * These functions may throw an exception, which is caught by the
+ * caller (dblink_fetch() or dblink_record_internal()).
+ */
+ tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+ tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+ return TRUE;
}
/*
diff --git b/doc/src/sgml/libpq.sgml a/doc/src/sgml/libpq.sgml
index 2deb432..1f91f98 100644
--- b/doc/src/sgml/libpq.sgml
+++ a/doc/src/sgml/libpq.sgml
@@ -7350,7 +7350,17 @@ typedef struct
<para>
This function must return 1 for success, and 0 for failure. On
failure this function should set the error message
- with <function>PQresultSetErrMsg</function>.
+ with <function>PQresultSetErrMsg</function>. When the non-blocking
+ API is in use, it can also return 2 to request an early exit
+ from <function>PQisBusy</function>. The
+ supplied <parameter>res</parameter>
+ and <parameter>columns</parameter> values stay valid, so the
+ row can be processed outside of the callback. The caller is
+ responsible for tracking whether
+ <function>PQisBusy</function> returned early because of the
+ callback or for other reasons. Usually this is done by
+ setting the cached values to NULL before
+ calling <function>PQisBusy</function>.
</para>
<para>
diff --git b/src/interfaces/libpq/fe-protocol2.c a/src/interfaces/libpq/fe-protocol2.c
index da36bfa..3b0520d 100644
--- b/src/interfaces/libpq/fe-protocol2.c
+++ a/src/interfaces/libpq/fe-protocol2.c
@@ -827,6 +827,9 @@ getAnotherTuple(PGconn *conn, bool binary)
rp= conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
if (rp == 1)
return 0;
+ else if (rp == 2 && pqIsnonblocking(conn))
+ /* processor requested early exit */
+ return EOF;
else if (rp == 0)
{
errmsg = result->errMsg;
diff --git b/src/interfaces/libpq/fe-protocol3.c a/src/interfaces/libpq/fe-protocol3.c
index 12fef30..c8202c2 100644
--- b/src/interfaces/libpq/fe-protocol3.c
+++ a/src/interfaces/libpq/fe-protocol3.c
@@ -703,6 +703,11 @@ getAnotherTuple(PGconn *conn, int msgLength)
/* everything is good */
return 0;
}
+ if (rp == 2 && pqIsnonblocking(conn))
+ {
+ /* processor requested early exit */
+ return EOF;
+ }
/* there was some problem */
if (rp == 0)
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers