On Fri, Feb 24, 2012 at 05:46:16PM +0200, Marko Kreen wrote:
> - rename to PQrecvRow() and additionally provide PQgetRow()

I tried it and it seems to work as API - there is valid behaviour
for both sync and async connections.

Sync connection - PQgetRow() waits for data from network:

        if (!PQsendQuery(db, q))
                die(db, "PQsendQuery");
        while (1) {
                r = PQgetRow(db);
                if (!r)
                        break;
                handle(r);
                PQclear(r);
        }
        r = PQgetResult(db);

Async connection - PQgetRow() does PQisBusy() loop internally,
but does not read from network:

   on read event:
        PQconsumeInput(db);
        while (1) {
                r = PQgetRow(db);
                if (!r)
                        break;
                handle(r);
                PQclear(r);
        }
        if (!PQisBusy(db))
                r = PQgetResult(db);
        else
                waitForMoredata();


As this seems to simplify life for quite a few potential users,
it seems worth including in libpq properly.

The attached patch applies on top of v20120223 of the row-processor
patch.  The only change to the general code is allowing
early exit for synchronous connections, as we now have
a valid use case for it.

-- 
marko

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 0087b43..b2779a8 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -4115,6 +4115,111 @@ int PQflush(PGconn *conn);
    read-ready and then read the response as described above.
   </para>
 
+  <para>
+   The above-mentioned functions always wait until the full result set has
+   arrived before making row data available as a PGresult.  Sometimes it is
+   more useful to process rows as soon as they arrive from the network.
+   For that, the following functions can be used:
+   <variablelist>
+    <varlistentry id="libpq-pqgetrow">
+     <term>
+      <function>PQgetRow</function>
+      <indexterm>
+       <primary>PQgetRow</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Waits for the next row from a prior
+       <function>PQsendQuery</function>,
+       <function>PQsendQueryParams</function>, or
+       <function>PQsendQueryPrepared</function> call, and returns it.
+       A null pointer is returned when no more rows are available or
+       some error has occurred.
+<synopsis>
+PGresult *PQgetRow(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       If this function returns non-NULL result, it is a
+       <structname>PGresult</structname> that contains exactly 1 row.
+       It needs to be freed later with <function>PQclear</function>.
+      </para>
+      <para>
+       On a synchronous connection, the function waits for more
+       data from the network until the whole result set is done.  So it
+       returns NULL only if the result set has been completely received or
+       some error happened.  In both cases, call <function>PQgetResult</function>
+       next to get the final status.
+      </para>
+
+      <para>
+       On an asynchronous connection the function does not read more data
+       from the network.  So after it returns NULL, call <function>PQisBusy</function>
+       to see whether the final <structname>PGresult</structname> is available
+       or more data needs to be read from the network via
+       <function>PQconsumeInput</function>.  Do not call
+       <function>PQisBusy</function> before <function>PQgetRow</function>
+       has returned NULL, as <function>PQisBusy</function> will parse
+       any available rows and add them to the main <structname>PGresult</structname>
+       that will be returned later by <function>PQgetResult</function>.
+      </para>
+
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqrecvrow">
+     <term>
+      <function>PQrecvRow</function>
+      <indexterm>
+       <primary>PQrecvRow</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Get row data without constructing PGresult for it.  This is the
+       underlying function for <function>PQgetRow</function>.
+<synopsis>
+int PQrecvRow(PGconn *conn, PGresult **hdr_p, PGrowValue **row_p);
+</synopsis>
+      </para>
+
+      <para>
+       It returns row data as pointers to network buffer.
+       All structures are owned by <application>libpq</application>'s
+       <structname>PGconn</structname> and must not be freed or stored
+       by user.  Instead row data should be copied to user structures, before
+       any <application>libpq</application> result-processing function
+       is called.
+      </para>
+      <para>
+       It returns 1 when row data is available.
+       Argument <parameter>hdr_p</parameter> will contain pointer
+       to empty <structname>PGresult</structname> that describes
+       row contents.  Actual data is in <parameter>row_p</parameter>.
+       For the description of structure <structname>PGrowValue</structname>
+       see <xref linkend="libpq-altrowprocessor">.
+      </para>
+      <para>It returns 0 when no more rows are available.  On a synchronous
+       connection, it means the result set has fully arrived.  Call
+       <function>PQgetResult</function> to get the final status.
+       On an asynchronous connection it can also mean more data
+       needs to be read from the network.  Call <function>PQisBusy</function>
+       to see whether <function>PQgetResult</function>
+       or <function>PQconsumeInput</function> needs to be called next.
+      </para>
+      <para>
+       It returns -1 if some network error occurred.
+       See connection status functions in <xref linkend="libpq-status">.
+      </para>
+
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </para>
  </sect1>
 
  <sect1 id="libpq-cancel">
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 7e02497..e9cbe2f 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -164,3 +164,5 @@ PQsetRowProcessor	  161
 PQgetRowProcessor	  162
 PQsetRowProcessorErrMsg	  163
 PQskipResult		  164
+PQrecvRow		  165
+PQgetRow		  166
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index cd287cd..df19824 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1959,6 +1959,131 @@ PQskipResult(PGconn *conn, int skipAll)
 	return ret;
 }
 
