On Fri, Feb 24, 2012 at 05:46:16PM +0200, Marko Kreen wrote: > - rename to PQrecvRow() and additionally provide PQgetRow()
I tried it and it seems to work as API - there is valid behaviour for both sync and async connections. Sync connection - PQgetRow() waits for data from network: if (!PQsendQuery(db, q)) die(db, "PQsendQuery"); while (1) { r = PQgetRow(db); if (!r) break; handle(r); PQclear(r); } r = PQgetResult(db); Async connection - PQgetRow() does PQisBusy() loop internally, but does not read from network: on read event: PQconsumeInput(db); while (1) { r = PQgetRow(db); if (!r) break; handle(r); PQclear(r); } if (!PQisBusy(db)) r = PQgetResult(db); else waitForMoredata(); As it seems to simplify life for quite many potential users, it seems worth including in libpq properly. Attached patch is on top of v20120223 of row-processor patch. Only change in general code is allowing early exit for syncronous connection, as we now have valid use-case for it. -- marko
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 0087b43..b2779a8 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -4115,6 +4115,111 @@ int PQflush(PGconn *conn); read-ready and then read the response as described above. </para> + <para> + Above-mentioned functions always wait until full resultset has arrived + before makeing row data available as PGresult. Sometimes it's + more useful to process rows as soon as the arrive from network. + For that, following functions can be used: + <variablelist> + <varlistentry id="libpq-pqgetrow"> + <term> + <function>PQgetRow</function> + <indexterm> + <primary>PQgetRow</primary> + </indexterm> + </term> + + <listitem> + <para> + Waits for the next row from a prior + <function>PQsendQuery</function>, + <function>PQsendQueryParams</function>, + <function>PQsendQueryPrepared</function> call, and returns it. + A null pointer is returned when no more rows are available or + some error happened. +<synopsis> +PGresult *PQgetRow(PGconn *conn); +</synopsis> + </para> + + <para> + If this function returns non-NULL result, it is a + <structname>PGresult</structname> that contains exactly 1 row. + It needs to be freed later with <function>PQclear</function>. + </para> + <para> + On synchronous connection, the function will wait for more + data from network until all resultset is done. So it returns + NULL only if resultset has completely received or some error + happened. In both cases, call <function>PQgetResult</function> + next to get final status. + </para> + + <para> + On asynchronous connection the function does not read more data + from network. So after NULL call <function>PQisBusy</function> + to see whether final <structname>PGresult</structname> is avilable + or more data needs to be read from network via + <function>PQconsumeInput</function>. Do not call + <function>PQisBusy</function> before <function>PQgetRow</function> + has returned NULL, as <function>PQisBusy</function> will parse + any available rows and add them to main <function>PGresult</function> + that will be returned later by <function>PQgetResult</function>. + </para> + + </listitem> + </varlistentry> + + <varlistentry id="libpq-pqrecvrow"> + <term> + <function>PQrecvRow</function> + <indexterm> + <primary>PQrecvRow</primary> + </indexterm> + </term> + + <listitem> + <para> + Get row data without constructing PGresult for it. This is the + underlying function for <function>PQgetRow</function>. +<synopsis> +int PQrecvRow(PGconn *conn, PGresult **hdr_p, PGrowValue **row_p); +</synopsis> + </para> + + <para> + It returns row data as pointers to network buffer. + All structures are owned by <application>libpq</application>'s + <structname>PGconn</structname> and must not be freed or stored + by user. Instead row data should be copied to user structures, before + any <application>libpq</application> result-processing function + is called. + </para> + <para> + It returns 1 when row data is available. + Argument <parameter>hdr_p</parameter> will contain pointer + to empty <structname>PGresult</structname> that describes + row contents. Actual data is in <parameter>row_p</parameter>. + For the description of structure <structname>PGrowValue</structname> + see <xref linkend="libpq-altrowprocessor">. + </para> + <para>It returns 0 when no more rows are avalable. On synchronous + connection, it means resultset is fully arrived. Call + <function>PQgetResult</function> to get final status. + On asynchronous connection it can also mean more data + needs to be read from network. Call <function>PQisBusy</function> + to see whether <function>PQgetResult</function> + or <function>PQconsumeInput</function> needs to be called next. + </para> + <para> + it returns -1 if some network error occured. + See connection status functions in <xref linkend="libpq-status">. + </para> + + </listitem> + </varlistentry> + </variablelist> + </para> </sect1> <sect1 id="libpq-cancel"> diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 7e02497..e9cbe2f 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -164,3 +164,5 @@ PQsetRowProcessor 161 PQgetRowProcessor 162 PQsetRowProcessorErrMsg 163 PQskipResult 164 +PQrecvRow 165 +PQgetRow 166 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index cd287cd..df19824 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1959,6 +1959,131 @@ PQskipResult(PGconn *conn, int skipAll) return ret; } +/* temp buffer to pass pointers */ +struct RecvRowBuf +{ + PGresult *temp_hdr; + PGrowValue *temp_row; +}; + +/* set pointers, do early exit from PQisBusy() */ +static int +recv_row_proc(PGresult *hdr, PGrowValue *row, void *arg) +{ + struct RecvRowBuf *buf = arg; + buf->temp_hdr = hdr; + buf->temp_row = row; + return 2; +} + +/* + * PQrecvRow + * + * Wait and return next row in resultset. + * + * Returns: + * 1 - got row data, the pointers are owned by PGconn + * 0 - no rows available, either resultset complete + * or more data needed (async-only) + * -1 - some problem, check connection error + */ +int +PQrecvRow(PGconn *conn, PGresult **hdr_p, PGrowValue **row_p) +{ + struct RecvRowBuf buf; + int rc; + int ret = -1; + PQrowProcessor oldproc; + void *oldarg; + + *hdr_p = NULL; + *row_p = NULL; + + /* the query may be still pending, send it */ + while (1) + { + rc = PQflush(conn); + if (rc < 0) + return -1; + if (rc == 0) + break; + if (pqWait(FALSE, TRUE, conn)) + return -1; + } + + /* replace existing row processor */ + oldproc = PQgetRowProcessor(conn, &oldarg); + PQsetRowProcessor(conn, recv_row_proc, &buf); + + /* read data */ + while (1) + { + buf.temp_hdr = NULL; + buf.temp_row = NULL; + + /* done with resultset? */ + if (!PQisBusy(conn)) + break; + + /* new row available? */ + if (buf.temp_row) + { + *hdr_p = buf.temp_hdr; + *row_p = buf.temp_row; + ret = 1; + goto done; + } + + /* + * More data needed + */ + + if (pqIsnonblocking(conn)) + /* let user worry about new data */ + break; + if (pqWait(TRUE, FALSE, conn)) + goto done; + if (!PQconsumeInput(conn)) + goto done; + } + /* no more rows available */ + ret = 0; +done: + /* restore old row processor */ + PQsetRowProcessor(conn, oldproc, oldarg); + return ret; +} + +/* + * PQgetRow + * Returns next available row for resultset. NULL means + * no row available, either resultset is done + * or more data needed (only if async connection). + */ +PGresult * +PQgetRow(PGconn *conn) +{ + PGresult *hdr, *res; + PGrowValue *row; + + /* check if row is available */ + if (PQrecvRow(conn, &hdr, &row) != 1) + return NULL; + + /* Now make PGresult out of it */ + res = PQcopyResult(hdr, PG_COPYRES_ATTRS); + if (!res) + goto NoMem; + if (pqAddRow(res, row, NULL)) + return res; + +NoMem: + PQclear(res); + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("out of memory\n")); + pqSaveErrorResult(conn); + return NULL; +} /* * PQdescribePrepared diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index 6578019..c922ab7 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -820,7 +820,7 @@ getAnotherTuple(PGconn *conn, bool binary) rp= conn->rowProcessor(result, rowbuf, conn->rowProcessorParam); if (rp == 1) return 0; - else if (rp == 2 && pqIsnonblocking(conn)) + else if (rp == 2) /* processor requested early exit */ return EOF; else if (rp != 0) diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index a19ee88..aee7768 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -697,7 +697,7 @@ getAnotherTuple(PGconn *conn, int msgLength) /* everything is good */ return 0; } - if (rp == 2 && pqIsnonblocking(conn)) + if (rp == 2) { /* processor requested early exit */ return EOF; diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index b7370e2..9180ed6 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -400,6 +400,10 @@ extern int PQsendQueryPrepared(PGconn *conn, int resultFormat); extern PGresult *PQgetResult(PGconn *conn); +/* fetch single row from resultset */ +extern PGresult *PQgetRow(PGconn *conn); +extern int PQrecvRow(PGconn *conn, PGresult **hdr_p, PGrowValue **row_p); + /* Routines for managing an asynchronous query */ extern int PQisBusy(PGconn *conn); extern int PQconsumeInput(PGconn *conn);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers