I saw Kyotaro already answered, but I give my view as well.

On Thu, Mar 22, 2012 at 06:07:16PM -0400, Tom Lane wrote:
> AFAICT it breaks async processing entirely, because many changes have been
> made that fail to distinguish "insufficient data available as yet" from
> "hard error".  As an example, this code at the beginning of
> getAnotherTuple:
>   
>       /* Get the field count and make sure it's what we expect */
>       if (pqGetInt(&tupnfields, 2, conn))
> !             return EOF;
> 
> is considering three cases: it got a 2-byte integer (and can continue on),
> or there aren't yet 2 more bytes available in the buffer, in which case it
> should return EOF without doing anything, or pqGetInt detected a hard
> error and updated the connection error state accordingly, in which case
> again there is nothing to do except return EOF.  In the patched code we
> have:
> 
>       /* Get the field count and make sure it's what we expect */
>       if (pqGetInt(&tupnfields, 2, conn))
> !     {
> !             /* Whole the message must be loaded on the buffer here */
> !             errmsg = libpq_gettext("protocol error\n");
> !             goto error_and_forward;
> !     }
> 
> which handles neither the second nor third case correctly: it thinks that
> "data not here yet" is a hard error, and then makes sure it is an error by
> destroying the parsing state :-(.  And if in fact pqGetInt did log an
> error, that possibly-useful error message is overwritten with an entirely
> useless "protocol error" text.

No, "protocol error" really is only error case here.

- pqGetInt() does not set errors.

- V3 getAnotherTuple() is called only if packet is fully in buffer.

> I don't think the error return cases for the row processor have been
> thought out too well either.  The row processor is not in charge of what
> happens to the PGresult, and it certainly has no business telling libpq to
> just "exit immediately from the topmost libpq function".  If we do that
> we'll probably lose sync with the data stream and be unable to recover use
> of the connection at all.  Also, do we need to consider any error cases
> for the row processor other than out-of-memory?

No, the rule is *not* "exit to topmost", but "exit PQisBusy()".

This is exactly so that if any code that does not expect row-processor
behaviour continues to work.

Also, from programmers POV, this also means row-processor callback causes
minimal changes to existing APIs.

> If so it might be a good
> idea for it to have some ability to store a custom error message into the
> PGconn, which it cannot do given the current function API.

There already was such function, but it was row-processor specific hack
that could leak out and create confusion.  I rejected it.  Instead there
should be generic error setting function, equivalent to current libpq
internal error setting.

But such generic error setting function would need review all libpq
error states as it allows error state appear in new situations.  Also
we need to have well-defined behaviour of client-side errors vs. incoming
server errors.

Considering that even current cut-down patch is troubling committers,
I would definitely suggest postponing such generic error setter to 9.3.

Especially as it does not change anything coding-style-wise.

> In the same vein, I am fairly uncomfortable with the blithe assertion that
> a row processor can safely longjmp out of libpq.  This was never foreseen
> in the original library coding and there are any number of places that
> that might break, now or in the future.  Do we really need to allow it?
> If we do, it would be a good idea to decorate the libpq functions that are
> now expected to possibly longjmp with comments saying so.  Tracing all the
> potential call paths that might be aborted by a longjmp is an essential
> activity anyway.

I think we *should* allow exceptions, but in limited number of APIs.

Basically, the usefulness for users vs. effort from our side
is clearly on the side of providing it.

But its up to us to define what the *limited* means (what needs
least effort from us), so that later when users want to use exceptions
in callback, they need to pick right API.

Currently it seems only PQexec() + multiple SELECTS can give trouble,
as previous PGresult is kept in stack.  Should we unsupport
PQexec or multiple SELECTS?

But such case it borken even without exceptions - or at least
very confusing.  Not sure what to do with it.


In any case, "decorating" libpq functions is wrong approach.  This gives
suggestion that caller of eg. PQexec() needs to take care of any possible
behaviour of unknown callback.  This will not work.   Instead allowed
functions should be simply listed in row-processor documentation.

Basically custom callback should be always matched by caller that
knows about it and knows how to handle it.  Not sure how to put
such suggestion into documentation tho'.


> Another design deficiency is PQskipResult().  This is badly designed for
> async operation because once you call it, it will absolutely not give back
> control until it's read the whole query result.  (It'd be better to have
> it set some kind of flag that makes future library calls discard data
> until the query result end is reached.)  Something else that's not very
> well-designed about it is that it might throw away more than just incoming
> tuples.  As an example, suppose that the incoming data at the time you
> call it consists of half a dozen rows followed by an ErrorResponse.  The
> ErrorResponse will be eaten and discarded, leaving the application no clue
> why its transaction has been aborted, or perhaps even the entire session
> cancelled.  What we probably want here is just to transiently install a
> row processor that discards all incoming data, but the application is
> still expected to eventually fetch a PGresult that will tell it whether
> the server side of the query completed or not.

I guess it's designed for rolling connection forward in exception
handler...  And it's blocking-only indeed.  Considering that better
approach is to drop the connection, instead trying to save it,
it's usefulness is questionable.

I'm OK with dropping it.

> Lastly, as to the pqgetrow patch, the lack of any demonstrated test case
> for these functions makes me uncomfortable as to whether they are well
> designed.  Again, I'm unconvinced that the error handling is good or that
> they work sanely in async mode.  I'm inclined to just drop these for this
> go-round, and to stick to providing the features that we can test via the
> dblink patch.

I simplified the github test cases and attached as patch.

Could you please give more concrete critique of the API?


Main idea behing PQgetRow is that it does not replace any existing
API function, instead acts as addition:

* Sync case: PQsendQuery() + PQgetResult - PQgetRow should be called
  before PQgetResult until it returns NULL, then proceed with PQgetResult
  to get final state.

* Async case: PQsendQuery() + PQconsumeInput() + PQisBusy() + PQgetResult().
  Here PQgetRow() should be called before PQisBusy() until it returns
  NULL, then proceed with PQisBusy() as usual.

It only returns rows, never any error state PGresults.


Main advantage of including PQgetRow() together with low-level
rowproc API is that it allows de-emphasizing more complex parts of
rowproc API (exceptions, early exit, skipresult, custom error msg).
And drop/undocument them or simply call them postgres-internal-only.

-- 
marko

diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index bbc6ee1..e0d6c41 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,9 +14,11 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 override LDLIBS := $(libpq_pgport) $(LDLIBS)
 
 
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo \
+	rowproc-sync rowproc-async getrow-sync getrow-async
 
 all: $(PROGS)
 
 clean:
