Hello, this is new version of the patch.

> > By the way, I would like to ask you one question. What is the
> > reason why void* should be head or tail of the parameter list?
> 
> Aesthetical reasons:

 I got it. Thank you.

> Last comment - if we drop the plan to make PQsetRowProcessorErrMsg()
> usable outside of handler, we can simplify internal usage as well:
> the PGresult->rowProcessorErrMsg can be dropped and let's use
> ->errMsg to transport the error message.
> 
> The PGresult is long-lived structure and adding fields for such
> temporary usage feels wrong.  There is no other libpq code between
> row processor and getAnotherTuple, so the use is safe.

I almost agree with it. Plus, I think it is no reason to consider
out of memory as particular because now row processor becomes
generic. But the previous patch does different process for OOM
and others, but I couldn't see obvious reason to do so.

- PGresult.rowProcessorErrMes is removed and use PGconn.errMsg
  instead with the new interface function PQresultSetErrMes().

- Now row processors should set message for any error status
  occurred within. pqAddRow and dblink.c is modified to do so.

- getAnotherTuple() sets the error message `unknown error' for
  the condition rp == 0 && ->errMsg == NULL.

- Always forward input cursor and do pqClearAsyncResult() and
  pqSaveErrorResult() when rp == 0 in getAnotherTuple()
  regardless whether ->errMsg is NULL or not in fe-protocol3.c.

- conn->inStart is already moved to the start point of the next
  message when row processor is called. So forwarding inStart in
  outOfMemory1 seems redundant. I removed it.

- printfPQExpBuffer() compains for variable message. So use
  resetPQExpBuffer() and appendPQExpBufferStr() instead.
  
=====
- dblink does not use conn->errorMessage before, and also now...
  all errors are displayed as `Error occurred on dblink connection...'.

- TODO: No NLS messages for error messages.

- Somehow make check yields error for base revision. So I have
  not done that.

- I have no idea how to do test for protocol 2...

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..239edb8 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,7 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQsetRowProcessor	  161
+PQgetRowProcessor	  162
+PQresultSetErrMsg	  163
+PQskipResult		  164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 27a9805..4605e49 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2693,6 +2693,9 @@ makeEmptyPGconn(void)
 	conn->wait_ssl_try = false;
 #endif
 
+	/* set default row processor */
+	PQsetRowProcessor(conn, NULL, NULL);
+
 	/*
 	 * We try to send at least 8K at a time, which is the usual size of pipe
 	 * buffers on Unix systems.  That way, when we are sending a large amount
@@ -2711,8 +2714,13 @@ makeEmptyPGconn(void)
 	initPQExpBuffer(&conn->errorMessage);
 	initPQExpBuffer(&conn->workBuffer);
 
+	/* set up initial row buffer */
+	conn->rowBufLen = 32;
+	conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue));
+
 	if (conn->inBuffer == NULL ||
 		conn->outBuffer == NULL ||
+		conn->rowBuf == NULL ||
 		PQExpBufferBroken(&conn->errorMessage) ||
 		PQExpBufferBroken(&conn->workBuffer))
 	{
@@ -2814,6 +2822,8 @@ freePGconn(PGconn *conn)
 		free(conn->inBuffer);
 	if (conn->outBuffer)
 		free(conn->outBuffer);
+	if (conn->rowBuf)
+		free(conn->rowBuf);
 	termPQExpBuffer(&conn->errorMessage);
 	termPQExpBuffer(&conn->workBuffer);
 
@@ -5078,3 +5088,4 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
 
 	return prev;
 }
+
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..7fd3c9c 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -66,6 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
 			   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
+static int	pqAddRow(PGresult *res, PGrowValue *columns, void *param);
 
 
 /* ----------------
@@ -701,7 +702,6 @@ pqClearAsyncResult(PGconn *conn)
 	if (conn->result)
 		PQclear(conn->result);
 	conn->result = NULL;
-	conn->curTuple = NULL;
 }
 
 /*
@@ -756,7 +756,6 @@ pqPrepareAsyncResult(PGconn *conn)
 	 */
 	res = conn->result;
 	conn->result = NULL;		/* handing over ownership to caller */
-	conn->curTuple = NULL;		/* just in case */
 	if (!res)
 		res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
 	else
@@ -828,6 +827,93 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 }
 
 /*
+ * PQsetRowProcessor
+ *   Set function that copies column data out from network buffer.
+ */
+void
+PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+{
+	conn->rowProcessor = (func ? func : pqAddRow);
+	conn->rowProcessorParam = param;
+}
+
+/*
+ * PQgetRowProcessor
+ *   Get current row processor of conn. set pointer to current parameter for
+ *   row processor to param if not NULL.
+ */
+PQrowProcessor
+PQgetRowProcessor(PGconn *conn, void **param)
+{
+	if (param)
+		*param = conn->rowProcessorParam;
+
+	return conn->rowProcessor;
+}
+
+/*
+ * PQresultSetErrMsg
+ *    Set the error message to PGresult.
+ *
+ *    You can replace the previous message by alternative mes, or clear
+ *    it with NULL.
+ */
+void
+PQresultSetErrMsg(PGresult *res, const char *msg)
+{
+	if (msg)
+		res->errMsg = pqResultStrdup(res, msg);
+	else
+		res->errMsg = NULL;
+}
+
+/*
+ * pqAddRow
+ *	  add a row to the PGresult structure, growing it if necessary
+ *	  Returns TRUE if OK, FALSE if not enough memory to add the row.
+ */
+static int
+pqAddRow(PGresult *res, PGrowValue *columns, void *param)
+{
+	PGresAttValue *tup;
+	int			nfields = res->numAttributes;
+	int			i;
+
+	tup = (PGresAttValue *)
+		pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+	if (tup == NULL)
+	{
+		PQresultSetErrMsg(res, "out of memory for query result\n");
+		return FALSE;
+	}
+
+	for (i = 0 ; i < nfields ; i++)
+	{
+		tup[i].len = columns[i].len;
+		if (tup[i].len == NULL_LEN)
+		{
+			tup[i].value = res->null_field;
+		}
+		else
+		{
+			bool isbinary = (res->attDescs[i].format != 0);
+			tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+			if (tup[i].value == NULL)
+			{
+				PQresultSetErrMsg(res, "out of memory for query result\n");
+				return FALSE;
+			}
+
+			memcpy(tup[i].value, columns[i].value, tup[i].len);
+			/* We have to terminate this ourselves */
+			tup[i].value[tup[i].len] = '\0';
+		}
+	}
+
+	return pqAddTuple(res, tup);
+}
+
+/*
  * pqAddTuple
  *	  add a row pointer to the PGresult structure, growing it if necessary
  *	  Returns TRUE if OK, FALSE if not enough memory to add the row
@@ -1223,7 +1309,6 @@ PQsendQueryStart(PGconn *conn)
 
 	/* initialize async result-accumulation state */
 	conn->result = NULL;
