On Fri, Feb 24, 2012 at 05:46:16PM +0200, Marko Kreen wrote:
> - rename to PQrecvRow() and additionally provide PQgetRow()
I tried it and it seems to work as an API: there is valid behaviour
for both sync and async connections.

Sync connection - PQgetRow() waits for data from network:

    if (!PQsendQuery(db, q))
        die(db, "PQsendQuery");
    while (1) {
        r = PQgetRow(db);
        if (!r)
            break;
        handle(r);
        PQclear(r);
    }
    r = PQgetResult(db);

Async connection - PQgetRow() does PQisBusy() loop internally,
but does not read from network:

    on read event:
        PQconsumeInput(db);
        while (1) {
            r = PQgetRow(db);
            if (!r)
                break;
            handle(r);
            PQclear(r);
        }
        if (!PQisBusy(db))
            r = PQgetResult(db);
        else
            waitForMoredata();

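To make the async control flow above concrete, here is a minimal self-contained sketch that simulates it without libpq. Everything in it is hypothetical: struct mock_conn and the mock_* functions only imitate the roles of PQconsumeInput(), the proposed PQgetRow() and PQisBusy(), and do not reproduce their real behaviour. It shows the ordering: read input, drain all buffered rows, and only then ask whether the result is complete.

```c
#include <assert.h>

/* Hypothetical stand-in for a connection; this is NOT libpq. */
struct mock_conn
{
    int total_rows;   /* rows the simulated server sends in total */
    int arrived;      /* rows that have reached the client buffer */
    int delivered;    /* rows already handed to the caller */
    int chunk;        /* rows arriving per simulated read event (must be > 0) */
};

/* Plays the role of PQconsumeInput(): pull in whatever is ready. */
static void
mock_consume_input(struct mock_conn *c)
{
    c->arrived += c->chunk;
    if (c->arrived > c->total_rows)
        c->arrived = c->total_rows;
}

/*
 * Plays the role of the proposed PQgetRow() on an async connection:
 * hands out one buffered row and never reads from the network.
 * Returns 1 and sets *row, or 0 when nothing is buffered.
 */
static int
mock_get_row(struct mock_conn *c, int *row)
{
    if (c->delivered >= c->arrived)
        return 0;
    *row = c->delivered++;
    return 1;
}

/* Plays the role of PQisBusy(): true while rows are still outstanding. */
static int
mock_is_busy(const struct mock_conn *c)
{
    return c->arrived < c->total_rows;
}

/* One read event; returns 1 once the final result could be fetched. */
static int
on_read_event(struct mock_conn *c, int *handled)
{
    int row;

    mock_consume_input(c);
    while (mock_get_row(c, &row))
        (*handled)++;           /* handle(r) would go here */

    /* Busy check comes only after the rows have been drained. */
    return !mock_is_busy(c);
}

/* Drive read events until done; returns how many events were needed. */
static int
run_query(struct mock_conn *c, int *handled)
{
    int events = 0;

    *handled = 0;
    do
        events++;
    while (!on_read_event(c, handled));
    return events;
}
```

With 10 rows arriving 4 at a time, run_query() needs three read events and handles all 10 rows; the busy check after each drain decides between fetching the final result and waiting for more data.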
As it seems to simplify life for quite a few potential users,
it seems worth including in libpq properly.

The attached patch is on top of v20120223 of the row-processor
patch. The only change in general code is allowing
early exit for synchronous connections, as we now have
a valid use case for it.
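
The early-exit change can be read as a predicate on the row processor's return value. The sketch below is only an illustration: the helper names and the ROWPROC_EARLY_EXIT macro are hypothetical, and only the two conditions are taken from the getAnotherTuple() hunks in the attached diff.

```c
#include <assert.h>

/* Hypothetical name for the value recv_row_proc() returns in the patch. */
#define ROWPROC_EARLY_EXIT 2

/* Before the patch: early exit was honoured only when non-blocking. */
static int
early_exit_before(int rp, int nonblocking)
{
    return rp == ROWPROC_EARLY_EXIT && nonblocking;
}

/* After the patch: early exit is honoured on any connection. */
static int
early_exit_after(int rp, int nonblocking)
{
    (void) nonblocking;         /* no longer consulted */
    return rp == ROWPROC_EARLY_EXIT;
}
```

The only case that changes is a return value of 2 on a blocking connection, which is the case the sync PQgetRow() loop relies on.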
--
marko
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 0087b43..b2779a8 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -4115,6 +4115,111 @@ int PQflush(PGconn *conn);
read-ready and then read the response as described above.
</para>
+ <para>
+ The functions described above always wait until the full result set has arrived
+ before making row data available as a PGresult. Sometimes it is
+ more useful to process rows as soon as they arrive from the network.
+ For that, the following functions can be used:
+ <variablelist>
+ <varlistentry id="libpq-pqgetrow">
+ <term>
+ <function>PQgetRow</function>
+ <indexterm>
+ <primary>PQgetRow</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Waits for the next row from a prior
+ <function>PQsendQuery</function>,
+ <function>PQsendQueryParams</function>,
+ or <function>PQsendQueryPrepared</function> call, and returns it.
+ A null pointer is returned when no more rows are available or
+ an error has occurred.
+<synopsis>
+PGresult *PQgetRow(PGconn *conn);
+</synopsis>
+ </para>
+
+ <para>
+ If this function returns a non-NULL result, it is a
+ <structname>PGresult</structname> that contains exactly one row.
+ It must be freed later with <function>PQclear</function>.
+ </para>
+ <para>
+ On a synchronous connection, the function waits for more
+ data from the network until the whole result set is done. So it returns
+ NULL only if the result set has been completely received or an error
+ has occurred. In both cases, call <function>PQgetResult</function>
+ next to get the final status.
+ </para>
+
+ <para>
+ On an asynchronous connection, the function does not read more data
+ from the network. So after it returns NULL, call <function>PQisBusy</function>
+ to see whether the final <structname>PGresult</structname> is available
+ or more data needs to be read from the network via
+ <function>PQconsumeInput</function>. Do not call
+ <function>PQisBusy</function> before <function>PQgetRow</function>
+ has returned NULL, as <function>PQisBusy</function> will parse
+ any available rows and add them to the main <structname>PGresult</structname>
+ that will be returned later by <function>PQgetResult</function>.
+ </para>
+
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="libpq-pqrecvrow">
+ <term>
+ <function>PQrecvRow</function>
+ <indexterm>
+ <primary>PQrecvRow</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ Gets row data without constructing a <structname>PGresult</structname> for it. This is the
+ underlying function for <function>PQgetRow</function>.
+<synopsis>
+int PQrecvRow(PGconn *conn, PGresult **hdr_p, PGrowValue **row_p);
+</synopsis>
+ </para>
+
+ <para>
+ It returns row data as pointers into the network buffer.
+ All structures are owned by <application>libpq</application>'s
+ <structname>PGconn</structname> and must not be freed or stored
+ by the user. Instead, row data should be copied to user structures before
+ any <application>libpq</application> result-processing function
+ is called.
+ </para>
+ <para>
+ It returns 1 when row data is available.
+ Argument <parameter>hdr_p</parameter> will contain a pointer
+ to an empty <structname>PGresult</structname> that describes
+ the row contents. The actual data is in <parameter>row_p</parameter>.
+ For the description of structure <structname>PGrowValue</structname>,
+ see <xref linkend="libpq-altrowprocessor">.
+ </para>
+ <para>It returns 0 when no more rows are available. On a synchronous
+ connection, it means the result set has fully arrived. Call
+ <function>PQgetResult</function> to get the final status.
+ On an asynchronous connection it can also mean more data
+ needs to be read from the network. Call <function>PQisBusy</function>
+ to see whether <function>PQgetResult</function>
+ or <function>PQconsumeInput</function> needs to be called next.
+ </para>
+ <para>
+ It returns -1 if a network error occurred.
+ See connection status functions in <xref linkend="libpq-status">.
+ </para>
+
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
</sect1>
<sect1 id="libpq-cancel">
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 7e02497..e9cbe2f 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -164,3 +164,5 @@ PQsetRowProcessor 161
PQgetRowProcessor 162
PQsetRowProcessorErrMsg 163
PQskipResult 164
+PQrecvRow 165
+PQgetRow 166
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index cd287cd..df19824 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1959,6 +1959,131 @@ PQskipResult(PGconn *conn, int skipAll)
return ret;
}
+/* temp buffer to pass pointers */
+struct RecvRowBuf
+{
+    PGresult *temp_hdr;
+    PGrowValue *temp_row;
+};
+
+/* set pointers, do early exit from PQisBusy() */
+static int
+recv_row_proc(PGresult *hdr, PGrowValue *row, void *arg)
+{
+    struct RecvRowBuf *buf = arg;
+    buf->temp_hdr = hdr;
+    buf->temp_row = row;
+    return 2;
+}
+
+/*
+ * PQrecvRow
+ *
+ * Wait and return next row in resultset.
+ *
+ * Returns:
+ *   1 - got row data, the pointers are owned by PGconn
+ *   0 - no rows available, either resultset complete
+ *       or more data needed (async-only)
+ *  -1 - some problem, check connection error
+ */
+int
+PQrecvRow(PGconn *conn, PGresult **hdr_p, PGrowValue **row_p)
+{
+    struct RecvRowBuf buf;
+    int rc;
+    int ret = -1;
+    PQrowProcessor oldproc;
+    void *oldarg;
+
+    *hdr_p = NULL;
+    *row_p = NULL;
+
+    /* the query may be still pending, send it */
+    while (1)
+    {
+        rc = PQflush(conn);
+        if (rc < 0)
+            return -1;
+        if (rc == 0)
+            break;
+        if (pqWait(FALSE, TRUE, conn))
+            return -1;
+    }
+
+    /* replace existing row processor */
+    oldproc = PQgetRowProcessor(conn, &oldarg);
+    PQsetRowProcessor(conn, recv_row_proc, &buf);
+
+    /* read data */
+    while (1)
+    {
+        buf.temp_hdr = NULL;
+        buf.temp_row = NULL;
+
+        /* done with resultset? */
+        if (!PQisBusy(conn))
+            break;
+
+        /* new row available? */
+        if (buf.temp_row)
+        {
+            *hdr_p = buf.temp_hdr;
+            *row_p = buf.temp_row;
+            ret = 1;
+            goto done;
+        }
+
+        /*
+         * More data needed
+         */
+
+        if (pqIsnonblocking(conn))
+            /* let user worry about new data */
+            break;
+        if (pqWait(TRUE, FALSE, conn))
+            goto done;
+        if (!PQconsumeInput(conn))
+            goto done;
+    }
+    /* no more rows available */
+    ret = 0;
+done:
+    /* restore old row processor */
+    PQsetRowProcessor(conn, oldproc, oldarg);
+    return ret;
+}
+
+/*
+ * PQgetRow
+ *   Returns next available row for resultset. NULL means
+ *   no row available, either resultset is done
+ *   or more data needed (only if async connection).
+ */
+PGresult *
+PQgetRow(PGconn *conn)
+{
+    PGresult *hdr, *res;
+    PGrowValue *row;
+
+    /* check if row is available */
+    if (PQrecvRow(conn, &hdr, &row) != 1)
+        return NULL;
+
+    /* Now make PGresult out of it */
+    res = PQcopyResult(hdr, PG_COPYRES_ATTRS);
+    if (!res)
+        goto NoMem;
+    if (pqAddRow(res, row, NULL))
+        return res;
+
+NoMem:
+    PQclear(res);
+    printfPQExpBuffer(&conn->errorMessage,
+                      libpq_gettext("out of memory\n"));
+    pqSaveErrorResult(conn);
+    return NULL;
+}
/*
* PQdescribePrepared
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 6578019..c922ab7 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -820,7 +820,7 @@ getAnotherTuple(PGconn *conn, bool binary)
rp= conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
if (rp == 1)
return 0;
- else if (rp == 2 && pqIsnonblocking(conn))
+ else if (rp == 2)
/* processor requested early exit */
return EOF;
else if (rp != 0)
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index a19ee88..aee7768 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -697,7 +697,7 @@ getAnotherTuple(PGconn *conn, int msgLength)
/* everything is good */
return 0;
}
- if (rp == 2 && pqIsnonblocking(conn))
+ if (rp == 2)
{
/* processor requested early exit */
return EOF;
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b7370e2..9180ed6 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -400,6 +400,10 @@ extern int PQsendQueryPrepared(PGconn *conn,
int resultFormat);
extern PGresult *PQgetResult(PGconn *conn);
+/* fetch single row from resultset */
+extern PGresult *PQgetRow(PGconn *conn);
+extern int PQrecvRow(PGconn *conn, PGresult **hdr_p, PGrowValue **row_p);
+
/* Routines for managing an asynchronous query */
extern int PQisBusy(PGconn *conn);
extern int PQconsumeInput(PGconn *conn);