+/* scratch slot: recv_row_proc stores the current row here for PQrecvRow */
+struct RecvRowBuf
+{
+	PGresult *temp_hdr;		/* hdr argument received by recv_row_proc */
+	PGrowValue *temp_row;	/* row argument received by recv_row_proc */
+};
+
+/* row processor: save hdr/row into the RecvRowBuf passed as arg, then ask for early exit */
+static int
+recv_row_proc(PGresult *hdr, PGrowValue *row, void *arg)
+{
+	struct RecvRowBuf *buf = arg;
+	buf->temp_hdr = hdr;
+	buf->temp_row = row;
+	return 2;	/* 2 = "processor requested early exit" in getAnotherTuple() */
+}
+
+/*
+ * PQrecvRow
+ *
+ * Hand back the next row of the pending resultset through *hdr_p and
+ * *row_p; both are reset to NULL on entry and stay NULL unless 1 is returned.
+ * Returns:
+ *   1 - got row data, the pointers are owned by PGconn
+ *   0 - no rows available, either resultset complete
+ *       or more data needed (async-only)
+ *  -1 - some problem, check connection error
+ */
+int
+PQrecvRow(PGconn *conn, PGresult **hdr_p, PGrowValue **row_p)
+{
+	struct RecvRowBuf buf;
+	int rc;
+	int ret = -1;	/* stays -1 on every "goto done" error path */
+	PQrowProcessor oldproc;
+	void *oldarg;
+
+	*hdr_p = NULL;
+	*row_p = NULL;
+
+	/* the query may still be pending: keep flushing until PQflush() reports 0 */
+	while (1)
+	{
+		rc = PQflush(conn);
+		if (rc < 0)
+			return -1;
+		if (rc == 0)
+			break;
+		if (pqWait(FALSE, TRUE, conn))
+			return -1;
+	}
+
+	/* temporarily install recv_row_proc; the old processor is restored at "done" */
+	oldproc = PQgetRowProcessor(conn, &oldarg);
+	PQsetRowProcessor(conn, recv_row_proc, &buf);
+
+	/* read data until a row is captured, the resultset ends, or input runs out */
+	while (1)
+	{
+		buf.temp_hdr = NULL;
+		buf.temp_row = NULL;
+
+		/* done with resultset?  (this call is also what may invoke recv_row_proc) */
+		if (!PQisBusy(conn))
+			break;
+
+		/* did recv_row_proc capture a row during the PQisBusy() call above? */
+		if (buf.temp_row)
+		{
+			*hdr_p = buf.temp_hdr;
+			*row_p = buf.temp_row;
+			ret = 1;
+			goto done;
+		}
+
+		/*
+		 * Still busy but no row captured: more input is needed
+		 */
+
+		if (pqIsnonblocking(conn))
+			/* nonblocking connection: return 0 and let the caller fetch more input */
+			break;
+		if (pqWait(TRUE, FALSE, conn))
+			goto done;
+		if (!PQconsumeInput(conn))
+			goto done;
+	}
+	/* no more rows available (or, on nonblocking connection, none yet) */
+	ret = 0;
+done:
+	/* restore old row processor on every path that replaced it */
+	PQsetRowProcessor(conn, oldproc, oldarg);
+	return ret;
+}
+
+/*
+ * PQgetRow
+ *		Returns the next row as a new PGresult built from PQrecvRow()
+ *		output.  NULL means PQrecvRow() did not return 1 (no row, or
+ *		error), or that building the PGresult failed (out of memory).
+ */
+PGresult *
+PQgetRow(PGconn *conn)
+{
+	PGresult *hdr, *res;
+	PGrowValue *row;
+
+	/* anything other than 1 (0 = no row, -1 = error) yields NULL */
+	if (PQrecvRow(conn, &hdr, &row) != 1)
+		return NULL;
+
+	/* build a standalone PGresult: copy hdr (PG_COPYRES_ATTRS), then add the row */
+	res = PQcopyResult(hdr, PG_COPYRES_ATTRS);
+	if (!res)
+		goto NoMem;
+	if (pqAddRow(res, row, NULL))
+		return res;
+
+NoMem:
+	PQclear(res);	/* res is NULL here when PQcopyResult() failed */
+	printfPQExpBuffer(&conn->errorMessage,
+					  libpq_gettext("out of memory\n"));
+	pqSaveErrorResult(conn);
+	return NULL;
+}
 
 /*
  * PQdescribePrepared
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 6578019..c922ab7 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -820,7 +820,7 @@ getAnotherTuple(PGconn *conn, bool binary)
 	rp= conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
 	if (rp == 1)
 		return 0;
-	else if (rp == 2 && pqIsnonblocking(conn))
+	else if (rp == 2)
 		/* processor requested early exit */
 		return EOF;
 	else if (rp != 0)
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index a19ee88..aee7768 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -697,7 +697,7 @@ getAnotherTuple(PGconn *conn, int msgLength)
 		/* everything is good */
 		return 0;
 	}
-	if (rp == 2 && pqIsnonblocking(conn))
+	if (rp == 2)
 	{
 		/* processor requested early exit */
 		return EOF;
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b7370e2..9180ed6 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -400,6 +400,10 @@ extern int PQsendQueryPrepared(PGconn *conn,
 					int resultFormat);
 extern PGresult *PQgetResult(PGconn *conn);
 
+/* fetch single row from resultset */
+extern PGresult *PQgetRow(PGconn *conn);
+extern int PQrecvRow(PGconn *conn, PGresult **hdr_p, PGrowValue **row_p);
+
 /* Routines for managing an asynchronous query */
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to