-	conn->curTuple = NULL;
 
 	/* ready to send command message */
 	return true;
@@ -1831,6 +1916,55 @@ PQexecFinish(PGconn *conn)
 	return lastResult;
 }
 
+
+/*
+ * Do-nothing row processor for PQskipResult
+ */
+static int
+dummyRowProcessor(PGresult *res, PGrowValue *columns, void *param)
+{
+	return 1;
+}
+
+/*
+ * Exaust remaining Data Rows in curret conn.
+ * 
+ * Exaust current result if skipAll is false and all succeeding results if
+ * true.
+ */
+int
+PQskipResult(PGconn *conn, int skipAll)
+{
+	PQrowProcessor savedRowProcessor;
+	void * savedRowProcParam;
+	PGresult *res;
+	int ret = 0;
+
+	/* save the current row processor settings and set dummy processor */
+	savedRowProcessor = PQgetRowProcessor(conn, &savedRowProcParam);
+	PQsetRowProcessor(conn, dummyRowProcessor, NULL);
+	
+	/*
+	 * Throw away the remaining rows in current result, or all succeeding
+	 * results if skipAll is not FALSE.
+	 */
+	if (skipAll)
+	{
+		while ((res = PQgetResult(conn)) != NULL)
+			PQclear(res);
+	}
+	else if ((res = PQgetResult(conn)) != NULL)
+	{
+		PQclear(res);
+		ret = 1;
+	}
+	
+	PQsetRowProcessor(conn, savedRowProcessor, savedRowProcParam);
+
+	return ret;
+}
+
+
 /*
  * PQdescribePrepared
  *	  Obtain information about a previously prepared statement
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index ce0eac3..d11cb3c 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
 }
 
 /*
+ * pqGetnchar:
+ *	skip len bytes in input buffer.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+	if (len > (size_t) (conn->inEnd - conn->inCursor))
+		return EOF;
+
+	conn->inCursor += len;
+
+	if (conn->Pfdebug)
+		fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+				(unsigned long) len);
+
+	return 0;
+}
+
+/*
  * pqPutnchar:
  *	write exactly len bytes to the current message
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..3b0520d 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -569,6 +569,8 @@ pqParseInput2(PGconn *conn)
 						/* Read another tuple of a normal query response */
 						if (getAnotherTuple(conn, FALSE))
 							return;
+						/* getAnotherTuple moves inStart itself */
+						continue;
 					}
 					else
 					{
@@ -585,6 +587,8 @@ pqParseInput2(PGconn *conn)
 						/* Read another tuple of a normal query response */
 						if (getAnotherTuple(conn, TRUE))
 							return;
+						/* getAnotherTuple moves inStart itself */
+						continue;
 					}
 					else
 					{
@@ -703,52 +707,55 @@ failure:
 
 /*
  * parseInput subroutine to read a 'B' or 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
+ * It fills rowbuf with column pointers and then calls row processor.
  * Returns: 0 if completed message, EOF if error or not enough data yet.
  *
  * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received.  We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received.
  */
 static int
 getAnotherTuple(PGconn *conn, bool binary)
 {
 	PGresult   *result = conn->result;
 	int			nfields = result->numAttributes;
-	PGresAttValue *tup;
+	PGrowValue  *rowbuf;
 
 	/* the backend sends us a bitmap of which attributes are null */
 	char		std_bitmap[64]; /* used unless it doesn't fit */
 	char	   *bitmap = std_bitmap;
 	int			i;
+	int			rp;
 	size_t		nbytes;			/* the number of bytes in bitmap  */
 	char		bmap;			/* One byte of the bitmap */
 	int			bitmap_index;	/* Its index */
 	int			bitcnt;			/* number of bits examined in current byte */
 	int			vlen;			/* length of the current field value */
+	char        *errmsg = libpq_gettext("unknown error\n");
 
-	result->binary = binary;
-
-	/* Allocate tuple space if first time for this data message */
-	if (conn->curTuple == NULL)
+	/* resize row buffer if needed */
+	if (nfields > conn->rowBufLen)
 	{
-		conn->curTuple = (PGresAttValue *)
-			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-		if (conn->curTuple == NULL)
-			goto outOfMemory;
-		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
-		/*
-		 * If it's binary, fix the column format indicators.  We assume the
-		 * backend will consistently send either B or D, not a mix.
-		 */
-		if (binary)
+		rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+		if (!rowbuf)
 		{
-			for (i = 0; i < nfields; i++)
-				result->attDescs[i].format = 1;
+			errmsg = libpq_gettext("out of memory for query result\n");
+			goto error;
 		}
+		conn->rowBuf = rowbuf;
+		conn->rowBufLen = nfields;
+	}
+	else
+	{
+		rowbuf = conn->rowBuf;
+	}
+
+	result->binary = binary;
+
+	if (binary)
+	{
+		for (i = 0; i < nfields; i++)
+			result->attDescs[i].format = 1;
 	}
-	tup = conn->curTuple;
 
 	/* Get the null-value bitmap */
 	nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@@ -757,7 +764,10 @@ getAnotherTuple(PGconn *conn, bool binary)
 	{
 		bitmap = (char *) malloc(nbytes);
 		if (!bitmap)
-			goto outOfMemory;
+		{
+			errmsg = libpq_gettext("out of memory for query result\n");
+			goto error;
+		}
 	}
 
 	if (pqGetnchar(bitmap, nbytes, conn))
@@ -771,34 +781,29 @@ getAnotherTuple(PGconn *conn, bool binary)
 	for (i = 0; i < nfields; i++)
 	{
 		if (!(bmap & 0200))
-		{
-			/* if the field value is absent, make it a null string */
-			tup[i].value = result->null_field;
-			tup[i].len = NULL_LEN;
-		}
+			vlen = NULL_LEN;
+		else if (pqGetInt(&vlen, 4, conn))
+				goto EOFexit;
 		else
 		{
-			/* get the value length (the first four bytes are for length) */
-			if (pqGetInt(&vlen, 4, conn))
-				goto EOFexit;
 			if (!binary)
 				vlen = vlen - 4;
 			if (vlen < 0)
 				vlen = 0;
-			if (tup[i].value == NULL)
-			{
-				tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
-				if (tup[i].value == NULL)
-					goto outOfMemory;
-			}
-			tup[i].len = vlen;
-			/* read in the value */
-			if (vlen > 0)
-				if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-					goto EOFexit;
-			/* we have to terminate this ourselves */
-			tup[i].value[vlen] = '\0';
 		}
+
+		/*
+		 * rowbuf[i].value always points to the next address of the
+		 * length field even if the value is NULL, to allow safe
+		 * size estimates and data copy.
+		 */
+		rowbuf[i].value = conn->inBuffer + conn->inCursor;
+		rowbuf[i].len = vlen;
+
+		/* Skip the value */
+		if (vlen > 0 && pqSkipnchar(vlen, conn))
+			goto EOFexit;
+
 		/* advance the bitmap stuff */
 		bitcnt++;
 		if (bitcnt == BITS_PER_BYTE)
@@ -811,33 +816,53 @@ getAnotherTuple(PGconn *conn, bool binary)
 			bmap <<= 1;
 	}
 
-	/* Success!  Store the completed tuple in the result */
-	if (!pqAddTuple(result, tup))
-		goto outOfMemory;
-	/* and reset for a new message */
-	conn->curTuple = NULL;
-
 	if (bitmap != std_bitmap)
 		free(bitmap);
-	return 0;
+	bitmap = NULL;
 
-outOfMemory:
-	/* Replace partially constructed result with an error result */
+	/* tag the row as parsed */
+	conn->inStart = conn->inCursor;
 
+	/* Pass the completed row values to rowProcessor */
+	rp= conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
+	if (rp == 1)
+		return 0;
+	else if (rp == 2 && pqIsnonblocking(conn))
+		/* processor requested early exit */
+		return EOF;
+	else if (rp == 0)
+	{
+		errmsg = result->errMsg;
+		result->errMsg = NULL;
+		if (errmsg == NULL)
+			errmsg = libpq_gettext("unknown error in row processor\n");
+		goto error;
+	}
+
+	errmsg = libpq_gettext("invalid return value from row processor\n");
+
+error:
 	/*
 	 * we do NOT use pqSaveErrorResult() here, because of the likelihood that
 	 * there's not enough memory to concatenate messages...
 	 */
 	pqClearAsyncResult(conn);
-	printfPQExpBuffer(&conn->errorMessage,
-					  libpq_gettext("out of memory for query result\n"));
-
+	resetPQExpBuffer(&conn->errorMessage);
+	
+	/*
+	 * If error message is passed from RowProcessor, set it into
+	 * PGconn, assume out of memory if not.
+	 */
+	appendPQExpBufferStr(&conn->errorMessage, errmsg);
+	
 	/*
 	 * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
 	 * do to recover...
 	 */
 	conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
+
 	conn->asyncStatus = PGASYNC_READY;
+
 	/* Discard the failed message --- good idea? */
 	conn->inStart = conn->inEnd;
 
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..c8202c2 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -327,6 +327,9 @@ pqParseInput3(PGconn *conn)
 						/* Read another tuple of a normal query response */
 						if (getAnotherTuple(conn, msgLength))
 							return;
+
+						/* getAnotherTuple() moves inStart itself */
+						continue;
 					}
 					else if (conn->result != NULL &&
 							 conn->result->resultStatus == PGRES_FATAL_ERROR)
@@ -613,33 +616,23 @@ failure:
 
 /*
  * parseInput subroutine to read a 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
+ * It fills rowbuf with column pointers and then calls row processor.
  * Returns: 0 if completed message, EOF if error or not enough data yet.
  *
  * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received.  We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received.
  */
 static int
 getAnotherTuple(PGconn *conn, int msgLength)
 {
 	PGresult   *result = conn->result;
 	int			nfields = result->numAttributes;
-	PGresAttValue *tup;
+	PGrowValue  *rowbuf;
 	int			tupnfields;		/* # fields from tuple */
 	int			vlen;			/* length of the current field value */
 	int			i;
-
-	/* Allocate tuple space if first time for this data message */
-	if (conn->curTuple == NULL)
-	{
-		conn->curTuple = (PGresAttValue *)
-			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-		if (conn->curTuple == NULL)
-			goto outOfMemory;
-		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-	}
-	tup = conn->curTuple;
+	int			rp;
+	char        *errmsg = libpq_gettext("unknown error\n");
 
 	/* Get the field count and make sure it's what we expect */
 	if (pqGetInt(&tupnfields, 2, conn))
@@ -647,13 +640,22 @@ getAnotherTuple(PGconn *conn, int msgLength)
 
 	if (tupnfields != nfields)
 	{
-		/* Replace partially constructed result with an error result */
-		printfPQExpBuffer(&conn->errorMessage,
-				 libpq_gettext("unexpected field count in \"D\" message\n"));
-		pqSaveErrorResult(conn);
-		/* Discard the failed message by pretending we read it */
-		conn->inCursor = conn->inStart + 5 + msgLength;
-		return 0;
+		errmsg = libpq_gettext("unexpected field count in \"D\" message\n");
+		goto error_and_forward;
+	}
+
+	/* resize row buffer if needed */
+	rowbuf = conn->rowBuf;
+	if (nfields > conn->rowBufLen)
+	{
+		rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+		if (!rowbuf)
+		{
+			errmsg = libpq_gettext("out of memory for query result\n");
+			goto error_and_forward;
+		}
+		conn->rowBuf = rowbuf;
+		conn->rowBufLen = nfields;
 	}
 
 	/* Scan the fields */
@@ -661,54 +663,78 @@ getAnotherTuple(PGconn *conn, int msgLength)
 	{
 		/* get the value length */
 		if (pqGetInt(&vlen, 4, conn))
-			return EOF;
-		if (vlen == -1)
 		{
-			/* null field */
-			tup[i].value = result->null_field;
-			tup[i].len = NULL_LEN;
-			continue;
+			/*
+			 * Forwarding inCursor does not make no sense when protocol error
+			 */
+			errmsg = libpq_gettext("invalid row contents\n");
+			goto error;
 		}
-		if (vlen < 0)
+		if (vlen == -1)
+			vlen = NULL_LEN;
+		else if (vlen < 0)
 			vlen = 0;
-		if (tup[i].value == NULL)
-		{
-			bool		isbinary = (result->attDescs[i].format != 0);
 
-			tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
-			if (tup[i].value == NULL)
-				goto outOfMemory;
-		}
-		tup[i].len = vlen;
-		/* read in the value */
-		if (vlen > 0)
-			if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-				return EOF;
-		/* we have to terminate this ourselves */
-		tup[i].value[vlen] = '\0';
+		/*
+		 * rowbuf[i].value always points to the next address of the
+		 * length field even if the value is NULL, to allow safe
+		 * size estimates and data copy.
+		 */
+		rowbuf[i].value = conn->inBuffer + conn->inCursor;
+		rowbuf[i].len = vlen;
+
+		/* Skip to the next length field */
+		if (vlen > 0 && pqSkipnchar(vlen, conn))
+			return EOF;
 	}
 
-	/* Success!  Store the completed tuple in the result */
-	if (!pqAddTuple(result, tup))
-		goto outOfMemory;
-	/* and reset for a new message */
-	conn->curTuple = NULL;
+	/* tag the row as parsed, check if correctly */
+	conn->inStart += 5 + msgLength;
+	if (conn->inCursor != conn->inStart)
+	{
+		errmsg = libpq_gettext("invalid row contents\n");
+		goto error;
+	}
 
-	return 0;
+	/* Pass the completed row values to rowProcessor */
+	rp = conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
+	if (rp == 1)
+	{
+		/* everything is good */
+		return 0;
+	}
+	if (rp == 2 && pqIsnonblocking(conn))
+	{
+		/* processor requested early exit */
+		return EOF;
+	}
 
-outOfMemory:
+	/* there was some problem */
+	if (rp == 0)
+	{
+		errmsg = result->errMsg;
+		result->errMsg = NULL;
+		if (errmsg == NULL)
+			errmsg = libpq_gettext("unknown error in row processor\n");
+		goto error;
+	}
+
+	errmsg = libpq_gettext("invalid return value from row processor\n");
+	goto error;
+
+error_and_forward:
+	/* Discard the failed message by pretending we read it */
+	conn->inCursor = conn->inStart + 5 + msgLength;
 
+error:
 	/*
 	 * Replace partially constructed result with an error result. First
 	 * discard the old result to try to win back some memory.
 	 */
 	pqClearAsyncResult(conn);
-	printfPQExpBuffer(&conn->errorMessage,
-					  libpq_gettext("out of memory for query result\n"));
+	resetPQExpBuffer(&conn->errorMessage);
+	appendPQExpBufferStr(&conn->errorMessage, errmsg);
 	pqSaveErrorResult(conn);
-
-	/* Discard the failed message by pretending we read it */
-	conn->inCursor = conn->inStart + 5 + msgLength;
 	return 0;
 }
 
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..810b04e 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -149,6 +149,17 @@ typedef struct pgNotify
 	struct pgNotify *next;		/* list link */
 } PGnotify;
 
+/* PGrowValue points a column value of in network buffer.
+ * Value is a string without null termination and length len.
+ * NULL is represented as len < 0, value points then to place
+ * where value would have been.
+ */
+typedef struct pgRowValue
+{
+	int			len;			/* length in bytes of the value */
+	char	   *value;			/* actual value, without null termination */
+} PGrowValue;
+
 /* Function types for notice-handling callbacks */
 typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
 typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -416,6 +427,38 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
 			 const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for alternative row processor.
+ *
+ * Columns array will contain PQnfields() entries, each one
+ * pointing to particular column data in network buffer.
+ * This function is supposed to copy data out from there
+ * and store somewhere.  NULL is signified with len<0.
+ *
+ * This function must return 1 for success and must return 0 for
+ * failure and may set error message by PQresultSetErrMsg.  It is assumed by
+ * caller as out of memory when the error message is not set on
+ * failure. This function is assumed not to throw any exception.
+ */
+typedef int (*PQrowProcessor)(PGresult *res, PGrowValue *columns,
+                              void *param);
+
+/*
+ * Set alternative row data processor for PGconn.
+ *
+ * By registering this function, pg_result disables its own result
+ * store and calls it for rows one by one.
+ *
+ * func is row processor function. See the typedef RowProcessor.
+ *
+ * rowProcessorParam is the contextual variable that passed to
+ * RowProcessor.
+ */
+extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func,
+								   void *rowProcessorParam);
+extern PQrowProcessor PQgetRowProcessor(PGconn *conn, void **param);
+extern int  PQskipResult(PGconn *conn, int skipAll);
+
 /* Force the write buffer to be written (or at least try) */
 extern int	PQflush(PGconn *conn);
 
@@ -454,6 +497,7 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int	PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int	PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern void	PQresultSetErrMsg(PGresult *res, const char *msg);
 extern int	PQnparams(const PGresult *res);
 extern Oid	PQparamtype(const PGresult *res, int param_num);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 987311e..9cabd20 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -398,7 +398,6 @@ struct pg_conn
 
 	/* Status for asynchronous result construction */
 	PGresult   *result;			/* result being constructed */
-	PGresAttValue *curTuple;	/* tuple currently being read */
 
 #ifdef USE_SSL
 	bool		allow_ssl_try;	/* Allowed to try SSL negotiation */
@@ -443,6 +442,14 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+	/*
+	 * Read column data from network buffer.
+	 */
+	PQrowProcessor rowProcessor;/* Function pointer */
+	void *rowProcessorParam;	/* Contextual parameter for rowProcessor */
+	PGrowValue *rowBuf;			/* Buffer for passing values to rowProcessor */
+	int rowBufLen;				/* Number of columns allocated in rowBuf */
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -560,6 +567,7 @@ extern int	pqGets(PQExpBuffer buf, PGconn *conn);
 extern int	pqGets_append(PQExpBuffer buf, PGconn *conn);
 extern int	pqPuts(const char *s, PGconn *conn);
 extern int	pqGetnchar(char *s, size_t len, PGconn *conn);
+extern int	pqSkipnchar(size_t len, PGconn *conn);
 extern int	pqPutnchar(const char *s, size_t len, PGconn *conn);
 extern int	pqGetInt(int *result, size_t bytes, PGconn *conn);
 extern int	pqPutInt(int value, size_t bytes, PGconn *conn);
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..1f91f98 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,329 @@ int PQisthreadsafe();
  </sect1>
 
 
+ <sect1 id="libpq-altrowprocessor">
+  <title>Alternative row processor</title>
+
+  <indexterm zone="libpq-altrowprocessor">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   As the standard usage, rows are stored into <type>PQresult</type>
+   until full resultset is received.  Then such completely-filled
+   <type>PQresult</type> is passed to user.  This behavior can be
+   changed by registering alternative row processor function,
+   that will see each row data as soon as it is received
+   from network.  It has the option of processing the data
+   immediately, or storing it into custom container.
+  </para>
+
+  <para>
+   Note - as row processor sees rows as they arrive, it cannot know
+   whether the SQL statement actually finishes successfully on server
+   or not.  So some care must be taken to get proper
+   transactionality.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetrowprocessor">
+    <term>
+     <function>PQsetRowProcessor</function>
+     <indexterm>
+      <primary>PQsetRowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a callback function to process each row.
+<synopsis>
+void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+	 <varlistentry>
+	   <term><parameter>conn</parameter></term>
+	   <listitem>
+	     <para>
+	       The connection object to set the row processor function.
+	     </para>
+	   </listitem>
+	 </varlistentry>
+	 <varlistentry>
+	   <term><parameter>func</parameter></term>
+	   <listitem>
+	     <para>
+	       Storage handler function to set. NULL means to use the
+	       default processor.
+	     </para>
+	   </listitem>
+	 </varlistentry>
+	 <varlistentry>
+	   <term><parameter>param</parameter></term>
+	   <listitem>
+	     <para>
+	       A pointer to contextual parameter passed
+	       to <parameter>func</parameter>.
+	     </para>
+	   </listitem>
+	 </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqrowprocessor">
+    <term>
+     <type>PQrowProcessor</type>
+     <indexterm>
+      <primary>PQrowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type for the row processor callback function.
+<synopsis>
+int (*PQrowProcessor)(PGresult *res, PGrowValue *columns, void *param);
+
+typedef struct
+{
+    int         len;            /* length in bytes of the value, -1 if NULL */
+    char       *value;          /* actual value, without null termination */
+} PGrowValue;
+</synopsis>
+     </para>
+
+     <para>
+      The <parameter>columns</parameter> array will have PQnfields()
+      elements, each one pointing to column value in network buffer.
+      The <parameter>len</parameter> field will contain number of
+      bytes in value.  If the field value is NULL then
+      <parameter>len</parameter> will be -1 and value will point
+      to position where the value would have been in buffer.
+      This allows estimating row size by pointer arithmetic.
+     </para>
+
+     <para>
+       This function must process or copy row values away from network
+       buffer before it returns, as next row might overwrite them.
+     </para>
+
+     <para>
+       This function must return 1 for success, and 0 for failure.  On
+       failure this function should set the error message
+       with <function>PQresultSetErrMsg</function>.  When non-blocking
+       API is in use, it can also return 2 for early exit
+       from <function>PQisBusy</function> function.  The
+       supplied <parameter>res</parameter>
+       and <parameter>columns</parameter> values will stay valid so
+       row can be processed outside of callback.  Caller is
+       responsible for tracking whether
+       the <parameter>PQisBusy</parameter> returned early from
+       callback or for other reasons.  Usually this should happen via
+       setting cached values to NULL before
+       calling <function>PQisBusy</function>.
+     </para>
+
+     <para>
+       The function is allowed to exit via exception (setjmp/longjmp).
+       The connection and row are guaranteed to be in valid state.
+       The connection can later be closed via <function>PQfinish</function>.
+       Processing can also be continued without closing the connection,
+       call <function>getResult</function> on syncronous mode,
+       <function>PQisBusy</function> on asynchronous connection.  Then
+       processing will continue with new row, previous row that got
+       exception will be skipped. Or you can discard all remaining
+       rows by calling <function>PQskipResult</function> without
+       closing connection.
+     </para>
+
+     <variablelist>
+       <varlistentry>
+
+	 <term><parameter>res</parameter></term>
+	 <listitem>
+	   <para>
+	     A pointer to the <type>PGresult</type> object.
+	   </para>
+	 </listitem>
+       </varlistentry>
+       <varlistentry>
+
+	 <term><parameter>columns</parameter></term>
+	 <listitem>
+	   <para>
+	     Column values of the row to process.  Column values
+	     are located in network buffer, the processor must
+	     copy them out from there.
+	   </para>
+	   <para>
+	     Column values are not null-terminated, so processor cannot
+	     use C string functions on them directly.
+	   </para>
+	 </listitem>
+       </varlistentry>
+       <varlistentry>
+
+	 <term><parameter>param</parameter></term>
+	 <listitem>
+	   <para>
+	     Extra parameter that was given to <function>PQsetRowProcessor</function>.
+	   </para>
+	 </listitem>
+       </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqskipresult">
+    <term>
+     <function>PQskipResult</function>
+     <indexterm>
+      <primary>PQskipResult</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+		Discard all the remaining row data
+		after <function>PQexec</function>
+		or <function>PQgetResult</function> exits by the exception raised
+		in <type>RowProcessor</type> without closing connection.
+<synopsis>
+void PQskipResult(PGconn *conn, int skipAll)
+</synopsis>
+      </para>
+      <para>
+	<variablelist>
+	 <varlistentry>
+	   <term><parameter>conn</parameter></term>
+	   <listitem>
+	     <para>
+	       The connection object.
+	     </para>
+	   </listitem>
+	 </varlistentry>
+
+	 <varlistentry>
+	   <term><parameter>skipAll</parameter></term>
+	   <listitem>
+	     <para>
+	       Skip remaining rows in current result
+	       if <parameter>skipAll</parameter> is false(0). Skip
+	       remaining rows in current result and all rows in
+	       succeeding results if true(non-zero).
+	     </para>
+	   </listitem>
+	 </varlistentry>
+
+	</variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqresultseterrmsg">
+    <term>
+     <function>PQresultSetErrMsg</function>
+     <indexterm>
+      <primary>PQresultSetErrMsg</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+	Set the message for the error occurred
+	in <type>PQrowProcessor</type>.  If this message is not set, the
+	caller assumes the error to be `unknown' error.
+<synopsis>
+void PQresultSetErrMsg(PGresult *res, const char *msg)
+</synopsis>
+      </para>
+      <para>
+	<variablelist>
+	  <varlistentry>
+	    <term><parameter>res</parameter></term>
+	    <listitem>
+	      <para>
+		A pointer to the <type>PGresult</type> object
+		passed to <type>PQrowProcessor</type>.
+	      </para>
+	    </listitem>
+	  </varlistentry>
+	  <varlistentry>
+	    <term><parameter>msg</parameter></term>
+	    <listitem>
+	      <para>
+		Error message. This will be copied internally so there is
+		no need to care of the scope.
+	      </para>
+	      <para>
+		If <parameter>res</parameter> already has a message previously
+		set, it will be overwritten. Set NULL to cancel the the custom
+		message.
+	      </para>
+	    </listitem>
+	  </varlistentry>
+	</variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqgetrowprcessor">
+    <term>
+     <function>PQgetRowProcessor</function>
+     <indexterm>
+      <primary>PQgetRowProcessor</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+       Get row processor and its context parameter currently set to
+       the connection.
+<synopsis>
+PQrowProcessor PQgetRowProcessor(PGconn *conn, void **param)
+</synopsis>
+      </para>
+      <para>
+	<variablelist>
+	 <varlistentry>
+	   <term><parameter>conn</parameter></term>
+	   <listitem>
+	     <para>
+	       The connection object.
+	     </para>
+	   </listitem>
+	 </varlistentry>
+
+	 <varlistentry>
+	   <term><parameter>param</parameter></term>
+	   <listitem>
+	     <para>
+              Set the current row processor parameter of the
+              connection here if not NULL.
+	     </para>
+	   </listitem>
+	 </varlistentry>
+
+	</variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+ </sect1>
+
+
  <sect1 id="libpq-build">
   <title>Building <application>libpq</application> Programs</title>
 
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..9ce1466 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
 	bool		newXactForCursor;		/* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+	Tuplestorestate *tuplestore;
+	int nattrs;
+	MemoryContext oldcontext;
+	AttInMetadata *attinmeta;
+	char** valbuf;
+	int *valbuflen;
+	char **cstrs;
+	bool error_occurred;
+	bool nummismatch;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
 				   int2vector *pkattnums_arg, int32 pknumatts_arg,
 				   int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, PGrowValue *columns, void *param);
+
 
 /* Global */
 static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	char	   *curname = NULL;
 	int			howmany = 0;
 	bool		fail = true;	/* default to backward compatible */
+	storeInfo   storeinfo;
 
 	DBLINK_INIT;
 
@@ -559,15 +576,51 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
 	/*
+	 * Result is stored into storeinfo.tuplestore instead of
+	 * res->result retuned by PQexec below
+	 */
+	initStoreInfo(&storeinfo, fcinfo);
+	PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+	/*
 	 * Try to execute the query.  Note that since libpq uses malloc, the
 	 * PGresult will be long-lived even though we are still in a short-lived
 	 * memory context.
 	 */
-	res = PQexec(conn, buf.data);
+	PG_TRY();
+	{
+		res = PQexec(conn, buf.data);
+	}
+	PG_CATCH();
+	{
+		ErrorData *edata;
+
+		finishStoreInfo(&storeinfo);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* Skip remaining results when storeHandler raises exception. */
+		PQskipResult(conn, FALSE);
+		ReThrowError(edata);
+	}
+	PG_END_TRY();
+
+	finishStoreInfo(&storeinfo);
+
 	if (!res ||
 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
 		 PQresultStatus(res) != PGRES_TUPLES_OK))
 	{
+		/* finishStoreInfo saves the fields referred to below. */
+		if (storeinfo.nummismatch)
+		{
+			/* This is only for backward compatibility */
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("remote query result rowtype does not match "
+							"the specified FROM clause rowtype")));
+		}
+
 		dblink_res_error(conname, res, "could not fetch from cursor", fail);
 		return (Datum) 0;
 	}
@@ -579,8 +632,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
 				(errcode(ERRCODE_INVALID_CURSOR_NAME),
 				 errmsg("cursor \"%s\" does not exist", curname)));
 	}
+	PQclear(res);
 
-	materializeResult(fcinfo, res);
 	return (Datum) 0;
 }
 
@@ -640,6 +693,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible */
 	bool		freeconn = false;
+	storeInfo   storeinfo;
 
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -715,164 +769,259 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	rsinfo->setResult = NULL;
 	rsinfo->setDesc = NULL;
 
+
+	/*
+	 * Result is stored into storeinfo.tuplestore instead of
+	 * res->result retuned by PQexec/PQgetResult below
+	 */
+	initStoreInfo(&storeinfo, fcinfo);
+	PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
 	/* synchronous query, or async result retrieval */
-	if (!is_async)
-		res = PQexec(conn, sql);
-	else
+	PG_TRY();
 	{
-		res = PQgetResult(conn);
-		/* NULL means we're all done with the async results */
-		if (!res)
-			return (Datum) 0;
+		if (!is_async)
+			res = PQexec(conn, sql);
+		else
+			res = PQgetResult(conn);
 	}
+	PG_CATCH();
+	{
+		ErrorData *edata;
 
-	/* if needed, close the connection to the database and cleanup */
-	if (freeconn)
-		PQfinish(conn);
+		finishStoreInfo(&storeinfo);
+		edata = CopyErrorData();
+		FlushErrorState();
 
-	if (!res ||
-		(PQresultStatus(res) != PGRES_COMMAND_OK &&
-		 PQresultStatus(res) != PGRES_TUPLES_OK))
+		/* Skip remaining results when storeHandler raises exception. */
+		PQskipResult(conn, FALSE);
+		ReThrowError(edata);
+	}
+	PG_END_TRY();
+
+	finishStoreInfo(&storeinfo);
+
+	/* NULL res from async get means we're all done with the results */
+	if (res || !is_async)
 	{
-		dblink_res_error(conname, res, "could not execute query", fail);
-		return (Datum) 0;
+		if (freeconn)
+			PQfinish(conn);
+
+		if (!res ||
+			(PQresultStatus(res) != PGRES_COMMAND_OK &&
+			 PQresultStatus(res) != PGRES_TUPLES_OK))
+		{
+			/* finishStoreInfo saves the fields referred to below. */
+			if (storeinfo.nummismatch)
+			{
+				/* This is only for backward compatibility */
+				ereport(ERROR,
+						(errcode(ERRCODE_DATATYPE_MISMATCH),
+						 errmsg("remote query result rowtype does not match "
+								"the specified FROM clause rowtype")));
+			}
+
+			dblink_res_error(conname, res, "could not execute query", fail);
+			return (Datum) 0;
+		}
 	}
+	PQclear(res);
 
-	materializeResult(fcinfo, res);
 	return (Datum) 0;
 }
 
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	int i;
+
+	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+	{
+		case TYPEFUNC_COMPOSITE:
+			/* success */
+			break;
+		case TYPEFUNC_RECORD:
+			/* failed to determine actual type of RECORD */
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("function returning record called in context "
+							"that cannot accept type record")));
+			break;
+		default:
+			/* result type isn't composite */
+			elog(ERROR, "return type must be a row type");
+			break;
+	}
 
-	Assert(rsinfo->returnMode == SFRM_Materialize);
+	sinfo->oldcontext = MemoryContextSwitchTo(
+		rsinfo->econtext->ecxt_per_query_memory);
 
-	PG_TRY();
+	/* make sure we have a persistent copy of the tupdesc */
+	tupdesc = CreateTupleDescCopy(tupdesc);
+
+	sinfo->error_occurred = FALSE;
+	sinfo->nummismatch = FALSE;
+	sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+	sinfo->nattrs = tupdesc->natts;
+	sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+	sinfo->valbuf = NULL;
+	sinfo->valbuflen = NULL;
+
+	/* Preallocate memory of same size with c string array for values. */
+	sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*));
+	if (sinfo->valbuf)
+		sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
+	if (sinfo->valbuflen)
+		sinfo->cstrs = (char **)malloc(sinfo->nattrs * sizeof(char*));
+
+	if (sinfo->cstrs == NULL)
 	{
-		TupleDesc	tupdesc;
-		bool		is_sql_cmd = false;
-		int			ntuples;
-		int			nfields;
+		if (sinfo->valbuf)
+			free(sinfo->valbuf);
+		if (sinfo->valbuflen)
+			free(sinfo->valbuflen);
 
-		if (PQresultStatus(res) == PGRES_COMMAND_OK)
-		{
-			is_sql_cmd = true;
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory")));
+	}
 
-			/*
-			 * need a tuple descriptor representing one TEXT column to return
-			 * the command status string as our result tuple
-			 */
-			tupdesc = CreateTemplateTupleDesc(1, false);
-			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-							   TEXTOID, -1, 0);
-			ntuples = 1;
-			nfields = 1;
-		}
-		else
-		{
-			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+	for (i = 0 ; i < sinfo->nattrs ; i++)
+	{
+		sinfo->valbuf[i] = NULL;
+		sinfo->valbuflen[i] = -1;
+	}
 
-			is_sql_cmd = false;
+	rsinfo->setResult = sinfo->tuplestore;
+	rsinfo->setDesc = tupdesc;
+}
 
-			/* get a tuple descriptor for our result type */
-			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-			{
-				case TYPEFUNC_COMPOSITE:
-					/* success */
-					break;
-				case TYPEFUNC_RECORD:
-					/* failed to determine actual type of RECORD */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("function returning record called in context "
-							   "that cannot accept type record")));
-					break;
-				default:
-					/* result type isn't composite */
-					elog(ERROR, "return type must be a row type");
-					break;
-			}
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+	int i;
 
-			/* make sure we have a persistent copy of the tupdesc */
-			tupdesc = CreateTupleDescCopy(tupdesc);
-			ntuples = PQntuples(res);
-			nfields = PQnfields(res);
+	if (sinfo->valbuf)
+	{
+		for (i = 0 ; i < sinfo->nattrs ; i++)
+		{
+			if (sinfo->valbuf[i])
+				free(sinfo->valbuf[i]);
 		}
+		free(sinfo->valbuf);
+		sinfo->valbuf = NULL;
+	}
 
-		/*
-		 * check result and tuple descriptor have the same number of columns
-		 */
-		if (nfields != tupdesc->natts)
-			ereport(ERROR,
-					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("remote query result rowtype does not match "
-							"the specified FROM clause rowtype")));
+	if (sinfo->valbuflen)
+	{
+		free(sinfo->valbuflen);
+		sinfo->valbuflen = NULL;
+	}
 
-		if (ntuples > 0)
-		{
-			AttInMetadata *attinmeta;
-			Tuplestorestate *tupstore;
-			MemoryContext oldcontext;
-			int			row;
-			char	  **values;
-
-			attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-			oldcontext = MemoryContextSwitchTo(
-									rsinfo->econtext->ecxt_per_query_memory);
-			tupstore = tuplestore_begin_heap(true, false, work_mem);
-			rsinfo->setResult = tupstore;
-			rsinfo->setDesc = tupdesc;
-			MemoryContextSwitchTo(oldcontext);
+	if (sinfo->cstrs)
+	{
+		free(sinfo->cstrs);
+		sinfo->cstrs = NULL;
+	}
 
-			values = (char **) palloc(nfields * sizeof(char *));
+	MemoryContextSwitchTo(sinfo->oldcontext);
+}
 
-			/* put all tuples into the tuplestore */
-			for (row = 0; row < ntuples; row++)
-			{
-				HeapTuple	tuple;
+static int
+storeHandler(PGresult *res, PGrowValue *columns, void *param)
+{
+	storeInfo *sinfo = (storeInfo *)param;
+	HeapTuple  tuple;
+	int        fields = PQnfields(res);
+	int        i;
+	char      **cstrs = sinfo->cstrs;
 
-				if (!is_sql_cmd)
-				{
-					int			i;
+	if (sinfo->error_occurred)
+	{
+		PQresultSetErrMsg(res, "storeHandler is called after error\n");
+		return FALSE;
+	}
 
-					for (i = 0; i < nfields; i++)
-					{
-						if (PQgetisnull(res, row, i))
-							values[i] = NULL;
-						else
-							values[i] = PQgetvalue(res, row, i);
-					}
-				}
+	if (sinfo->nattrs != fields)
+	{
+		sinfo->error_occurred = TRUE;
+		sinfo->nummismatch = TRUE;
+		finishStoreInfo(sinfo);
+
+		/* This error will be processed in
+		 * dblink_record_internal(). So do not set error message
+		 * here. */
+		
+		PQresultSetErrMsg(res, "unexpected field count in \"D\" message\n");
+		return FALSE;
+	}
+
+	/*
+	 * value input functions assumes that the input string is
+	 * terminated by zero. We should make the values to be so.
+	 */
+	for(i = 0 ; i < fields ; i++)
+	{
+		int len = columns[i].len;
+		if (len < 0)
+			cstrs[i] = NULL;
+		else
+		{
+			char *tmp = sinfo->valbuf[i];
+			int tmplen = sinfo->valbuflen[i];
+
+			/*
+			 * Divide calls to malloc and realloc so that things will
+			 * go fine even on the systems of which realloc() does not
+			 * accept NULL as old memory block.
+			 *
+			 * Also try to (re)allocate in bigger steps to
+			 * avoid flood of allocations on weird data.
+			 */
+			if (tmp == NULL)
+			{
+				tmplen = len + 1;
+				if (tmplen < 64)
+					tmplen = 64;
+				tmp = (char *)malloc(tmplen);
+			}
+			else if (tmplen < len + 1)
+			{
+				if (len + 1 > tmplen * 2)
+					tmplen = len + 1;
 				else
-				{
-					values[0] = PQcmdStatus(res);
-				}
+					tmplen = tmplen * 2;
+				tmp = (char *)realloc(tmp, tmplen);
+			}
 
-				/* build the tuple and put it into the tuplestore. */
-				tuple = BuildTupleFromCStrings(attinmeta, values);
-				tuplestore_puttuple(tupstore, tuple);
+			/*
+			 * sinfo->valbuf[n] will be freed in finishStoreInfo()
+			 * when realloc returns NULL.
+			 */
+			if (tmp == NULL)
+			{
+				PQresultSetErrMsg(res, "out of memory for query result\n");						return FALSE;
 			}
 
-			/* clean up and return the tuplestore */
-			tuplestore_donestoring(tupstore);
-		}
+			sinfo->valbuf[i] = tmp;
+			sinfo->valbuflen[i] = tmplen;
 
-		PQclear(res);
-	}
-	PG_CATCH();
-	{
-		/* be sure to release the libpq result */
-		PQclear(res);
-		PG_RE_THROW();
+			cstrs[i] = sinfo->valbuf[i];
+			memcpy(cstrs[i], columns[i].value, len);
+			cstrs[i][len] = '\0';
+		}
 	}
-	PG_END_TRY();
+
+	/*
+	 * These functions may throw exception. It will be caught in
+	 * dblink_record_internal()
+	 */
+	tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+	tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+	return TRUE;
 }
 
 /*
diff --git b/doc/src/sgml/libpq.sgml a/doc/src/sgml/libpq.sgml
index 2deb432..1f91f98 100644
--- b/doc/src/sgml/libpq.sgml
+++ a/doc/src/sgml/libpq.sgml
@@ -7350,7 +7350,17 @@ typedef struct
      <para>
        This function must return 1 for success, and 0 for failure.  On
        failure this function should set the error message
-       with <function>PQresultSetErrMsg</function>.
+       with <function>PQresultSetErrMsg</function>.  When non-blocking
+       API is in use, it can also return 2 for early exit
+       from <function>PQisBusy</function> function.  The
+       supplied <parameter>res</parameter>
+       and <parameter>columns</parameter> values will stay valid so
+       row can be processed outside of callback.  Caller is
+       responsible for tracking whether
+       the <parameter>PQisBusy</parameter> returned early from
+       callback or for other reasons.  Usually this should happen via
+       setting cached values to NULL before
+       calling <function>PQisBusy</function>.
      </para>
 
      <para>
diff --git b/src/interfaces/libpq/fe-protocol2.c a/src/interfaces/libpq/fe-protocol2.c
index da36bfa..3b0520d 100644
--- b/src/interfaces/libpq/fe-protocol2.c
+++ a/src/interfaces/libpq/fe-protocol2.c
@@ -827,6 +827,9 @@ getAnotherTuple(PGconn *conn, bool binary)
 	rp= conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
 	if (rp == 1)
 		return 0;
+	else if (rp == 2 && pqIsnonblocking(conn))
+		/* processor requested early exit */
+		return EOF;
 	else if (rp == 0)
 	{
 		errmsg = result->errMsg;
diff --git b/src/interfaces/libpq/fe-protocol3.c a/src/interfaces/libpq/fe-protocol3.c
index 12fef30..c8202c2 100644
--- b/src/interfaces/libpq/fe-protocol3.c
+++ a/src/interfaces/libpq/fe-protocol3.c
@@ -703,6 +703,11 @@ getAnotherTuple(PGconn *conn, int msgLength)
 		/* everything is good */
 		return 0;
 	}
+	if (rp == 2 && pqIsnonblocking(conn))
+	{
+		/* processor requested early exit */
+		return EOF;
+	}
 
 	/* there was some problem */
 	if (rp == 0)
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to