Hello, This is a new version of the patch formerly known as
'alternative storage for libpq'.
- Changed the concept to 'Alternative Row Processor' from
'Storage handler'. Symbol names are also changed.
- The callback function has been modified in accordance with the review comment.
- Because of time constraints, I did only minimal checking of this
patch. The purpose of this patch is to show the new implementation.
- Performance has not been measured for this patch for the same
reason. I will do that next Monday.
- The meaning of PGresAttValue is changed. The field 'value' now
contains a value withOUT terminating zero. This change seems to
have no effect on any other portion within the whole source
tree of postgresql from what I've seen.
> > I would like to propose better one-shot API with:
> >
> > void *(*RowStoreHandler)(PGresult *res, PGresAttValue *columns);
...
> > 1) Pass-through processing do not need to care about unnecessary
> > per-row allocations.
> >
> > 2) Handlers that want to copy of the row (like regular libpq),
> > can optimize allocations by having "global" view of the row.
> > (Eg. One allocation for row header + data).
I expect the new implementation to be far better than the
original.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..c47af3a 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams 157
PQping 158
PQpingParams 159
PQlibVersion 160
+PQregisterRowProcessor 161
+PQgetRowProcessorParam 162
+PQsetRowProcessorErrMes 163
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index d454538..93803d5 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)
conn->allow_ssl_try = true;
conn->wait_ssl_try = false;
#endif
+ conn->rowProcessor = NULL;
/*
* We try to send at least 8K at a time, which is the usual size of pipe
@@ -5076,3 +5077,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
return prev;
}
+
+/*
+ * PQregisterRowProcessor
+ *    Install func as the row processor for results created on conn.
+ *
+ * func and param are stored in the connection and copied into each new
+ * PGresult by PQmakeEmptyPGresult.  Passing NULL for func makes new
+ * results fall back to the default processor.  A NULL conn is ignored.
+ */
+void
+PQregisterRowProcessor(PGconn *conn, RowProcessor func, void *param)
+{
+    if (!conn)
+        return;
+
+    conn->rowProcessor = func;
+    conn->rowProcessorParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..5d78b39 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -66,7 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
const char *desc_target);
static int check_field_number(const PGresult *res, int field_num);
-
+static void *pqAddTuple(PGresult *res, PGresAttValue *columns);
/* ----------------
* Space management for PGresult.
@@ -160,6 +160,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
result->curBlock = NULL;
result->curOffset = 0;
result->spaceLeft = 0;
+ result->rowProcessor = pqAddTuple;
+ result->rowProcessorParam = NULL;
+ result->rowProcessorErrMes = NULL;
if (conn)
{
@@ -194,6 +197,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
}
result->nEvents = conn->nEvents;
}
+
+ if (conn->rowProcessor)
+ {
+ result->rowProcessor = conn->rowProcessor;
+ result->rowProcessorParam = conn->rowProcessorParam;
+ }
}
else
{
@@ -445,7 +454,7 @@ PQsetvalue(PGresult *res, int tup_num, int field_num, char *value, int len)
}
/* add it to the array */
- if (!pqAddTuple(res, tup))
+ if (pqAddTuple(res, tup) == NULL)
return FALSE;
}
@@ -701,7 +710,6 @@ pqClearAsyncResult(PGconn *conn)
if (conn->result)
PQclear(conn->result);
conn->result = NULL;
- conn->curTuple = NULL;
}
/*
@@ -756,7 +764,6 @@ pqPrepareAsyncResult(PGconn *conn)
*/
res = conn->result;
conn->result = NULL; /* handing over ownership to caller */
- conn->curTuple = NULL; /* just in case */
if (!res)
res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
else
@@ -829,12 +836,17 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
/*
* pqAddTuple
- * add a row pointer to the PGresult structure, growing it if necessary
- * Returns TRUE if OK, FALSE if not enough memory to add the row
+ * add a row to the PGresult structure, growing it if necessary
+ * Returns the pointer to the new tuple if OK, NULL if not enough
+ * memory to add the row.
*/
-int
-pqAddTuple(PGresult *res, PGresAttValue *tup)
+void *
+pqAddTuple(PGresult *res, PGresAttValue *columns)
{
+ PGresAttValue *tup;
+ int nfields = res->numAttributes;
+ int i;
+
if (res->ntups >= res->tupArrSize)
{
/*
@@ -858,13 +870,39 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
newTuples = (PGresAttValue **)
realloc(res->tuples, newSize * sizeof(PGresAttValue *));
if (!newTuples)
- return FALSE; /* malloc or realloc failed */
+ return NULL; /* malloc or realloc failed */
res->tupArrSize = newSize;
res->tuples = newTuples;
}
+
+ tup = (PGresAttValue *)
+ pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+ if (tup == NULL) return NULL;
+ memcpy(tup, columns, nfields * sizeof(PGresAttValue));
+
+ for (i = 0 ; i < nfields ; i++)
+ {
+ tup[i].len = columns[i].len;
+ if (tup[i].len == NULL_LEN)
+ {
+ tup[i].value = res->null_field;
+ }
+ else
+ {
+ bool isbinary = (res->attDescs[i].format != 0);
+ tup[i].value =
+ (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+ if (tup[i].value == NULL)
+ return NULL;
+ memcpy(tup[i].value, columns[i].value, tup[i].len);
+ /* We have to terminate this ourselves */
+ tup[i].value[tup[i].len] = '\0';
+ }
+ }
+
res->tuples[res->ntups] = tup;
res->ntups++;
- return TRUE;
+ return tup;
}
/*
@@ -1223,7 +1261,6 @@ PQsendQueryStart(PGconn *conn)
/* initialize async result-accumulation state */
conn->result = NULL;
- conn->curTuple = NULL;
/* ready to send command message */
return true;
@@ -2822,6 +2859,35 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
return 0;
}
+/*
+ * PQgetRowProcessorParam
+ *    Return the contextual parameter carried by res, i.e. the pointer
+ *    that was registered on the PGconn with PQregisterRowProcessor.
+ *    Returns NULL when res is NULL.
+ */
+void *
+PQgetRowProcessorParam(const PGresult *res)
+{
+    return (res != NULL) ? res->rowProcessorParam : NULL;
+}
+
+/*
+ * PQsetRowProcessorErrMes
+ *    Set the error message handed back to the caller of the row processor.
+ *
+ * mes must be a malloc'ed block, or NULL to clear the message.  Once stored
+ * it belongs to res and is released by the caller of the row processor.
+ * Any previously stored message is freed first.  Passing the pointer that
+ * is already stored is a no-op, so it is never freed and then kept dangling.
+ * If res is NULL nothing is stored and mes stays owned by the caller.
+ */
+void
+PQsetRowProcessorErrMes(PGresult *res, char *mes)
+{
+    if (!res || res->rowProcessorErrMes == mes)
+        return;
+
+    /* free(NULL) is a no-op, so no guard is needed */
+    free(res->rowProcessorErrMes);
+    res->rowProcessorErrMes = mes;
+}
+
/* PQnparams:
* returns the number of input parameters of a prepared statement.
*/
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index ce0eac3..546534a 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
}
/*
+ * pqSkipnchar:
+ *    advance the input cursor past len bytes without copying them.
+ *    Returns 0 on success, EOF if fewer than len bytes are buffered.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+    size_t      avail = (size_t) (conn->inEnd - conn->inCursor);
+
+    if (avail < len)
+        return EOF;
+
+    conn->inCursor += len;
+
+    if (conn->Pfdebug)
+        fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+                (unsigned long) len);
+
+    return 0;
+}
+
+/*
* pqPutnchar:
* write exactly len bytes to the current message
*/
@@ -238,6 +257,7 @@ pqPutnchar(const char *s, size_t len, PGconn *conn)
return 0;
}
+
/*
* pqGetInt
* read a 2 or 4 byte integer and convert from network byte order
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..9abbb29 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -715,7 +715,7 @@ getAnotherTuple(PGconn *conn, bool binary)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
- PGresAttValue *tup;
+ PGresAttValue tup[result->numAttributes];
/* the backend sends us a bitmap of which attributes are null */
char std_bitmap[64]; /* used unless it doesn't fit */
@@ -729,26 +729,11 @@ getAnotherTuple(PGconn *conn, bool binary)
result->binary = binary;
- /* Allocate tuple space if first time for this data message */
- if (conn->curTuple == NULL)
+ if (binary)
{
- conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- if (conn->curTuple == NULL)
- goto outOfMemory;
- MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
- /*
- * If it's binary, fix the column format indicators. We assume the
- * backend will consistently send either B or D, not a mix.
- */
- if (binary)
- {
- for (i = 0; i < nfields; i++)
- result->attDescs[i].format = 1;
- }
+ for (i = 0; i < nfields; i++)
+ result->attDescs[i].format = 1;
}
- tup = conn->curTuple;
/* Get the null-value bitmap */
nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@@ -757,7 +742,7 @@ getAnotherTuple(PGconn *conn, bool binary)
{
bitmap = (char *) malloc(nbytes);
if (!bitmap)
- goto outOfMemory;
+ goto rowProcessError;
}
if (pqGetnchar(bitmap, nbytes, conn))
@@ -785,19 +770,17 @@ getAnotherTuple(PGconn *conn, bool binary)
vlen = vlen - 4;
if (vlen < 0)
vlen = 0;
- if (tup[i].value == NULL)
- {
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
- if (tup[i].value == NULL)
- goto outOfMemory;
- }
+
+ /*
+ * Buffer content may be shifted on reloading data. So we must
+ * set the pointer to the value on every scan.
+ */
+ tup[i].value = conn->inBuffer + conn->inCursor;
tup[i].len = vlen;
- /* read in the value */
+ /* Skip the value */
if (vlen > 0)
- if (pqGetnchar((char *) (tup[i].value), vlen, conn))
+ if (pqSkipnchar(vlen, conn))
goto EOFexit;
- /* we have to terminate this ourselves */
- tup[i].value[vlen] = '\0';
}
/* advance the bitmap stuff */
bitcnt++;
@@ -812,16 +795,15 @@ getAnotherTuple(PGconn *conn, bool binary)
}
/* Success! Store the completed tuple in the result */
- if (!pqAddTuple(result, tup))
- goto outOfMemory;
- /* and reset for a new message */
- conn->curTuple = NULL;
+ if (!result->rowProcessor(result, tup))
+ goto rowProcessError;
if (bitmap != std_bitmap)
free(bitmap);
return 0;
-outOfMemory:
+rowProcessError:
+
/* Replace partially constructed result with an error result */
/*
@@ -829,8 +811,21 @@ outOfMemory:
* there's not enough memory to concatenate messages...
*/
pqClearAsyncResult(conn);
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("out of memory for query result\n"));
+ resetPQExpBuffer(&conn->errorMessage);
+
+ /*
+ * Report the row processor's message if set, else assume out of memory.
+ * FIXME(review): result was freed by pqClearAsyncResult() just above.
+ */
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext(result->rowProcessorErrMes ?
+ result->rowProcessorErrMes :
+ "out of memory for query result\n"));
+ if (result->rowProcessorErrMes)
+ {
+ free(result->rowProcessorErrMes);
+ result->rowProcessorErrMes = NULL;
+ }
/*
* XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..18342c7 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -625,22 +625,12 @@ getAnotherTuple(PGconn *conn, int msgLength)
{
PGresult *result = conn->result;
int nfields = result->numAttributes;
- PGresAttValue *tup;
+ PGresAttValue tup[result->numAttributes];
int tupnfields; /* # fields from tuple */
int vlen; /* length of the current field value */
int i;
/* Allocate tuple space if first time for this data message */
- if (conn->curTuple == NULL)
- {
- conn->curTuple = (PGresAttValue *)
- pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- if (conn->curTuple == NULL)
- goto outOfMemory;
- MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
- }
- tup = conn->curTuple;
-
/* Get the field count and make sure it's what we expect */
if (pqGetInt(&tupnfields, 2, conn))
return EOF;
@@ -671,40 +661,46 @@ getAnotherTuple(PGconn *conn, int msgLength)
}
if (vlen < 0)
vlen = 0;
- if (tup[i].value == NULL)
- {
- bool isbinary = (result->attDescs[i].format != 0);
- tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
- if (tup[i].value == NULL)
- goto outOfMemory;
- }
- tup[i].len = vlen;
- /* read in the value */
+ /*
+ * Buffer content may be shifted on reloading data. So we must
+ * set the pointer to the value every scan.
+ */
+ tup[i].value = conn->inBuffer + conn->inCursor;
+ tup[i].len = vlen;
if (vlen > 0)
- if (pqGetnchar((char *) (tup[i].value), vlen, conn))
+ if (pqSkipnchar(vlen, conn))
return EOF;
- /* we have to terminate this ourselves */
- tup[i].value[vlen] = '\0';
}
/* Success! Store the completed tuple in the result */
- if (!pqAddTuple(result, tup))
- goto outOfMemory;
- /* and reset for a new message */
- conn->curTuple = NULL;
-
+ if (!result->rowProcessor(result, tup))
+ goto rowProcessError;
+
return 0;
-outOfMemory:
+rowProcessError:
/*
* Replace partially constructed result with an error result. First
* discard the old result to try to win back some memory.
*/
pqClearAsyncResult(conn);
- printfPQExpBuffer(&conn->errorMessage,
- libpq_gettext("out of memory for query result\n"));
+ resetPQExpBuffer(&conn->errorMessage);
+
+ /*
+ * Report the row processor's message if set, else assume out of memory.
+ * FIXME(review): result was freed by pqClearAsyncResult() just above.
+ */
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext(result->rowProcessorErrMes ?
+ result->rowProcessorErrMes :
+ "out of memory for query result\n"));
+ if (result->rowProcessorErrMes)
+ {
+ free(result->rowProcessorErrMes);
+ result->rowProcessorErrMes = NULL;
+ }
pqSaveErrorResult(conn);
/* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..0931211 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -149,6 +149,15 @@ typedef struct pgNotify
struct pgNotify *next; /* list link */
} PGnotify;
+/* PGresAttValue represents the value of one tuple field in string form.
+ A NULL field is represented by len < 0. Otherwise value points to len
+ bytes of data that need not be followed by a terminating zero byte. */
+typedef struct pgresAttValue
+{
+ int len; /* length in bytes of the value; negative for NULL */
+ char *value; /* actual value; not necessarily null-terminated */
+} PGresAttValue;
+
/* Function types for notice-handling callbacks */
typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -416,6 +425,31 @@ extern PGPing PQping(const char *conninfo);
extern PGPing PQpingParams(const char *const * keywords,
const char *const * values, int expand_dbname);
+/*
+ * Typedef for an alternative row processor.
+ *
+ * The function must return a non-NULL value on success and NULL on
+ * failure.  On failure it may set an error message with
+ * PQsetRowProcessorErrMes; if no message has been set, the caller
+ * assumes out of memory.  The function is assumed not to throw any
+ * exception.
+ */
+typedef void *(*RowProcessor)(PGresult *res, PGresAttValue *columns);
+
+/*
+ * Register an alternative row processor on a PGconn.
+ *
+ * Once registered, results created from the connection no longer store
+ * rows themselves; func is called for each row instead.
+ *
+ * func is the row processor function. See the typedef RowProcessor.
+ *
+ * rowProcessorParam is a contextual pointer that the row processor can
+ * retrieve with PQgetRowProcessorParam.
+ */
+extern void PQregisterRowProcessor(PGconn *conn, RowProcessor func,
+ void *rowProcessorParam);
+
/* Force the write buffer to be written (or at least try) */
extern int PQflush(PGconn *conn);
@@ -454,6 +488,8 @@ extern char *PQcmdTuples(PGresult *res);
extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
extern int PQgetlength(const PGresult *res, int tup_num, int field_num);
extern int PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern void *PQgetRowProcessorParam(const PGresult *res);
+extern void PQsetRowProcessorErrMes(PGresult *res, char *mes);
extern int PQnparams(const PGresult *res);
extern Oid PQparamtype(const PGresult *res, int param_num);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d967d60..51ac927 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,12 +134,6 @@ typedef struct pgresParamDesc
#define NULL_LEN (-1) /* pg_result len for NULL value */
-typedef struct pgresAttValue
-{
- int len; /* length in bytes of the value */
- char *value; /* actual value, plus terminating zero byte */
-} PGresAttValue;
-
/* Typedef for message-field list entries */
typedef struct pgMessageField
{
@@ -209,6 +203,11 @@ struct pg_result
PGresult_data *curBlock; /* most recently allocated block */
int curOffset; /* start offset of free space in block */
int spaceLeft; /* number of free bytes remaining in block */
+
+ RowProcessor rowProcessor; /* Result row processor handler. See
+ * RowProcessor for details. */
+ void *rowProcessorParam; /* Contextual parameter for rowProcessor */
+ char *rowProcessorErrMes; /* Error message from rowProcessor */
};
/* PGAsyncStatusType defines the state of the query-execution state machine */
@@ -398,7 +397,6 @@ struct pg_conn
/* Status for asynchronous result construction */
PGresult *result; /* result being constructed */
- PGresAttValue *curTuple; /* tuple currently being read */
#ifdef USE_SSL
bool allow_ssl_try; /* Allowed to try SSL negotiation */
@@ -443,6 +441,13 @@ struct pg_conn
/* Buffer for receiving various parts of messages */
PQExpBufferData workBuffer; /* expansible string */
+
+ /* Row processor. The two fields below are copied to each newly
+ * created PGresult if rowProcessor is not NULL; otherwise the
+ * default processor is used. */
+ RowProcessor rowProcessor; /* Result row processor. See
+ * RowProcessor for details. */
+ void *rowProcessorParam; /* Contextual parameter for rowProcessor */
};
/* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -507,7 +512,6 @@ extern void
pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
/* This lets gcc check the format string for consistency. */
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
-extern int pqAddTuple(PGresult *res, PGresAttValue *tup);
extern void pqSaveMessageField(PGresult *res, char code,
const char *value);
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
@@ -560,6 +564,7 @@ extern int pqGets(PQExpBuffer buf, PGconn *conn);
extern int pqGets_append(PQExpBuffer buf, PGconn *conn);
extern int pqPuts(const char *s, PGconn *conn);
extern int pqGetnchar(char *s, size_t len, PGconn *conn);
+extern int pqSkipnchar(size_t len, PGconn *conn);
extern int pqPutnchar(const char *s, size_t len, PGconn *conn);
extern int pqGetInt(int *result, size_t bytes, PGconn *conn);
extern int pqPutInt(int value, size_t bytes, PGconn *conn);
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..9ad3bfd 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,215 @@ int PQisthreadsafe();
</sect1>
+ <sect1 id="libpq-alterrowprocessor">
+ <title>Alternative row processor</title>
+
+ <indexterm zone="libpq-alterrowprocessor">
+ <primary>PGresult</primary>
+ <secondary>PGconn</secondary>
+ </indexterm>
+
+ <para>
+ In standard usage, the result of command execution is read from
+ a <structname>PGresult</structname> acquired
+ with <function>PQgetResult</function>
+ from a <structname>PGconn</structname>. The memory for
+ the PGresult is allocated with malloc() internally within calls of
+ command execution functions such as <function>PQexec</function>
+ and <function>PQgetResult</function>. If it is inconvenient to
+ handle the result rows in the form of a PGresult, you can instruct
+ PGconn to pass every row to your own row processor instead of
+ storing it into the PGresult.
+ </para>
+
+ <variablelist>
+ <varlistentry id="libpq-registerrowprocessor">
+ <term>
+ <function>PQregisterRowProcessor</function>
+ <indexterm>
+ <primary>PQregisterRowProcessor</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Sets a callback function to process each row.
+<synopsis>
+void PQregisterRowProcessor(PGconn *conn,
+ RowProcessor func,
+ void *param);
+</synopsis>
+ </para>
+
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>conn</parameter></term>
+ <listitem>
+ <para>
+ The connection object on which to set the row processor
+ function. Each PGresult created from this connection calls this
+ function to process each row.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>func</parameter></term>
+ <listitem>
+ <para>
+ Row processor function to set. NULL means to use the
+ default processor.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>param</parameter></term>
+ <listitem>
+ <para>
+ A pointer to contextual parameter passed
+ to <parameter>func</parameter>. You can get this pointer
+ in <type>RowProcessor</type>
+ by <function>PQgetRowProcessorParam</function>.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-rowprocessor">
+ <term>
+ <type>RowProcessor</type>
+ <indexterm>
+ <primary>RowProcessor</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ The type for the row processor callback function.
+<synopsis>
+void *(*RowProcessor)(PGresult *res,
+ PGresAttValue *columns);
+</synopsis>
+ </para>
+
+ <para>
+ This function must return a non-NULL value on success and NULL on
+ failure, and should set the error message
+ with <function>PQsetRowProcessorErrMes</function> if the cause
+ is other than out of memory. This function must not throw any
+ exception.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>res</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the <type>PGresult</type> object.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>columns</parameter></term>
+ <listitem>
+ <para>
+ The column values of the row to process.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-pqgetrowprocessorparam">
+ <term>
+ <function>PQgetRowProcessorParam</function>
+ <indexterm>
+ <primary>PQgetRowProcessorParam</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Get the pointer passed to <function>PQregisterRowProcessor</function>
+ as <parameter>param</parameter>.
+<synopsis>
+void *PQgetRowProcessorParam(const PGresult *res);
+</synopsis>
+ </para>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>res</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the <type>PGresult</type> object.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry id="libpq-pqsetrowprocessorerrmes">
+ <term>
+ <function>PQsetRowProcessorErrMes</function>
+ <indexterm>
+ <primary>PQsetRowProcessorErrMes</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Set the message for the error occurred
+ in <type>RowProcessor</type>. If this message is not set, the
+ caller assumes the error to be out of memory.
+<synopsis>
+void PQsetRowProcessorErrMes(PGresult *res, char *mes)
+</synopsis>
+ </para>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term><parameter>res</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the <type>PGresult</type> object
+ passed to <type>RowProcessor</type>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><parameter>mes</parameter></term>
+ <listitem>
+ <para>
+ A pointer to the memory block containing the error message,
+ which is allocated by <function>malloc()</function>. The
+ memory block will be freed with <function>free()</function> in
+ the caller of <type>RowProcessor</type> only if it returns NULL.
+ </para>
+ <para>
+ If <parameter>res</parameter> already has a message previously
+ set, it is freed and then the given message is set. Set NULL
+ to cancel the custom message.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </sect1>
+
+
<sect1 id="libpq-build">
<title>Building <application>libpq</application> Programs</title>
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..195ad21 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn;
+typedef struct storeInfo /* per-call state handed to storeHandler through PQgetRowProcessorParam */
+{
+ Tuplestorestate *tuplestore; /* receives every row built by storeHandler */
+ int nattrs; /* column count of the expected result tuple descriptor */
+ MemoryContext oldcontext; /* context that was current before initStoreInfo switched; restored by finishStoreInfo */
+ AttInMetadata *attinmeta; /* input metadata passed to BuildTupleFromCStrings */
+ char** valbuf; /* per-column malloc'ed buffers holding null-terminated copies of the values */
+ int *valbuflen; /* allocated size of each valbuf entry; -1 while unallocated */
+ bool error_occurred; /* set by storeHandler on any failure; later rows are then rejected */
+ bool nummismatch; /* set when the result's field count differs from nattrs */
+ ErrorData *edata; /* copy of the error caught in storeHandler, for the caller to rethrow */
+} storeInfo;
+
/*
* Internal declarations
*/
static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
static void validate_pkattnums(Relation rel,
int2vector *pkattnums_arg, int32 pknumatts_arg,
int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *storeHandler(PGresult *res, PGresAttValue *columns);
+
/* Global */
static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
char *curname = NULL;
int howmany = 0;
bool fail = true; /* default to backward compatible */
+ storeInfo storeinfo;
DBLINK_INIT;
@@ -559,15 +576,36 @@ dblink_fetch(PG_FUNCTION_ARGS)
appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
+ * Rows are stored into storeinfo.tuplestore instead of
+ * the PGresult returned by PQexec below
+ */
+ initStoreInfo(&storeinfo, fcinfo);
+ PQregisterRowProcessor(conn, storeHandler, &storeinfo);
+
+ /*
* Try to execute the query. Note that since libpq uses malloc, the
* PGresult will be long-lived even though we are still in a short-lived
* memory context.
*/
res = PQexec(conn, buf.data);
+ finishStoreInfo(&storeinfo);
+
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
+ /* finishStoreInfo saves the fields referred to below. */
+ if (storeinfo.nummismatch)
+ {
+ /* This is only for backward compatibility */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+ }
+ else if (storeinfo.edata)
+ ReThrowError(storeinfo.edata);
+
dblink_res_error(conname, res, "could not fetch from cursor", fail);
return (Datum) 0;
}
@@ -579,8 +617,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));
}
+ PQclear(res);
- materializeResult(fcinfo, res);
return (Datum) 0;
}
@@ -640,6 +678,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible */
bool freeconn = false;
+ storeInfo storeinfo;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -715,164 +754,205 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
rsinfo->setResult = NULL;
rsinfo->setDesc = NULL;
+
+ /*
+ * Rows are stored into storeinfo.tuplestore instead of
+ * the PGresult returned by PQexec/PQgetResult below
+ */
+ initStoreInfo(&storeinfo, fcinfo);
+ PQregisterRowProcessor(conn, storeHandler, &storeinfo);
+
/* synchronous query, or async result retrieval */
if (!is_async)
res = PQexec(conn, sql);
else
- {
res = PQgetResult(conn);
- /* NULL means we're all done with the async results */
- if (!res)
- return (Datum) 0;
- }
- /* if needed, close the connection to the database and cleanup */
- if (freeconn)
- PQfinish(conn);
+ finishStoreInfo(&storeinfo);
- if (!res ||
- (PQresultStatus(res) != PGRES_COMMAND_OK &&
- PQresultStatus(res) != PGRES_TUPLES_OK))
+ /* NULL res from async get means we're all done with the results */
+ if (res || !is_async)
{
- dblink_res_error(conname, res, "could not execute query", fail);
- return (Datum) 0;
+ if (freeconn)
+ PQfinish(conn);
+
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
+ {
+ /* finishStoreInfo saves the fields referred to below. */
+ if (storeinfo.nummismatch)
+ {
+ /* This is only for backward compatibility */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+ }
+ else if (storeinfo.edata)
+ ReThrowError(storeinfo.edata);
+
+ dblink_res_error(conname, res, "could not execute query", fail);
+ return (Datum) 0;
+ }
}
+ PQclear(res);
- materializeResult(fcinfo, res);
return (Datum) 0;
}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
- Assert(rsinfo->returnMode == SFRM_Materialize);
-
- PG_TRY();
+ TupleDesc tupdesc;
+ int i;
+
+ switch (get_call_result_type(fcinfo, NULL, &tupdesc))
{
- TupleDesc tupdesc;
- bool is_sql_cmd = false;
- int ntuples;
- int nfields;
+ case TYPEFUNC_COMPOSITE:
+ /* success */
+ break;
+ case TYPEFUNC_RECORD:
+ /* failed to determine actual type of RECORD */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record")));
+ break;
+ default:
+ /* result type isn't composite */
+ elog(ERROR, "return type must be a row type");
+ break;
+ }
+
+ sinfo->oldcontext = MemoryContextSwitchTo(
+ rsinfo->econtext->ecxt_per_query_memory);
+
+ /* make sure we have a persistent copy of the tupdesc */
+ tupdesc = CreateTupleDescCopy(tupdesc);
+
+ sinfo->error_occurred = FALSE;
+ sinfo->nummismatch = FALSE;
+ sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ sinfo->edata = NULL;
+ sinfo->nattrs = tupdesc->natts;
+ sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+
+ /* Preallocate memory of same size with c string array for values. */
+ sinfo->valbuf = (char **) malloc(sinfo->nattrs * sizeof(char*));
+ sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
+ for (i = 0 ; i < sinfo->nattrs ; i++)
+ {
+ sinfo->valbuf[i] = NULL;
+ sinfo->valbuflen[i] = -1;
+ }
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- is_sql_cmd = true;
-
- /*
- * need a tuple descriptor representing one TEXT column to return
- * the command status string as our result tuple
- */
- tupdesc = CreateTemplateTupleDesc(1, false);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
- TEXTOID, -1, 0);
- ntuples = 1;
- nfields = 1;
- }
- else
- {
- Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+ rsinfo->setResult = sinfo->tuplestore;
+ rsinfo->setDesc = tupdesc;
+}
- is_sql_cmd = false;
+static void
+finishStoreInfo(storeInfo *sinfo) /* free the value buffers and switch back to the saved memory context */
+{
+ int i;
- /* get a tuple descriptor for our result type */
- switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+ if (sinfo->valbuf) /* NULL after an earlier call, so a repeated call skips the frees */
+ {
+ for (i = 0 ; i < sinfo->nattrs ; i++)
+ {
+ if (sinfo->valbuf[i]) /* entries stay NULL for columns that never held a value */
{
- case TYPEFUNC_COMPOSITE:
- /* success */
- break;
- case TYPEFUNC_RECORD:
- /* failed to determine actual type of RECORD */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("function returning record called in context "
- "that cannot accept type record")));
- break;
- default:
- /* result type isn't composite */
- elog(ERROR, "return type must be a row type");
- break;
+ free(sinfo->valbuf[i]);
+ sinfo->valbuf[i] = NULL;
}
-
- /* make sure we have a persistent copy of the tupdesc */
- tupdesc = CreateTupleDescCopy(tupdesc);
- ntuples = PQntuples(res);
- nfields = PQnfields(res);
}
+ free(sinfo->valbuf);
+ sinfo->valbuf = NULL;
+ }
- /*
- * check result and tuple descriptor have the same number of columns
- */
- if (nfields != tupdesc->natts)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("remote query result rowtype does not match "
- "the specified FROM clause rowtype")));
-
- if (ntuples > 0)
- {
- AttInMetadata *attinmeta;
- Tuplestorestate *tupstore;
- MemoryContext oldcontext;
- int row;
- char **values;
-
- attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
- oldcontext = MemoryContextSwitchTo(
- rsinfo->econtext->ecxt_per_query_memory);
- tupstore = tuplestore_begin_heap(true, false, work_mem);
- rsinfo->setResult = tupstore;
- rsinfo->setDesc = tupdesc;
- MemoryContextSwitchTo(oldcontext);
+ if (sinfo->valbuflen)
+ {
+ free(sinfo->valbuflen);
+ sinfo->valbuflen = NULL;
+ }
+ MemoryContextSwitchTo(sinfo->oldcontext); /* undo the context switch made in initStoreInfo */
+}
- values = (char **) palloc(nfields * sizeof(char *));
+/*
+ * storeHandler
+ * Row processor registered with PQregisterRowProcessor: converts one
+ * row into a heap tuple and appends it to sinfo->tuplestore.
+ *
+ * columns[i].value is not null-terminated, so each non-NULL value is
+ * first copied into a reusable per-column buffer and terminated there.
+ *
+ * Returns columns on success and NULL on failure. No libpq error
+ * message is set here: a column-count mismatch is flagged in
+ * sinfo->nummismatch, an error raised while building or storing the
+ * tuple is copied into sinfo->edata for the caller to rethrow, and a
+ * failed buffer allocation is left for libpq to report as out of memory.
+ */
+static void *
+storeHandler(PGresult *res, PGresAttValue *columns)
+{
+ storeInfo *sinfo = (storeInfo *)PQgetRowProcessorParam(res);
+ HeapTuple tuple;
+ int fields = PQnfields(res);
+ int i;
+ char *cstrs[PQnfields(res)];
- /* put all tuples into the tuplestore */
- for (row = 0; row < ntuples; row++)
- {
- HeapTuple tuple;
+ /* once a row has failed, reject every following row as well */
+ if (sinfo->error_occurred)
+ return NULL;
- if (!is_sql_cmd)
- {
- int i;
+ if (sinfo->nattrs != fields)
+ {
+ sinfo->error_occurred = TRUE;
+ sinfo->nummismatch = TRUE;
+ finishStoreInfo(sinfo);
- for (i = 0; i < nfields; i++)
- {
- if (PQgetisnull(res, row, i))
- values[i] = NULL;
- else
- values[i] = PQgetvalue(res, row, i);
- }
- }
- else
- {
- values[0] = PQcmdStatus(res);
- }
+ /* This error will be processed in
+ * dblink_record_internal(). So do not set error message
+ * here. */
+ return NULL;
+ }
- /* build the tuple and put it into the tuplestore. */
- tuple = BuildTupleFromCStrings(attinmeta, values);
- tuplestore_puttuple(tupstore, tuple);
+ /*
+ * Value input functions assume that the value string is
+ * terminated by zero, so make a terminated copy of each value.
+ */
+ for(i = 0 ; i < fields ; i++)
+ {
+ int len = columns[i].len;
+ if (len < 0)
+ cstrs[i] = NULL;
+ else
+ {
+ /*
+ * valbuflen[i] is -1 while valbuf[i] is NULL, and realloc(NULL, n)
+ * acts like malloc(n), so one test covers first use and growth.
+ */
+ if (sinfo->valbuflen[i] < len + 1)
+ {
+ char *newbuf = (char *) realloc(sinfo->valbuf[i], len + 1);
+
+ if (newbuf == NULL)
+ {
+ /* old buffer is still owned by sinfo and freed later */
+ sinfo->error_occurred = TRUE;
+ return NULL;
+ }
+ sinfo->valbuf[i] = newbuf;
+ sinfo->valbuflen[i] = len + 1;
}
- /* clean up and return the tuplestore */
- tuplestore_donestoring(tupstore);
+ cstrs[i] = sinfo->valbuf[i];
+ memcpy(cstrs[i], columns[i].value, len);
+ cstrs[i][len] = '\0';
}
+ }
- PQclear(res);
+ PG_TRY();
+ {
+ tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+ tuplestore_puttuple(sinfo->tuplestore, tuple);
}
PG_CATCH();
{
- /* be sure to release the libpq result */
- PQclear(res);
- PG_RE_THROW();
+ MemoryContext context;
+ /*
+ * Store exception for later ReThrow and cancel the exception.
+ */
+ sinfo->error_occurred = TRUE;
+ context = MemoryContextSwitchTo(sinfo->oldcontext);
+ sinfo->edata = CopyErrorData();
+ MemoryContextSwitchTo(context);
+ FlushErrorState();
+
+ return NULL;
}
PG_END_TRY();
+
+ return columns;
}
/*
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers