On Tue, Feb 07, 2012 at 04:44:09PM +0200, Marko Kreen wrote:
> Although it seems we could allow exceptions, at least when we are
> speaking of the Postgres backend, as the connection and result are
> in an internally consistent state when the handler is called, and the
> partial PGresult is stored under PGconn->result.  Even the
> connection is in a consistent state, as the row packet is
> fully processed.
> 
> So we have 3 variants, all work, but which one do we want to support?
> 
> 1) Exceptions are not allowed.
> 2) Exceptions are allowed, but when it happens, user must call
>    PQfinish() next, to avoid processing incoming data from
>    potentially invalid state.
> 3) Exceptions are allowed, and further row processing can continue
>    with PQisBusy() / PQgetResult()
> 3.1) The problematic row is retried.  (Current behaviour)
> 3.2) The problematic row is skipped.

I converted the patch to support 3.2), that is, skip the row on exception.
That required some cleanups of the getAnotherTuple() API, plus I made some
other cleanups.  Details below.

But while doing this I also started to think about what happens if the
user does *not* throw exceptions.  E.g. Python signals exceptions via
regular return values, which means complications when passing them upwards.

The main case I'm thinking of is actually a result-set iterator in a
high-level language.  The current callback-only style API requires that
rows are stuffed into a temporary buffer until the network blocks and user
code gets control again.  This feels clumsy for a performance-oriented API.
Another case is user code that wants to do complex processing.  Doing
a lot of work inside the callback seems dubious.  And what if some part of
the code calls PQfinish() during some error recovery?

I tried imagining some sort of getFoo()-style API for fetching in-flight
row data, but I always ended up with a "rewrite libpq" step, so I feel
it's not productive to go there.

Instead I added a simple feature: rowProcessor can return '2',
in which case getAnotherTuple() does an early exit without setting
any error state.  On the user side it appears as if PQisBusy() returned
with a TRUE result.  All pointers stay valid, so the callback can just
stuff them into some temp area.  ATM there is no indication, though,
whether the exit was due to the callback or other reasons, so the user
must detect it based on whether new temp pointers appeared,
which means those must be cleared before calling PQisBusy() again.
This actually feels like a feature: those must not stay around
after a single call.

It's included in the main patch, but I also attached it as a separate patch
so that it can be examined separately and reverted if not acceptable.

But as it's really simple, I recommend including it.

Its usage might not be obvious, though; should we include
example code in the docs?



New feature:

* Row processor can return 2, in which case PQisBusy() does an early exit.
  Supported only when the connection is in non-blocking mode.

Cleanups:

* Row data is tagged as processed before rowProcessor is called,
  so exceptions will skip the row.  This simplifies non-exceptional
  handling as well.

* Converted 'return EOF' in V3 getAnotherTuple() to set an error instead.
  AFAICS those EOFs are remnants of V2 getAnotherTuple(),
  not something that was coded deliberately.  Note that when
  V3 getAnotherTuple() is called, the row packet is fully in the buffer.
  Returning EOF on a broken packet to signify 'give me more data' does not
  result in any useful behaviour; instead you can simply get
  into an infinite loop.

Fix bugs in my previous changes:

* Split the OOM error handling from custom error message handling;
  previously the error message in PGresult was lost because the OOM
  logic freed the PGresult early.

* Drop unused goto label from experimental debug code.

-- 
marko

*** a/src/interfaces/libpq/exports.txt
--- b/src/interfaces/libpq/exports.txt
***************
*** 160,162 **** PQconnectStartParams      157
--- 160,164 ----
  PQping                    158
  PQpingParams              159
  PQlibVersion              160
+ PQsetRowProcessor	  161
+ PQsetRowProcessorErrMsg	  162
*** a/src/interfaces/libpq/fe-connect.c
--- b/src/interfaces/libpq/fe-connect.c
***************
*** 2693,2698 **** makeEmptyPGconn(void)
--- 2693,2701 ----
  	conn->wait_ssl_try = false;
  #endif
  
+ 	/* set default row processor */
+ 	PQsetRowProcessor(conn, NULL, NULL);
+ 
  	/*
  	 * We try to send at least 8K at a time, which is the usual size of pipe
  	 * buffers on Unix systems.  That way, when we are sending a large amount
***************
*** 2711,2718 **** makeEmptyPGconn(void)
--- 2714,2726 ----
  	initPQExpBuffer(&conn->errorMessage);
  	initPQExpBuffer(&conn->workBuffer);
  
+ 	/* set up initial row buffer */
+ 	conn->rowBufLen = 32;
+ 	conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue));
+ 
  	if (conn->inBuffer == NULL ||
  		conn->outBuffer == NULL ||
+ 		conn->rowBuf == NULL ||
  		PQExpBufferBroken(&conn->errorMessage) ||
  		PQExpBufferBroken(&conn->workBuffer))
  	{
***************
*** 2814,2819 **** freePGconn(PGconn *conn)
--- 2822,2829 ----
  		free(conn->inBuffer);
  	if (conn->outBuffer)
  		free(conn->outBuffer);
+ 	if (conn->rowBuf)
+ 		free(conn->rowBuf);
  	termPQExpBuffer(&conn->errorMessage);
  	termPQExpBuffer(&conn->workBuffer);
  
***************
*** 5078,5080 **** PQregisterThreadLock(pgthreadlock_t newhandler)
--- 5088,5091 ----
  
  	return prev;
  }
+ 
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 66,71 **** static PGresult *PQexecFinish(PGconn *conn);
--- 66,72 ----
  static int PQsendDescribe(PGconn *conn, char desc_type,
  			   const char *desc_target);
  static int	check_field_number(const PGresult *res, int field_num);
+ static int	pqAddRow(PGresult *res, void *param, PGrowValue *columns);
  
  
  /* ----------------
***************
*** 160,165 **** PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
--- 161,167 ----
  	result->curBlock = NULL;
  	result->curOffset = 0;
  	result->spaceLeft = 0;
+ 	result->rowProcessorErrMsg = NULL;
  
  	if (conn)
  	{
***************
*** 701,707 **** pqClearAsyncResult(PGconn *conn)
  	if (conn->result)
  		PQclear(conn->result);
  	conn->result = NULL;
- 	conn->curTuple = NULL;
  }
  
  /*
--- 703,708 ----
***************
*** 756,762 **** pqPrepareAsyncResult(PGconn *conn)
  	 */
  	res = conn->result;
  	conn->result = NULL;		/* handing over ownership to caller */
- 	conn->curTuple = NULL;		/* just in case */
  	if (!res)
  		res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
  	else
--- 757,762 ----
***************
*** 828,833 **** pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
--- 828,900 ----
  }
  
  /*
+  * PQsetRowProcessor
+  *   Set the function that copies column data out of the network buffer.
+  */
+ void
+ PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+ {
+ 	conn->rowProcessor = (func ? func : pqAddRow);
+ 	conn->rowProcessorParam = param;
+ }
+ 
+ /*
+  * PQsetRowProcessorErrMsg
+  *    Set the error message passed back to the caller of RowProcessor.
+  *
+  *    You can replace the previous message with an alternative one, or clear
+  *    it with NULL.
+  */
+ void
+ PQsetRowProcessorErrMsg(PGresult *res, char *msg)
+ {
+ 	if (msg)
+ 		res->rowProcessorErrMsg = pqResultStrdup(res, msg);
+ 	else
+ 		res->rowProcessorErrMsg = NULL;
+ }
+ 
+ /*
+  * pqAddRow
+  *	  add a row to the PGresult structure, growing it if necessary
+  *	  Returns TRUE if OK, FALSE if not enough memory to add the row.
+  */
+ static int
+ pqAddRow(PGresult *res, void *param, PGrowValue *columns)
+ {
+ 	PGresAttValue *tup;
+ 	int			nfields = res->numAttributes;
+ 	int			i;
+ 
+ 	tup = (PGresAttValue *)
+ 		pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+ 	if (tup == NULL)
+ 		return FALSE;
+ 
+ 	for (i = 0 ; i < nfields ; i++)
+ 	{
+ 		tup[i].len = columns[i].len;
+ 		if (tup[i].len == NULL_LEN)
+ 		{
+ 			tup[i].value = res->null_field;
+ 		}
+ 		else
+ 		{
+ 			bool isbinary = (res->attDescs[i].format != 0);
+ 			tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+ 			if (tup[i].value == NULL)
+ 				return FALSE;
+ 
+ 			memcpy(tup[i].value, columns[i].value, tup[i].len);
+ 			/* We have to terminate this ourselves */
+ 			tup[i].value[tup[i].len] = '\0';
+ 		}
+ 	}
+ 
+ 	return pqAddTuple(res, tup);
+ }
+ 
+ /*
   * pqAddTuple
   *	  add a row pointer to the PGresult structure, growing it if necessary
   *	  Returns TRUE if OK, FALSE if not enough memory to add the row
***************
*** 1223,1229 **** PQsendQueryStart(PGconn *conn)
  
  	/* initialize async result-accumulation state */
  	conn->result = NULL;
- 	conn->curTuple = NULL;
  
  	/* ready to send command message */
  	return true;
--- 1290,1295 ----
*** a/src/interfaces/libpq/fe-misc.c
--- b/src/interfaces/libpq/fe-misc.c
***************
*** 219,224 **** pqGetnchar(char *s, size_t len, PGconn *conn)
--- 219,243 ----
  }
  
  /*
+  * pqSkipnchar:
+  *	skip len bytes in input buffer.
+  */
+ int
+ pqSkipnchar(size_t len, PGconn *conn)
+ {
+ 	if (len > (size_t) (conn->inEnd - conn->inCursor))
+ 		return EOF;
+ 
+ 	conn->inCursor += len;
+ 
+ 	if (conn->Pfdebug)
+ 		fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+ 				(unsigned long) len);
+ 
+ 	return 0;
+ }
+ 
+ /*
   * pqPutnchar:
   *	write exactly len bytes to the current message
   */
*** a/src/interfaces/libpq/fe-protocol2.c
--- b/src/interfaces/libpq/fe-protocol2.c
***************
*** 569,574 **** pqParseInput2(PGconn *conn)
--- 569,576 ----
  						/* Read another tuple of a normal query response */
  						if (getAnotherTuple(conn, FALSE))
  							return;
+ 						/* getAnotherTuple moves inStart itself */
+ 						continue;
  					}
  					else
  					{
***************
*** 585,590 **** pqParseInput2(PGconn *conn)
--- 587,594 ----
  						/* Read another tuple of a normal query response */
  						if (getAnotherTuple(conn, TRUE))
  							return;
+ 						/* getAnotherTuple moves inStart itself */
+ 						continue;
  					}
  					else
  					{
***************
*** 703,754 **** failure:
  
  /*
   * parseInput subroutine to read a 'B' or 'D' (row data) message.
!  * We add another tuple to the existing PGresult structure.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.  We keep a partially constructed
!  * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
   */
  static int
  getAnotherTuple(PGconn *conn, bool binary)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGresAttValue *tup;
  
  	/* the backend sends us a bitmap of which attributes are null */
  	char		std_bitmap[64]; /* used unless it doesn't fit */
  	char	   *bitmap = std_bitmap;
  	int			i;
  	size_t		nbytes;			/* the number of bytes in bitmap  */
  	char		bmap;			/* One byte of the bitmap */
  	int			bitmap_index;	/* Its index */
  	int			bitcnt;			/* number of bits examined in current byte */
  	int			vlen;			/* length of the current field value */
  
  	result->binary = binary;
  
! 	/* Allocate tuple space if first time for this data message */
! 	if (conn->curTuple == NULL)
  	{
! 		conn->curTuple = (PGresAttValue *)
! 			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
! 		if (conn->curTuple == NULL)
! 			goto outOfMemory;
! 		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
! 
! 		/*
! 		 * If it's binary, fix the column format indicators.  We assume the
! 		 * backend will consistently send either B or D, not a mix.
! 		 */
! 		if (binary)
! 		{
! 			for (i = 0; i < nfields; i++)
! 				result->attDescs[i].format = 1;
! 		}
  	}
- 	tup = conn->curTuple;
  
  	/* Get the null-value bitmap */
  	nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
--- 707,757 ----
  
  /*
   * parseInput subroutine to read a 'B' or 'D' (row data) message.
!  * It fills rowbuf with column pointers and then calls the row processor.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.
   */
  static int
  getAnotherTuple(PGconn *conn, bool binary)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGrowValue  *rowbuf;
  
  	/* the backend sends us a bitmap of which attributes are null */
  	char		std_bitmap[64]; /* used unless it doesn't fit */
  	char	   *bitmap = std_bitmap;
  	int			i;
+ 	int			rp;
  	size_t		nbytes;			/* the number of bytes in bitmap  */
  	char		bmap;			/* One byte of the bitmap */
  	int			bitmap_index;	/* Its index */
  	int			bitcnt;			/* number of bits examined in current byte */
  	int			vlen;			/* length of the current field value */
  
+ 	/* resize row buffer if needed */
+ 	if (nfields > conn->rowBufLen)
+ 	{
+ 		rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+ 		if (!rowbuf)
+ 			goto rowProcessError;
+ 		conn->rowBuf = rowbuf;
+ 		conn->rowBufLen = nfields;
+ 	}
+ 	else
+ 	{
+ 		rowbuf = conn->rowBuf;
+ 	}
+ 
  	result->binary = binary;
  
! 	if (binary)
  	{
! 		for (i = 0; i < nfields; i++)
! 			result->attDescs[i].format = 1;
  	}
  
  	/* Get the null-value bitmap */
  	nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
***************
*** 757,763 **** getAnotherTuple(PGconn *conn, bool binary)
  	{
  		bitmap = (char *) malloc(nbytes);
  		if (!bitmap)
! 			goto outOfMemory;
  	}
  
  	if (pqGetnchar(bitmap, nbytes, conn))
--- 760,766 ----
  	{
  		bitmap = (char *) malloc(nbytes);
  		if (!bitmap)
! 			goto rowProcessError;
  	}
  
  	if (pqGetnchar(bitmap, nbytes, conn))
***************
*** 771,804 **** getAnotherTuple(PGconn *conn, bool binary)
  	for (i = 0; i < nfields; i++)
  	{
  		if (!(bmap & 0200))
! 		{
! 			/* if the field value is absent, make it a null string */
! 			tup[i].value = result->null_field;
! 			tup[i].len = NULL_LEN;
! 		}
  		else
  		{
- 			/* get the value length (the first four bytes are for length) */
- 			if (pqGetInt(&vlen, 4, conn))
- 				goto EOFexit;
  			if (!binary)
  				vlen = vlen - 4;
  			if (vlen < 0)
  				vlen = 0;
- 			if (tup[i].value == NULL)
- 			{
- 				tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
- 				if (tup[i].value == NULL)
- 					goto outOfMemory;
- 			}
- 			tup[i].len = vlen;
- 			/* read in the value */
- 			if (vlen > 0)
- 				if (pqGetnchar((char *) (tup[i].value), vlen, conn))
- 					goto EOFexit;
- 			/* we have to terminate this ourselves */
- 			tup[i].value[vlen] = '\0';
  		}
  		/* advance the bitmap stuff */
  		bitcnt++;
  		if (bitcnt == BITS_PER_BYTE)
--- 774,802 ----
  	for (i = 0; i < nfields; i++)
  	{
  		if (!(bmap & 0200))
! 			vlen = NULL_LEN;
! 		else if (pqGetInt(&vlen, 4, conn))
! 				goto EOFexit;
  		else
  		{
  			if (!binary)
  				vlen = vlen - 4;
  			if (vlen < 0)
  				vlen = 0;
  		}
+ 
+ 		/*
+ 		 * rowbuf[i].value always points to the next address of the
+ 		 * length field even if the value is NULL, to allow safe
+ 		 * size estimates and data copy.
+ 		 */
+ 		rowbuf[i].value = conn->inBuffer + conn->inCursor;
+ 		rowbuf[i].len = vlen;
+ 
+ 		/* Skip the value */
+ 		if (vlen > 0 && pqSkipnchar(vlen, conn))
+ 			goto EOFexit;
+ 
  		/* advance the bitmap stuff */
  		bitcnt++;
  		if (bitcnt == BITS_PER_BYTE)
***************
*** 811,843 **** getAnotherTuple(PGconn *conn, bool binary)
  			bmap <<= 1;
  	}
  
- 	/* Success!  Store the completed tuple in the result */
- 	if (!pqAddTuple(result, tup))
- 		goto outOfMemory;
- 	/* and reset for a new message */
- 	conn->curTuple = NULL;
- 
  	if (bitmap != std_bitmap)
  		free(bitmap);
! 	return 0;
  
- outOfMemory:
  	/* Replace partially constructed result with an error result */
  
! 	/*
! 	 * we do NOT use pqSaveErrorResult() here, because of the likelihood that
! 	 * there's not enough memory to concatenate messages...
! 	 */
! 	pqClearAsyncResult(conn);
! 	printfPQExpBuffer(&conn->errorMessage,
! 					  libpq_gettext("out of memory for query result\n"));
  
! 	/*
! 	 * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
! 	 * do to recover...
! 	 */
! 	conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
  	conn->asyncStatus = PGASYNC_READY;
  	/* Discard the failed message --- good idea? */
  	conn->inStart = conn->inEnd;
  
--- 809,864 ----
  			bmap <<= 1;
  	}
  
  	if (bitmap != std_bitmap)
  		free(bitmap);
! 	bitmap = NULL;
! 
! 	/* tag the row as parsed */
! 	conn->inStart = conn->inCursor;
! 
! 	/* Pass the completed row values to rowProcessor */
! 	rp = conn->rowProcessor(result, conn->rowProcessorParam, rowbuf);
! 	if (rp == 1)
! 		return 0;
! 	else if (rp == 2 && pqIsnonblocking(conn))
! 		/* processor requested early exit */
! 		return EOF;
! 	else if (rp != 0)
! 		PQsetRowProcessorErrMsg(result, libpq_gettext("invalid return value from row processor\n"));
! 
! rowProcessError:
  
  	/* Replace partially constructed result with an error result */
  
! 	if (result->rowProcessorErrMsg)
! 	{
! 		printfPQExpBuffer(&conn->errorMessage, "%s", result->rowProcessorErrMsg);
! 		pqSaveErrorResult(conn);
! 	}
! 	else
! 	{
! 		/*
! 		 * we do NOT use pqSaveErrorResult() here, because of the likelihood that
! 		 * there's not enough memory to concatenate messages...
! 		 */
! 		pqClearAsyncResult(conn);
! 		resetPQExpBuffer(&conn->errorMessage);
  
! 		/*
! 		 * If error message is passed from RowProcessor, set it into
! 		 * PGconn, assume out of memory if not.
! 		 */
! 		appendPQExpBufferStr(&conn->errorMessage,
! 							 libpq_gettext("out of memory for query result\n"));
! 
! 		/*
! 		 * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
! 		 * do to recover...
! 		 */
! 		conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
! 	}
  	conn->asyncStatus = PGASYNC_READY;
+ 
  	/* Discard the failed message --- good idea? */
  	conn->inStart = conn->inEnd;
  
*** a/src/interfaces/libpq/fe-protocol3.c
--- b/src/interfaces/libpq/fe-protocol3.c
***************
*** 327,332 **** pqParseInput3(PGconn *conn)
--- 327,335 ----
  						/* Read another tuple of a normal query response */
  						if (getAnotherTuple(conn, msgLength))
  							return;
+ 
+ 						/* getAnotherTuple() moves inStart itself */
+ 						continue;
  					}
  					else if (conn->result != NULL &&
  							 conn->result->resultStatus == PGRES_FATAL_ERROR)
***************
*** 613,645 **** failure:
  
  /*
   * parseInput subroutine to read a 'D' (row data) message.
!  * We add another tuple to the existing PGresult structure.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.  We keep a partially constructed
!  * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
   */
  static int
  getAnotherTuple(PGconn *conn, int msgLength)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGresAttValue *tup;
  	int			tupnfields;		/* # fields from tuple */
  	int			vlen;			/* length of the current field value */
  	int			i;
! 
! 	/* Allocate tuple space if first time for this data message */
! 	if (conn->curTuple == NULL)
! 	{
! 		conn->curTuple = (PGresAttValue *)
! 			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
! 		if (conn->curTuple == NULL)
! 			goto outOfMemory;
! 		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
! 	}
! 	tup = conn->curTuple;
  
  	/* Get the field count and make sure it's what we expect */
  	if (pqGetInt(&tupnfields, 2, conn))
--- 616,637 ----
  
  /*
   * parseInput subroutine to read a 'D' (row data) message.
!  * It fills rowbuf with column pointers and then calls the row processor.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.
   */
  static int
  getAnotherTuple(PGconn *conn, int msgLength)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGrowValue  *rowbuf;
  	int			tupnfields;		/* # fields from tuple */
  	int			vlen;			/* length of the current field value */
  	int			i;
! 	int			rp;
  
  	/* Get the field count and make sure it's what we expect */
  	if (pqGetInt(&tupnfields, 2, conn))
***************
*** 652,703 **** getAnotherTuple(PGconn *conn, int msgLength)
  				 libpq_gettext("unexpected field count in \"D\" message\n"));
  		pqSaveErrorResult(conn);
  		/* Discard the failed message by pretending we read it */
! 		conn->inCursor = conn->inStart + 5 + msgLength;
  		return 0;
  	}
  
  	/* Scan the fields */
  	for (i = 0; i < nfields; i++)
  	{
  		/* get the value length */
  		if (pqGetInt(&vlen, 4, conn))
! 			return EOF;
  		if (vlen == -1)
! 		{
! 			/* null field */
! 			tup[i].value = result->null_field;
! 			tup[i].len = NULL_LEN;
! 			continue;
! 		}
! 		if (vlen < 0)
  			vlen = 0;
- 		if (tup[i].value == NULL)
- 		{
- 			bool		isbinary = (result->attDescs[i].format != 0);
  
! 			tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
! 			if (tup[i].value == NULL)
! 				goto outOfMemory;
! 		}
! 		tup[i].len = vlen;
! 		/* read in the value */
! 		if (vlen > 0)
! 			if (pqGetnchar((char *) (tup[i].value), vlen, conn))
! 				return EOF;
! 		/* we have to terminate this ourselves */
! 		tup[i].value[vlen] = '\0';
  	}
  
! 	/* Success!  Store the completed tuple in the result */
! 	if (!pqAddTuple(result, tup))
! 		goto outOfMemory;
! 	/* and reset for a new message */
! 	conn->curTuple = NULL;
  
  	return 0;
  
! outOfMemory:
  
  	/*
  	 * Replace partially constructed result with an error result. First
  	 * discard the old result to try to win back some memory.
--- 644,731 ----
  				 libpq_gettext("unexpected field count in \"D\" message\n"));
  		pqSaveErrorResult(conn);
  		/* Discard the failed message by pretending we read it */
! 		conn->inStart += 5 + msgLength;
  		return 0;
  	}
  
+ 	/* resize row buffer if needed */
+ 	rowbuf = conn->rowBuf;
+ 	if (nfields > conn->rowBufLen)
+ 	{
+ 		rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+ 		if (!rowbuf)
+ 		{
+ 			goto outOfMemory1;
+ 		}
+ 		conn->rowBuf = rowbuf;
+ 		conn->rowBufLen = nfields;
+ 	}
+ 
  	/* Scan the fields */
  	for (i = 0; i < nfields; i++)
  	{
  		/* get the value length */
  		if (pqGetInt(&vlen, 4, conn))
! 			goto protocolError;
  		if (vlen == -1)
! 			vlen = NULL_LEN;
! 		else if (vlen < 0)
  			vlen = 0;
  
! 		/*
! 		 * rowbuf[i].value always points to the next address of the
! 		 * length field even if the value is NULL, to allow safe
! 		 * size estimates and data copy.
! 		 */
! 		rowbuf[i].value = conn->inBuffer + conn->inCursor;
! 		rowbuf[i].len = vlen;
! 
! 		/* Skip to the next length field */
! 		if (vlen > 0 && pqSkipnchar(vlen, conn))
! 			goto protocolError;
  	}
  
! 	/* check that the row was parsed exactly, then tag it as parsed */
! 	if (conn->inCursor != conn->inStart + 5 + msgLength)
! 		goto protocolError;
! 	conn->inStart += 5 + msgLength;
  
+ 	/* Pass the completed row values to rowProcessor */
+ 	rp = conn->rowProcessor(result, conn->rowProcessorParam, rowbuf);
+ 	if (rp == 1)
+ 	{
+ 		/* everything is good */
+ 		return 0;
+ 	}
+ 	if (rp == 2 && pqIsnonblocking(conn))
+ 	{
+ 		/* processor requested early exit */
+ 		return EOF;
+ 	}
+ 
+ 	/* there was some problem */
+ 	if (rp == 0)
+ 	{
+ 		if (result->rowProcessorErrMsg == NULL)
+ 			goto outOfMemory2;
+ 
+ 		/* use supplied error message */
+ 		printfPQExpBuffer(&conn->errorMessage, "%s", result->rowProcessorErrMsg);
+ 	}
+ 	else
+ 	{
+ 		/* row processor messed up */
+ 		printfPQExpBuffer(&conn->errorMessage,
+ 						  libpq_gettext("invalid return value from row processor\n"));
+ 	}
+ 	pqSaveErrorResult(conn);
  	return 0;
  
! outOfMemory1:
! 	/* Discard the failed message by pretending we read it */
! 	conn->inStart += 5 + msgLength;
  
+ outOfMemory2:
  	/*
  	 * Replace partially constructed result with an error result. First
  	 * discard the old result to try to win back some memory.
***************
*** 706,714 **** outOfMemory:
  	printfPQExpBuffer(&conn->errorMessage,
  					  libpq_gettext("out of memory for query result\n"));
  	pqSaveErrorResult(conn);
  
  	/* Discard the failed message by pretending we read it */
! 	conn->inCursor = conn->inStart + 5 + msgLength;
  	return 0;
  }
  
--- 734,747 ----
  	printfPQExpBuffer(&conn->errorMessage,
  					  libpq_gettext("out of memory for query result\n"));
  	pqSaveErrorResult(conn);
+ 	return 0;
  
+ protocolError:
+ 	printfPQExpBuffer(&conn->errorMessage,
+ 					  libpq_gettext("invalid row contents\n"));
+ 	pqSaveErrorResult(conn);
  	/* Discard the failed message by pretending we read it */
! 	conn->inStart += 5 + msgLength;
  	return 0;
  }
  
*** a/src/interfaces/libpq/libpq-fe.h
--- b/src/interfaces/libpq/libpq-fe.h
***************
*** 149,154 **** typedef struct pgNotify
--- 149,165 ----
  	struct pgNotify *next;		/* list link */
  } PGnotify;
  
+ /* PGrowValue points to a column value in the network buffer.
+  * The value is a string of length len, without null termination.
+  * NULL is represented as len < 0; value then points to the place
+  * where the value would have been.
+  */
+ typedef struct pgRowValue
+ {
+ 	int			len;			/* length in bytes of the value */
+ 	char	   *value;			/* actual value, without null termination */
+ } PGrowValue;
+ 
  /* Function types for notice-handling callbacks */
  typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
  typedef void (*PQnoticeProcessor) (void *arg, const char *message);
***************
*** 416,421 **** extern PGPing PQping(const char *conninfo);
--- 427,463 ----
  extern PGPing PQpingParams(const char *const * keywords,
  			 const char *const * values, int expand_dbname);
  
+ /*
+  * Typedef for an alternative row processor.
+  *
+  * The columns array will contain PQnfields() entries, each one
+  * pointing to particular column data in the network buffer.
+  * This function is supposed to copy the data out from there
+  * and store it somewhere.  NULL is signified with len < 0.
+  *
+  * This function must return 1 for success and 0 for failure,
+  * and on failure it may set an error message with
+  * PQsetRowProcessorErrMsg.  If no error message is set on failure,
+  * the caller assumes out of memory.  This function is assumed not
+  * to throw any exception.
+  */
+ typedef int (*PQrowProcessor)(PGresult *res, void *param,
+ 								PGrowValue *columns);
+ 
+ /*
+  * Set an alternative row data processor for the PGconn.
+  *
+  * By registering this function, the PGresult's own row storage is
+  * disabled and func is called for each row, one by one.
+  *
+  * func is the row processor function; see the typedef PQrowProcessor.
+  *
+  * rowProcessorParam is the contextual variable that is passed to
+  * the row processor.
+  */
+ extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func,
+ 								   void *rowProcessorParam);
+ 
  /* Force the write buffer to be written (or at least try) */
  extern int	PQflush(PGconn *conn);
  
***************
*** 454,459 **** extern char *PQcmdTuples(PGresult *res);
--- 496,502 ----
  extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
  extern int	PQgetlength(const PGresult *res, int tup_num, int field_num);
  extern int	PQgetisnull(const PGresult *res, int tup_num, int field_num);
+ extern void	PQsetRowProcessorErrMsg(PGresult *res, char *msg);
  extern int	PQnparams(const PGresult *res);
  extern Oid	PQparamtype(const PGresult *res, int param_num);
  
*** a/src/interfaces/libpq/libpq-int.h
--- b/src/interfaces/libpq/libpq-int.h
***************
*** 209,214 **** struct pg_result
--- 209,217 ----
  	PGresult_data *curBlock;	/* most recently allocated block */
  	int			curOffset;		/* start offset of free space in block */
  	int			spaceLeft;		/* number of free bytes remaining in block */
+ 
+ 	/* temp storage for message from row processor callback */
+ 	char	   *rowProcessorErrMsg;
  };
  
  /* PGAsyncStatusType defines the state of the query-execution state machine */
***************
*** 398,404 **** struct pg_conn
  
  	/* Status for asynchronous result construction */
  	PGresult   *result;			/* result being constructed */
- 	PGresAttValue *curTuple;	/* tuple currently being read */
  
  #ifdef USE_SSL
  	bool		allow_ssl_try;	/* Allowed to try SSL negotiation */
--- 401,406 ----
***************
*** 443,448 **** struct pg_conn
--- 445,458 ----
  
  	/* Buffer for receiving various parts of messages */
  	PQExpBufferData workBuffer; /* expansible string */
+ 
+ 	/*
+ 	 * Read column data from network buffer.
+ 	 */
+ 	PQrowProcessor rowProcessor;/* Function pointer */
+ 	void *rowProcessorParam;	/* Contextual parameter for rowProcessor */
+ 	PGrowValue *rowBuf;			/* Buffer for passing values to rowProcessor */
+ 	int rowBufLen;				/* Number of columns allocated in rowBuf */
  };
  
  /* PGcancel stores all data necessary to cancel a connection. A copy of this
***************
*** 560,565 **** extern int	pqGets(PQExpBuffer buf, PGconn *conn);
--- 570,576 ----
  extern int	pqGets_append(PQExpBuffer buf, PGconn *conn);
  extern int	pqPuts(const char *s, PGconn *conn);
  extern int	pqGetnchar(char *s, size_t len, PGconn *conn);
+ extern int	pqSkipnchar(size_t len, PGconn *conn);
  extern int	pqPutnchar(const char *s, size_t len, PGconn *conn);
  extern int	pqGetInt(int *result, size_t bytes, PGconn *conn);
  extern int	pqPutInt(int value, size_t bytes, PGconn *conn);
*** a/doc/src/sgml/libpq.sgml
--- b/doc/src/sgml/libpq.sgml
***************
*** 7233,7238 **** int PQisthreadsafe();
--- 7233,7467 ----
   </sect1>
  
  
+  <sect1 id="libpq-altrowprocessor">
+   <title>Alternative row processor</title>
+ 
+   <indexterm zone="libpq-altrowprocessor">
+    <primary>PGresult</primary>
+    <secondary>PGconn</secondary>
+   </indexterm>
+ 
+   <para>
+    By default, rows are stored into a <type>PGresult</type>
+    until the full result set is received.  Then the completely filled
+    <type>PGresult</type> is passed to the user.  This behaviour can be
+    changed by registering an alternative row processor function,
+    which will see each row's data as soon as it is received
+    from the network.  It has the option of processing the data
+    immediately, or storing it into a custom container.
+   </para>
+ 
+   <para>
+    Note: as the row processor sees rows as they arrive, it cannot know
+    whether the SQL statement will actually finish successfully on the
+    server.  So some care must be taken to get proper
+    transactional behaviour.
+   </para>
+ 
+   <variablelist>
+    <varlistentry id="libpq-pqsetrowprocessor">
+     <term>
+      <function>PQsetRowProcessor</function>
+      <indexterm>
+       <primary>PQsetRowProcessor</primary>
+      </indexterm>
+     </term>
+ 
+     <listitem>
+      <para>
+        Sets a callback function to process each row.
+ <synopsis>
+ void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+ </synopsis>
+      </para>
+      
+      <para>
+        <variablelist>
+ 	 <varlistentry>
+ 	   <term><parameter>conn</parameter></term>
+ 	   <listitem>
+ 	     <para>
+ 	       The connection object on which to set the row processor function.
+ 	     </para>
+ 	   </listitem>
+ 	 </varlistentry>
+ 	 <varlistentry>
+ 	   <term><parameter>func</parameter></term>
+ 	   <listitem>
+ 	     <para>
+ 	       Storage handler function to set. NULL means to use the
+ 	       default processor.
+ 	     </para>
+ 	   </listitem>
+ 	 </varlistentry>
+ 	 <varlistentry>
+ 	   <term><parameter>param</parameter></term>
+ 	   <listitem>
+ 	     <para>
+ 	       A pointer to a contextual parameter passed
+ 	       to <parameter>func</parameter>.
+ 	     </para>
+ 	   </listitem>
+ 	 </varlistentry>
+        </variablelist>
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+ 
+   <variablelist>
+    <varlistentry id="libpq-pqrowprocessor">
+     <term>
+      <type>PQrowProcessor</type>
+      <indexterm>
+       <primary>PQrowProcessor</primary>
+      </indexterm>
+     </term>
+ 
+     <listitem>
+      <para>
+        The type for the row processor callback function.
+ <synopsis>
+ typedef int (*PQrowProcessor)(PGresult *res, void *param, PGrowValue *columns);
+ 
+ typedef struct
+ {
+     int         len;            /* length in bytes of the value, -1 if NULL */
+     char       *value;          /* actual value, without null termination */
+ } PGrowValue;
+ </synopsis>
+      </para>
+ 
+      <para>
+      The <parameter>columns</parameter> array will have <function>PQnfields</function>
+      elements, each one pointing to a column value in the network buffer.
+      The <parameter>len</parameter> field will contain the number of
+      bytes in the value.  If the field value is NULL, then
+      <parameter>len</parameter> will be -1 and <parameter>value</parameter> will point
+      to the position where the value would have been in the buffer.
+      This allows estimating the row size by pointer arithmetic.
+      </para>
+ 
+      <para>
+        This function must process or copy the row values out of the network
+        buffer before it returns, as the next row might overwrite them.
+      </para>
+ 
+      <para>
+        This function must return 1 for success and 0 for failure.
+        On failure, this function should set the error message
+        with <function>PQsetRowProcessorErrMsg</function> if the cause
+        is something other than running out of memory.
+        When the non-blocking API is in use, it can also return 2
+        to request an early exit from <function>PQisBusy</function>.
+        The supplied <parameter>res</parameter> and <parameter>columns</parameter>
+        values will then stay valid, so the row can be processed outside the callback.
+        The caller is responsible for tracking whether <function>PQisBusy</function>
+        returned early because of the callback or for some other reason.
+        Usually this is done by setting the cached values to NULL
+        before calling <function>PQisBusy</function>.
+      </para>
+ 
+      <para>
+        The function is allowed to exit via an exception (setjmp/longjmp).
+        The connection and row are guaranteed to be in a valid state.
+        The connection can later be closed via <function>PQfinish</function>.
+        Processing can also be continued without closing the connection:
+        call <function>PQgetResult</function> in synchronous mode,
+        or <function>PQisBusy</function> on an asynchronous connection.
+        Processing will then continue with the next row; the row
+        that raised the exception will be skipped.
+      </para>
+ 
+      <variablelist>
+        <varlistentry>
+ 
+ 	 <term><parameter>res</parameter></term>
+ 	 <listitem>
+ 	   <para>
+ 	     A pointer to the <type>PGresult</type> object.
+ 	   </para>
+ 	 </listitem>
+        </varlistentry>
+        <varlistentry>
+ 
+ 	 <term><parameter>param</parameter></term>
+ 	 <listitem>
+ 	   <para>
+ 	     Extra parameter that was given to <function>PQsetRowProcessor</function>.
+ 	   </para>
+ 	 </listitem>
+        </varlistentry>
+        <varlistentry>
+ 
+ 	 <term><parameter>columns</parameter></term>
+ 	 <listitem>
+ 	   <para>
+ 	     Column values of the row to process.  Column values
+ 	     are located in the network buffer; the processor must
+ 	     copy them out of it.
+ 	   </para>
+ 	   <para>
+ 	     Column values are not null-terminated, so the processor cannot
+ 	     use C string functions on them directly.
+ 	   </para>
+ 	 </listitem>
+        </varlistentry>
+      </variablelist>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+ 
+   <variablelist>
+    <varlistentry id="libpq-pqsetrowprocessorerrmsg">
+     <term>
+      <function>PQsetRowProcessorErrMsg</function>
+      <indexterm>
+       <primary>PQsetRowProcessorErrMsg</primary>
+      </indexterm>
+     </term>
+     <listitem>
+       <para>
+ 	Sets the message for an error that occurred
+ 	in <type>PQrowProcessor</type>.  If no message is set, the
+ 	caller assumes the error was caused by running out of memory.
+ <synopsis>
+ void PQsetRowProcessorErrMsg(PGresult *res, char *msg);
+ </synopsis>
+       </para>
+       <para>
+ 	<variablelist>
+ 	  <varlistentry>
+ 	    <term><parameter>res</parameter></term>
+ 	    <listitem>
+ 	      <para>
+ 		A pointer to the <type>PGresult</type> object
+ 		passed to <type>PQrowProcessor</type>.
+ 	      </para>
+ 	    </listitem>
+ 	  </varlistentry>
+ 	  <varlistentry>
+ 	    <term><parameter>msg</parameter></term>
+ 	    <listitem>
+ 	      <para>
+ 		Error message.  It is copied internally, so there is
+ 		no need to worry about its lifetime.
+ 	      </para>
+ 	      <para>
+ 		If <parameter>res</parameter> already has a previously set message,
+ 		it will be overwritten.  Pass NULL to cancel the custom
+ 		message.
+ 	      </para>
+ 	    </listitem>
+ 	  </varlistentry>
+ 	</variablelist>
+       </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect1>
+ 
+ 
   <sect1 id="libpq-build">
    <title>Building <application>libpq</application> Programs</title>
  
*** a/contrib/dblink/dblink.c
--- b/contrib/dblink/dblink.c
***************
*** 63,73 **** typedef struct remoteConn
  	bool		newXactForCursor;		/* Opened a transaction for a cursor */
  } remoteConn;
  
  /*
   * Internal declarations
   */
  static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
- static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn *rconn);
--- 63,86 ----
  	bool		newXactForCursor;		/* Opened a transaction for a cursor */
  } remoteConn;
  
+ typedef struct storeInfo
+ {
+ 	Tuplestorestate *tuplestore;
+ 	int nattrs;
+ 	MemoryContext oldcontext;
+ 	AttInMetadata *attinmeta;
+ 	char** valbuf;
+ 	int *valbuflen;
+ 	char **cstrs;
+ 	bool error_occurred;
+ 	bool nummismatch;
+ 	ErrorData *edata;
+ } storeInfo;
+ 
  /*
   * Internal declarations
   */
  static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn *rconn);
***************
*** 90,95 **** static char *escape_param_str(const char *from);
--- 103,112 ----
  static void validate_pkattnums(Relation rel,
  				   int2vector *pkattnums_arg, int32 pknumatts_arg,
  				   int **pkattnums, int *pknumatts);
+ static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+ static void finishStoreInfo(storeInfo *sinfo);
+ static int storeHandler(PGresult *res, void *param, PGrowValue *columns);
+ 
  
  /* Global */
  static remoteConn *pconn = NULL;
***************
*** 503,508 **** dblink_fetch(PG_FUNCTION_ARGS)
--- 520,526 ----
  	char	   *curname = NULL;
  	int			howmany = 0;
  	bool		fail = true;	/* default to backward compatible */
+ 	storeInfo   storeinfo;
  
  	DBLINK_INIT;
  
***************
*** 559,573 **** dblink_fetch(PG_FUNCTION_ARGS)
--- 577,612 ----
  	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
  
  	/*
+ 	 * Result is stored into storeinfo.tuplestore instead of
+ 	 * res->result returned by PQexec below
+ 	 */
+ 	initStoreInfo(&storeinfo, fcinfo);
+ 	PQsetRowProcessor(conn, storeHandler, &storeinfo);
+ 
+ 	/*
  	 * Try to execute the query.  Note that since libpq uses malloc, the
  	 * PGresult will be long-lived even though we are still in a short-lived
  	 * memory context.
  	 */
  	res = PQexec(conn, buf.data);
+ 	finishStoreInfo(&storeinfo);
+ 
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
  	{
+ 		/* finishStoreInfo saves the fields referred to below. */
+ 		if (storeinfo.nummismatch)
+ 		{
+ 			/* This is only for backward compatibility */
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_DATATYPE_MISMATCH),
+ 					 errmsg("remote query result rowtype does not match "
+ 							"the specified FROM clause rowtype")));
+ 		}
+ 		else if (storeinfo.edata)
+ 			ReThrowError(storeinfo.edata);
+ 
  		dblink_res_error(conname, res, "could not fetch from cursor", fail);
  		return (Datum) 0;
  	}
***************
*** 579,586 **** dblink_fetch(PG_FUNCTION_ARGS)
  				(errcode(ERRCODE_INVALID_CURSOR_NAME),
  				 errmsg("cursor \"%s\" does not exist", curname)));
  	}
  
- 	materializeResult(fcinfo, res);
  	return (Datum) 0;
  }
  
--- 618,625 ----
  				(errcode(ERRCODE_INVALID_CURSOR_NAME),
  				 errmsg("cursor \"%s\" does not exist", curname)));
  	}
+ 	PQclear(res);
  
  	return (Datum) 0;
  }
  
***************
*** 640,645 **** dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
--- 679,685 ----
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible */
  	bool		freeconn = false;
+ 	storeInfo   storeinfo;
  
  	/* check to see if caller supports us returning a tuplestore */
  	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
***************
*** 715,878 **** dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
  	rsinfo->setResult = NULL;
  	rsinfo->setDesc = NULL;
  
  	/* synchronous query, or async result retrieval */
  	if (!is_async)
  		res = PQexec(conn, sql);
  	else
- 	{
  		res = PQgetResult(conn);
- 		/* NULL means we're all done with the async results */
- 		if (!res)
- 			return (Datum) 0;
- 	}
  
! 	/* if needed, close the connection to the database and cleanup */
! 	if (freeconn)
! 		PQfinish(conn);
  
! 	if (!res ||
! 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
! 		 PQresultStatus(res) != PGRES_TUPLES_OK))
  	{
! 		dblink_res_error(conname, res, "could not execute query", fail);
! 		return (Datum) 0;
  	}
  
- 	materializeResult(fcinfo, res);
  	return (Datum) 0;
  }
  
- /*
-  * Materialize the PGresult to return them as the function result.
-  * The res will be released in this function.
-  */
  static void
! materializeResult(FunctionCallInfo fcinfo, PGresult *res)
  {
  	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
  
! 	Assert(rsinfo->returnMode == SFRM_Materialize);
  
! 	PG_TRY();
  	{
! 		TupleDesc	tupdesc;
! 		bool		is_sql_cmd = false;
! 		int			ntuples;
! 		int			nfields;
  
! 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
! 		{
! 			is_sql_cmd = true;
  
! 			/*
! 			 * need a tuple descriptor representing one TEXT column to return
! 			 * the command status string as our result tuple
! 			 */
! 			tupdesc = CreateTemplateTupleDesc(1, false);
! 			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
! 							   TEXTOID, -1, 0);
! 			ntuples = 1;
! 			nfields = 1;
! 		}
! 		else
! 		{
! 			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
  
! 			is_sql_cmd = false;
  
! 			/* get a tuple descriptor for our result type */
! 			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
! 			{
! 				case TYPEFUNC_COMPOSITE:
! 					/* success */
! 					break;
! 				case TYPEFUNC_RECORD:
! 					/* failed to determine actual type of RECORD */
! 					ereport(ERROR,
! 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 						errmsg("function returning record called in context "
! 							   "that cannot accept type record")));
! 					break;
! 				default:
! 					/* result type isn't composite */
! 					elog(ERROR, "return type must be a row type");
! 					break;
! 			}
  
! 			/* make sure we have a persistent copy of the tupdesc */
! 			tupdesc = CreateTupleDescCopy(tupdesc);
! 			ntuples = PQntuples(res);
! 			nfields = PQnfields(res);
  		}
  
! 		/*
! 		 * check result and tuple descriptor have the same number of columns
! 		 */
! 		if (nfields != tupdesc->natts)
! 			ereport(ERROR,
! 					(errcode(ERRCODE_DATATYPE_MISMATCH),
! 					 errmsg("remote query result rowtype does not match "
! 							"the specified FROM clause rowtype")));
  
! 		if (ntuples > 0)
! 		{
! 			AttInMetadata *attinmeta;
! 			Tuplestorestate *tupstore;
! 			MemoryContext oldcontext;
! 			int			row;
! 			char	  **values;
! 
! 			attinmeta = TupleDescGetAttInMetadata(tupdesc);
! 
! 			oldcontext = MemoryContextSwitchTo(
! 									rsinfo->econtext->ecxt_per_query_memory);
! 			tupstore = tuplestore_begin_heap(true, false, work_mem);
! 			rsinfo->setResult = tupstore;
! 			rsinfo->setDesc = tupdesc;
! 			MemoryContextSwitchTo(oldcontext);
  
! 			values = (char **) palloc(nfields * sizeof(char *));
  
! 			/* put all tuples into the tuplestore */
! 			for (row = 0; row < ntuples; row++)
! 			{
! 				HeapTuple	tuple;
  
! 				if (!is_sql_cmd)
! 				{
! 					int			i;
  
! 					for (i = 0; i < nfields; i++)
! 					{
! 						if (PQgetisnull(res, row, i))
! 							values[i] = NULL;
! 						else
! 							values[i] = PQgetvalue(res, row, i);
! 					}
! 				}
! 				else
! 				{
! 					values[0] = PQcmdStatus(res);
! 				}
  
! 				/* build the tuple and put it into the tuplestore. */
! 				tuple = BuildTupleFromCStrings(attinmeta, values);
! 				tuplestore_puttuple(tupstore, tuple);
  			}
  
! 			/* clean up and return the tuplestore */
! 			tuplestore_donestoring(tupstore);
  		}
  
! 		PQclear(res);
  	}
  	PG_CATCH();
  	{
! 		/* be sure to release the libpq result */
! 		PQclear(res);
! 		PG_RE_THROW();
  	}
  	PG_END_TRY();
  }
  
  /*
--- 755,1006 ----
  	rsinfo->setResult = NULL;
  	rsinfo->setDesc = NULL;
  
+ 
+ 	/*
+ 	 * Result is stored into storeinfo.tuplestore instead of
+ 	 * res->result returned by PQexec/PQgetResult below
+ 	 */
+ 	initStoreInfo(&storeinfo, fcinfo);
+ 	PQsetRowProcessor(conn, storeHandler, &storeinfo);
+ 
  	/* synchronous query, or async result retrieval */
  	if (!is_async)
  		res = PQexec(conn, sql);
  	else
  		res = PQgetResult(conn);
  
! 	finishStoreInfo(&storeinfo);
  
! 	/* NULL res from async get means we're all done with the results */
! 	if (res || !is_async)
  	{
! 		if (freeconn)
! 			PQfinish(conn);
! 
! 		if (!res ||
! 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
! 			 PQresultStatus(res) != PGRES_TUPLES_OK))
! 		{
! 			/* finishStoreInfo saves the fields referred to below. */
! 			if (storeinfo.nummismatch)
! 			{
! 				/* This is only for backward compatibility */
! 				ereport(ERROR,
! 						(errcode(ERRCODE_DATATYPE_MISMATCH),
! 						 errmsg("remote query result rowtype does not match "
! 								"the specified FROM clause rowtype")));
! 			}
! 			else if (storeinfo.edata)
! 				ReThrowError(storeinfo.edata);
! 
! 			dblink_res_error(conname, res, "could not execute query", fail);
! 			return (Datum) 0;
! 		}
  	}
+ 	PQclear(res);
  
  	return (Datum) 0;
  }
  
  static void
! initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
  {
  	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ 	TupleDesc	tupdesc;
+ 	int i;
  
! 	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
! 	{
! 		case TYPEFUNC_COMPOSITE:
! 			/* success */
! 			break;
! 		case TYPEFUNC_RECORD:
! 			/* failed to determine actual type of RECORD */
! 			ereport(ERROR,
! 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 					 errmsg("function returning record called in context "
! 							"that cannot accept type record")));
! 			break;
! 		default:
! 			/* result type isn't composite */
! 			elog(ERROR, "return type must be a row type");
! 			break;
! 	}
  
! 	sinfo->oldcontext = MemoryContextSwitchTo(
! 		rsinfo->econtext->ecxt_per_query_memory);
! 
! 	/* make sure we have a persistent copy of the tupdesc */
! 	tupdesc = CreateTupleDescCopy(tupdesc);
! 
! 	sinfo->error_occurred = FALSE;
! 	sinfo->nummismatch = FALSE;
! 	sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
! 	sinfo->edata = NULL;
! 	sinfo->nattrs = tupdesc->natts;
! 	sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
! 	sinfo->valbuf = NULL;
! 	sinfo->valbuflen = NULL;
! 	sinfo->cstrs = NULL;
! 	/* Preallocate memory of same size with c string array for values. */
! 	sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*));
! 	if (sinfo->valbuf)
! 		sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
! 	if (sinfo->valbuflen)
! 		sinfo->cstrs = (char **)malloc(sinfo->nattrs * sizeof(char*));
! 
! 	if (sinfo->cstrs == NULL)
  	{
! 		if (sinfo->valbuf)
! 			free(sinfo->valbuf);
! 		if (sinfo->valbuflen)
! 			free(sinfo->valbuflen);
  
! 		ereport(ERROR,
! 				(errcode(ERRCODE_OUT_OF_MEMORY),
! 				 errmsg("out of memory")));
! 	}
  
! 	for (i = 0 ; i < sinfo->nattrs ; i++)
! 	{
! 		sinfo->valbuf[i] = NULL;
! 		sinfo->valbuflen[i] = -1;
! 	}
  
! 	rsinfo->setResult = sinfo->tuplestore;
! 	rsinfo->setDesc = tupdesc;
! }
  
! static void
! finishStoreInfo(storeInfo *sinfo)
! {
! 	int i;
  
! 	if (sinfo->valbuf)
! 	{
! 		for (i = 0 ; i < sinfo->nattrs ; i++)
! 		{
! 			if (sinfo->valbuf[i])
! 				free(sinfo->valbuf[i]);
  		}
+ 		free(sinfo->valbuf);
+ 		sinfo->valbuf = NULL;
+ 	}
  
! 	if (sinfo->valbuflen)
! 	{
! 		free(sinfo->valbuflen);
! 		sinfo->valbuflen = NULL;
! 	}
  
! 	if (sinfo->cstrs)
! 	{
! 		free(sinfo->cstrs);
! 		sinfo->cstrs = NULL;
! 	}
  
! 	MemoryContextSwitchTo(sinfo->oldcontext);
! }
  
! static int
! storeHandler(PGresult *res, void *param, PGrowValue *columns)
! {
! 	storeInfo *sinfo = (storeInfo *)param;
! 	HeapTuple  tuple;
! 	int        fields = PQnfields(res);
! 	int        i;
! 	char      **cstrs = sinfo->cstrs;
  
! 	if (sinfo->error_occurred)
! 		return FALSE;
  
! 	if (sinfo->nattrs != fields)
! 	{
! 		sinfo->error_occurred = TRUE;
! 		sinfo->nummismatch = TRUE;
! 		finishStoreInfo(sinfo);
! 
! 		/* This error will be processed by the caller
! 		 * (dblink_record_internal() or dblink_fetch()), so do
! 		 * not set an error message here. */
! 		return FALSE;
! 	}
  
! 	/*
! 	 * Value input functions assume that the input string is
! 	 * NUL-terminated, so make the values NUL-terminated here.
! 	 */
! 	for(i = 0 ; i < fields ; i++)
! 	{
! 		int len = columns[i].len;
! 		if (len < 0)
! 			cstrs[i] = NULL;
! 		else
! 		{
! 			char *tmp = sinfo->valbuf[i];
! 			int tmplen = sinfo->valbuflen[i];
! 
! 			/*
! 			 * Call malloc and realloc separately so that this works
! 			 * even on systems whose realloc() does not accept NULL
! 			 * as the old memory block.
! 			 *
! 			 * Also try to (re)allocate in bigger steps to
! 			 * avoid a flood of allocations on unusual data.
! 			 */
! 			if (tmp == NULL)
! 			{
! 				tmplen = len + 1;
! 				if (tmplen < 64)
! 					tmplen = 64;
! 				tmp = (char *)malloc(tmplen);
! 			}
! 			else if (tmplen < len + 1)
! 			{
! 				if (len + 1 > tmplen * 2)
! 					tmplen = len + 1;
! 				else
! 					tmplen = tmplen * 2;
! 				tmp = (char *)realloc(tmp, tmplen);
  			}
  
! 			/*
! 			 * If realloc returned NULL, sinfo->valbuf[i] still points
! 			 * to the old block, which finishStoreInfo() will free.
! 			 */
! 			if (tmp == NULL)
! 				return FALSE;
! 
! 			sinfo->valbuf[i] = tmp;
! 			sinfo->valbuflen[i] = tmplen;
! 
! 			cstrs[i] = sinfo->valbuf[i];
! 			memcpy(cstrs[i], columns[i].value, len);
! 			cstrs[i][len] = '\0';
  		}
+ 	}
  
! 	PG_TRY();
! 	{
! 		tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
! 		tuplestore_puttuple(sinfo->tuplestore, tuple);
  	}
  	PG_CATCH();
  	{
! 		MemoryContext context;
! 		/*
! 		 * Store exception for later ReThrow and cancel the exception.
! 		 */
! 		sinfo->error_occurred = TRUE;
! 		context = MemoryContextSwitchTo(sinfo->oldcontext);
! 		sinfo->edata = CopyErrorData();
! 		MemoryContextSwitchTo(context);
! 		FlushErrorState();
! 		return FALSE;
  	}
  	PG_END_TRY();
+ 
+ 	return TRUE;
  }
  
  /*
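For anyone who wants to try the copy-out logic of storeHandler() outside the backend, here is a minimal standalone sketch of the same idea, under stated assumptions: each PGrowValue (not NUL-terminated, len == -1 for NULL) is copied into a per-column buffer that is reused across rows, grown to at least 64 bytes and then doubled. PGrowValue is redeclared from the synopsis in the patch because the type exists only there; RowBuffers, row_buffers_init(), row_buffers_free() and copy_row() are hypothetical names, and no libpq calls are made.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Redeclared from the synopsis in the patch (assumption: same layout). */
typedef struct
{
    int   len;      /* length in bytes of the value, -1 if NULL */
    char *value;    /* actual value, without null termination */
} PGrowValue;

/* Hypothetical per-column scratch buffers, reused for every row. */
typedef struct
{
    int    ncols;
    char **buf;     /* one growable buffer per column */
    int   *buflen;  /* allocated size of each buffer, 0 if none yet */
} RowBuffers;

/* Returns 1 on success, 0 if out of memory. */
static int
row_buffers_init(RowBuffers *rb, int ncols)
{
    rb->ncols = ncols;
    rb->buf = malloc((size_t) ncols * sizeof(char *));
    rb->buflen = malloc((size_t) ncols * sizeof(int));
    if (rb->buf == NULL || rb->buflen == NULL)
    {
        free(rb->buf);
        free(rb->buflen);
        rb->buf = NULL;
        rb->buflen = NULL;
        return 0;
    }
    for (int i = 0; i < ncols; i++)
    {
        rb->buf[i] = NULL;
        rb->buflen[i] = 0;
    }
    return 1;
}

static void
row_buffers_free(RowBuffers *rb)
{
    if (rb->buf != NULL)
        for (int i = 0; i < rb->ncols; i++)
            free(rb->buf[i]);
    free(rb->buf);
    free(rb->buflen);
    rb->buf = NULL;
    rb->buflen = NULL;
}

/*
 * Copy each column out of the network buffer into a NUL-terminated string
 * in cstrs[]; SQL NULLs become NULL pointers.  Returns 1 on success and 0
 * on out of memory, mirroring the row processor return convention.
 */
static int
copy_row(RowBuffers *rb, const PGrowValue *columns, char **cstrs)
{
    assert(rb->buf != NULL && rb->buflen != NULL);

    for (int i = 0; i < rb->ncols; i++)
    {
        int len = columns[i].len;

        if (len < 0)
        {
            cstrs[i] = NULL;            /* SQL NULL */
            continue;
        }
        if (rb->buflen[i] < len + 1)
        {
            /* Double the buffer, but never below len + 1 or 64 bytes. */
            int   newlen = rb->buflen[i] * 2;
            char *tmp;

            if (newlen < len + 1)
                newlen = len + 1;
            if (newlen < 64)
                newlen = 64;
            tmp = realloc(rb->buf[i], (size_t) newlen);
            if (tmp == NULL)
                return 0;               /* old block is still owned by rb */
            rb->buf[i] = tmp;
            rb->buflen[i] = newlen;
        }
        memcpy(rb->buf[i], columns[i].value, (size_t) len);
        rb->buf[i][len] = '\0';
        cstrs[i] = rb->buf[i];
    }
    return 1;
}
```

Unlike storeHandler(), the sketch passes a NULL pointer to realloc() for the first allocation, which ISO C defines as equivalent to malloc(); the patch keeps the two calls separate for systems where realloc(NULL, n) misbehaves.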
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 2dc18e6..6829d52 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7352,6 +7352,14 @@ typedef struct
        On failure this function should set the error message
       with <function>PQsetRowProcessorErrMsg</function> if the cause
        is other than out of memory.
+       When the non-blocking API is in use, it can also return 2
+       to request an early exit from <function>PQisBusy</function>.
+       The supplied <parameter>res</parameter> and <parameter>columns</parameter>
+       values will then stay valid, so the row can be processed outside the callback.
+       The caller is responsible for tracking whether <function>PQisBusy</function>
+       returned early because of the callback or for some other reason.
+       Usually this is done by setting the cached values to NULL
+       before calling <function>PQisBusy</function>.
      </para>
 
      <para>
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 7498580..ae4d7b0 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -820,6 +820,9 @@ getAnotherTuple(PGconn *conn, bool binary)
 	rp= conn->rowProcessor(result, conn->rowProcessorParam, rowbuf);
 	if (rp == 1)
 		return 0;
+	else if (rp == 2 && pqIsnonblocking(conn))
+		/* processor requested early exit */
+		return EOF;
 	else if (rp != 0)
 		PQsetRowProcessorErrMsg(result, libpq_gettext("invalid return value from row processor\n"));
 
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index a67e3ac..0260ba6 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -697,6 +697,11 @@ getAnotherTuple(PGconn *conn, int msgLength)
 		/* everything is good */
 		return 0;
 	}
+	if (rp == 2 && pqIsnonblocking(conn))
+	{
+		/* processor requested early exit */
+		return EOF;
+	}
 
 	/* there was some problem */
 	if (rp == 0)
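To illustrate the caller-side bookkeeping that the documentation asks for when the row processor returns 2, here is a self-contained simulation. SimConn and sim_is_busy() are hypothetical stand-ins for a non-blocking PGconn and PQisBusy(); they only mimic the documented contract (returning 2 stops processing early, and the result still looks busy) and do not reproduce libpq internals. The res argument is an opaque placeholder for PGresult *, and PGrowValue is redeclared from the synopsis because the type exists only in this patch.

```c
#include <assert.h>
#include <stddef.h>

/* Redeclared from the synopsis in the patch (assumption: same layout). */
typedef struct
{
    int   len;      /* length in bytes of the value, -1 if NULL */
    char *value;    /* actual value, without null termination */
} PGrowValue;

/* Same argument order as PQrowProcessor; res is an opaque placeholder. */
typedef int (*RowProc) (void *res, void *param, PGrowValue *columns);

/* Hypothetical connection: rows[0..arrived) have been received so far. */
typedef struct
{
    PGrowValue *rows;
    int         nrows;      /* rows in the complete result */
    int         arrived;    /* rows received from the network so far */
    int         next;       /* next row to hand to the processor */
} SimConn;

/* The "param" shared by the callback and the caller. */
typedef struct
{
    void       *res;        /* cached result pointer, or NULL */
    PGrowValue *columns;    /* cached row, or NULL */
} IterState;

typedef enum { FETCH_ROW, FETCH_WAIT, FETCH_DONE } FetchStatus;

/* Row processor that defers all work: remember the row, ask for early exit. */
static int
defer_row(void *res, void *param, PGrowValue *columns)
{
    IterState  *st = param;

    st->res = res;
    st->columns = columns;
    return 2;
}

/*
 * Stand-in for PQisBusy(): feed received rows to the processor and stop as
 * soon as it returns 2.  Returns 1 while the result is incomplete.
 */
static int
sim_is_busy(SimConn *conn, RowProc proc, void *param)
{
    assert(conn->next <= conn->arrived && conn->arrived <= conn->nrows);

    while (conn->next < conn->arrived)
    {
        PGrowValue *row = &conn->rows[conn->next++];

        if (proc(NULL, param, row) == 2)
            return 1;           /* early exit still looks "busy" */
    }
    return conn->next < conn->nrows;    /* waiting for more data? */
}

/* Iterator step: clear the cache first so an early exit can be detected. */
static FetchStatus
next_row(SimConn *conn, IterState *st, PGrowValue **row)
{
    int         busy;

    st->res = NULL;
    st->columns = NULL;
    busy = sim_is_busy(conn, defer_row, st);
    if (st->columns != NULL)
    {
        *row = st->columns;     /* the callback handed over a row */
        return FETCH_ROW;
    }
    return busy ? FETCH_WAIT : FETCH_DONE;
}
```

Resetting the cached pointers before each call is the step the documentation recommends: without it, a "busy" return caused by the callback could not be told apart from one caused by waiting on the network. A real client would wait on PQsocket() in the FETCH_WAIT case and call PQconsumeInput() before retrying.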
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)