-	rm -f $(PROGS)
+	rm -f $(PROGS) *.o
+
diff --git a/src/test/examples/getrow-async.c b/src/test/examples/getrow-async.c
new file mode 100644
index 0000000..d74f77a
--- /dev/null
+++ b/src/test/examples/getrow-async.c
@@ -0,0 +1,196 @@
+/*
+ * PQgetRow async demo.
+ *
+ * usage: getrow-async [connstr [query]]
+ */
+
+#include <sys/select.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <libpq-fe.h>
+
+struct Context {
+	PGconn *db;
+	int count;
+};
+
+static void die(PGconn *db, const char *msg)
+{
+	fprintf(stderr, "%s: %s", msg, PQerrorMessage(db));
+	exit(1);
+}
+
+/* wait for event on socket */
+static void db_wait(PGconn *db, int for_write)
+{
+	int fd = PQsocket(db);
+	fd_set fds;
+	int res;
+
+retry:
+	FD_ZERO(&fds);
+	FD_SET(fd, &fds);
+	if (for_write)
+		res = select(fd+1, NULL, &fds, NULL, NULL);
+	else
+		res = select(fd+1, &fds, NULL, NULL, NULL);
+
+	if (res == 0)
+		goto retry;
+	if (res < 0 && errno == EINTR)
+		goto retry;
+	if (res < 0)
+	{
+		fprintf(stderr, "select() failed: %s", strerror(errno));
+		exit(1);
+	}
+}
+
+static void proc_row(struct Context *ctx, PGresult *res)
+{
+	const char *val = PQgetvalue(res, 0, 0);
+	ctx->count++;
+	if (0)
+	printf("column#0: %s\n", val ? val : "NULL");
+}
+
+static void proc_result(struct Context *ctx, PGresult *r)
+{
+	ExecStatusType s;
+
+	s = PQresultStatus(r);
+	if (s == PGRES_TUPLES_OK)
+		printf("query successful, got %d rows\n", ctx->count);
+	else
+		printf("%s: %s\n", PQresStatus(s), PQerrorMessage(ctx->db));
+	PQclear(r);
+}
+
+/*
+ * Handle socket read event
+ *
+ * Returns:
+ * -1 - error
+ *  0 - need to read more data
+ *  1 - all done
+ */
+
+static int socket_read_cb(struct Context *ctx)
+{
+	PGresult *r;
+
+	/* read incoming data */
+	if (!PQconsumeInput(ctx->db))
+		return -1;
+
+	/*
+	 * One query may result in several PGresults,
+	 * first loop is over all PGresults.
+	 */
+	while (1) {
+		/*
+		 * Process all rows already in buffer.
+		 */
+		while (1) {
+			r = PQgetRow(ctx->db);
+			if (!r)
+				break;
+
+			proc_row(ctx, r);
+
+			PQclear(r);
+		}
+
+		/* regular async logic follows */
+
+		/* Need more data from network */
+		if (PQisBusy(ctx->db))
+			return 0;
+
+		/* we have complete PGresult ready */
+		r = PQgetResult(ctx->db);
+		if (r == NULL) {
+			/* all results have arrived */
+			return 1;
+		} else {
+			/* process final resultset status */
+			proc_result(ctx, r);
+		}
+	}
+}
+
+static void exec_query(struct Context *ctx, const char *q)
+{
+	int res;
+	int waitWrite;
+	PGconn *db = ctx->db;
+
+	ctx->count = 0;
+
+	/* launch query */
+	if (!PQsendQuery(ctx->db, q))
+		die(ctx->db, "PQsendQuery");
+
+	/* flush query */
+	res = PQflush(db);
+	if (res < 0)
+		die(db, "flush 1");
+	waitWrite = res > 0;
+
+	/* read data */
+	while (1) {
+		/* sleep until event */
+		db_wait(ctx->db, waitWrite);
+
+		/* got event, process it */
+		if (waitWrite) {
+			/* still more to flush? */
+			res = PQflush(db);
+			if (res < 0)
+				die(db, "flush 2");
+			waitWrite = res > 0;
+		} else {
+			/* read result */
+			res = socket_read_cb(ctx);
+			if (res < 0)
+				die(db, "socket_read_cb");
+			if (res > 0)
+				return;
+			waitWrite = 0;
+		}
+	}
+}
+
+int main(int argc, char *argv[])
+{
+	const char *connstr;
+	const char *q;
+	PGconn *db;
+	struct Context ctx;
+
+	connstr = "dbname=postgres";
+	if (argc > 1)
+		connstr = argv[1];
+
+	q = "show all";
+	if (argc > 2)
+		q = argv[2];
+
+	db = PQconnectdb(connstr);
+	if (!db || PQstatus(db) == CONNECTION_BAD)
+		die(db, "connect");
+
+	/* set up socket */
+	PQsetnonblocking(db, 1);
+
+	ctx.db = db;
+	exec_query(&ctx, q);
+
+	PQfinish(db);
+
+	return 0;
+}
+
diff --git a/src/test/examples/getrow-sync.c b/src/test/examples/getrow-sync.c
new file mode 100644
index 0000000..a7ed4f6
--- /dev/null
+++ b/src/test/examples/getrow-sync.c
@@ -0,0 +1,83 @@
+/*
+ * PQgetRow sync demo.
+ *
+ * usage: getrow-sync [connstr [query]]
+ */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <libpq-fe.h>
+
+struct Context {
+	PGconn *db;
+	int count;
+};
+
+static void die(PGconn *db, const char *msg)
+{
+	fprintf(stderr, "%s: %s\n", msg, PQerrorMessage(db));
+	exit(1);
+}
+
+static void exec_query(struct Context *ctx, const char *q)
+{
+	PGconn *db = ctx->db;
+	PGresult *r;
+	ExecStatusType s;
+
+	ctx->count = 0;
+
+	if (!PQsendQuery(db, q))
+		die(db, "PQsendQuery");
+
+	/* loop with PQgetRow until final PGresult is available */
+	while (1) {
+		r = PQgetRow(db);
+		if (!r)
+			break;
+		ctx->count++;
+		PQclear(r);
+	}
+
+	/* final PGresult, either PGRES_TUPLES_OK or error */
+	r = PQgetResult(db);
+	s = PQresultStatus(r);
+	if (s == PGRES_TUPLES_OK)
+		printf("query successful, got %d rows\n", ctx->count);
+	else
+		printf("%s: %s\n", PQresStatus(s), PQerrorMessage(db));
+
+	PQclear(r);
+}
+
+
+int main(int argc, char *argv[])
+{
+	const char *connstr;
+	const char *q = "show all";
+	PGconn *db;
+	struct Context ctx;
+
+	connstr = "dbname=postgres";
+	if (argc > 1)
+		connstr = argv[1];
+
+	q = "show all";
+	if (argc > 2)
+		q = argv[2];
+
+	db = PQconnectdb(connstr);
+	if (!db || PQstatus(db) == CONNECTION_BAD)
+		die(db, "connect");
+
+	ctx.db = db;
+	exec_query(&ctx, q);
+
+	PQfinish(db);
+
+	return 0;
+}
+
diff --git a/src/test/examples/rowproc-async.c b/src/test/examples/rowproc-async.c
new file mode 100644
index 0000000..88b1672
--- /dev/null
+++ b/src/test/examples/rowproc-async.c
@@ -0,0 +1,189 @@
+/*
+ * Row processor async demo.
+ *
+ * usage: rowproc-async [connstr [query]]
+ */
+
+
+#include <sys/select.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <libpq-fe.h>
+
+struct Context {
+	PGconn *db;
+	int count;
+};
+
+/* print db error message and die */
+static void die(PGconn *db, const char *msg)
+{
+	fprintf(stderr, "%s: %s", msg, PQerrorMessage(db));
+	exit(1);
+}
+
+/* wait for event on socket */
+static void db_wait(PGconn *db, int for_write)
+{
+	int fd = PQsocket(db);
+	fd_set fds;
+	int res;
+
+retry:
+	FD_ZERO(&fds);
+	FD_SET(fd, &fds);
+	if (for_write)
+		res = select(fd+1, NULL, &fds, NULL, NULL);
+	else
+		res = select(fd+1, &fds, NULL, NULL, NULL);
+
+	if (res == 0)
+		goto retry;
+	if (res < 0 && errno == EINTR)
+		goto retry;
+	if (res < 0)
+	{
+		fprintf(stderr, "select() failed: %s", strerror(errno));
+		exit(1);
+	}
+}
+
+/* do something with one row */
+static void proc_row(struct Context *ctx, PGresult *res, PGrowValue *columns)
+{
+	ctx->count++;
+
+	if (0)
+	printf("column: %.*s\n",
+		   columns[0].len,
+		   columns[0].value);
+}
+
+/* do something with resultset final status */
+static void proc_result(struct Context *ctx, PGresult *r)
+{
+	ExecStatusType s;
+
+	s = PQresultStatus(r);
+	if (s == PGRES_TUPLES_OK)
+		printf("query successful, got %d rows\n", ctx->count);
+	else
+		printf("%s: %s\n", PQresStatus(s), PQerrorMessage(ctx->db));
+	PQclear(r);
+}
+
+/* custom callback */
+static int my_handler(PGresult *res, PGrowValue *columns, void *arg)
+{
+	struct Context *ctx = arg;
+
+	proc_row(ctx, res, columns);
+
+	return 1;
+}
+
+/* this handles socket read event */
+static int socket_read_cb(struct Context *ctx, PGconn *db)
+{
+	PGresult *r;
+
+	/* read incoming data */
+	if (!PQconsumeInput(db))
+		return -1;
+
+	/*
+	 * one query may result in several PGresult's,
+	 * wrap everything in one big loop.
+	 */
+	while (1) {
+		/* need to wait for more data from network */
+		if (PQisBusy(db))
+			return 0;
+
+		/* we have complete PGresult ready */
+		r = PQgetResult(db);
+		if (r == NULL) {
+			/* all results have arrived */
+			return 1;
+		} else {
+			proc_result(ctx, r);
+		}
+	}
+}
+
+/* run query with custom callback */
+static void exec_query(struct Context *ctx, PGconn *db, const char *q)
+{
+	int res;
+	int waitWrite;
+
+	ctx->count = 0;
+
+	/* set up socket */
+	PQsetnonblocking(db, 1);
+
+	PQsetRowProcessor(db, my_handler, ctx);
+
+	/* launch query */
+	if (!PQsendQuery(db, q))
+		die(db, "PQsendQuery");
+
+	/* see if it is sent */
+	res = PQflush(db); // -1:err, 0:ok, 1:more
+	if (res < 0)
+		die(db, "flush 1");
+	waitWrite = res > 0;
+
+	/* read data */
+	while (1) {
+		db_wait(db, waitWrite);
+
+		/* got event, process it */
+		if (waitWrite) {
+			res = PQflush(db); // -1:err, 0:ok, 1:more
+			if (res < 0)
+				die(db, "flush 2");
+			waitWrite = res > 0;
+		} else {
+			res = socket_read_cb(ctx, db);
+			if (res < 0)
+				die(db, "socket_read_cb");
+			if (res > 0)
+				return;
+			waitWrite = 0;
+		}
+	}
+
+	PQsetRowProcessor(ctx->db, NULL, NULL);
+}
+
+int main(int argc, char *argv[])
+{
+	const char *connstr;
+	const char *q;
+	PGconn *db;
+	struct Context ctx;
+
+	connstr = "dbname=postgres";
+	if (argc > 1)
+		connstr = argv[1];
+
+	q = "show all";
+	if (argc > 2)
+		q = argv[2];
+
+	db = PQconnectdb(connstr);
+	if (!db || PQstatus(db) == CONNECTION_BAD)
+		die(db, "connect");
+	ctx.db = db;
+
+	exec_query(&ctx, db, q);
+
+	PQfinish(db);
+
+	return 0;
+}
+
diff --git a/src/test/examples/rowproc-sync.c b/src/test/examples/rowproc-sync.c
new file mode 100644
index 0000000..b29db48
--- /dev/null
+++ b/src/test/examples/rowproc-sync.c
@@ -0,0 +1,80 @@
+/*
+ * Row processor sync demo.
+ *
+ * usage: rowproc-sync [connstr [query]]
+ */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+#include <setjmp.h>
+
+#include <libpq-fe.h>
+
+struct Context {
+	PGconn *db;
+	int count;
+};
+
+static void die(PGconn *db, const char *msg)
+{
+	fprintf(stderr, "%s: %s", msg, PQerrorMessage(db));
+	exit(1);
+}
+
+static int my_handler(PGresult *res, PGrowValue *columns, void *arg)
+{
+	struct Context *ctx = arg;
+
+	ctx->count++;
+
+	return 1;
+}
+
+static void exec_query(struct Context *ctx, const char *q)
+{
+	PGresult *r;
+
+	ctx->count = 0;
+	PQsetRowProcessor(ctx->db, my_handler, ctx);
+
+	r = PQexec(ctx->db, q);
+
+	/* check final result */
+	if (!r || PQresultStatus(r) != PGRES_TUPLES_OK)
+		die(ctx->db, "select");
+	else
+		printf("query successful, got %d rows\n", ctx->count);
+	PQclear(r);
+
+	PQsetRowProcessor(ctx->db, NULL, NULL);
+}
+
+
+int main(int argc, char *argv[])
+{
+	const char *connstr;
+	const char *q;
+	struct Context ctx;
+
+	connstr = "dbname=postgres";
+	if (argc > 1)
+		connstr = argv[1];
+
+	q = "show all";
+	if (argc > 2)
+		q = argv[2];
+
+	ctx.db = PQconnectdb(connstr);
+	if (!ctx.db || PQstatus(ctx.db) == CONNECTION_BAD)
+		die(ctx.db, "connect");
+
+	exec_query(&ctx, q);
+
+	PQfinish(ctx.db);
+
+	return 0;
+}
+
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to