On Tue, Feb 07, 2012 at 07:25:14PM +0900, Shigeru Hanada wrote:
> (2012/02/02 23:30), Marko Kreen wrote:
> > On Thu, Feb 02, 2012 at 04:51:37PM +0900, Kyotaro HORIGUCHI wrote:
> >> Hello, This is new version of dblink.c
> >>
> >> - Memory is properly freed when realloc returns NULL in storeHandler().
> >>
> >> - The bug that free() in finishStoreInfo() will be fed with
> >>    garbage pointer when malloc for sinfo->valbuflen fails is
> >>    fixed.
> > 
> > Thanks, merged.  I also did some minor coding style cleanups.
> > 
> > Tagging it Ready For Committer.  I don't see any notable
> > issues with the patch anymore.
> > 
> > There is some potential for experimenting with more aggressive
> > optimizations on dblink side, but I'd like to get a nod from
> > a committer for libpq changes first.
> 
> I tried to use this feature in pgsql_fdw patch, and found some small issues.
> 
> - Typos
>     - mes -> msg
>     - funcion -> function
>     - overritten -> overwritten
>     - costom -> custom

Fixed.

> - What is the right (or recommended) way to prevent from throwing
> exceptoin in row-processor callback function?  When author should use
> PG_TRY block to catch exception in the callback function?

When it calls backend functions that can throw exceptions?
As all handlers running in backend will want to convert data
to Datums, that means "always wrap handler code in PG_TRY"?

Although it seems we could allow exceptions, at least when we are
speaking of the Postgres backend, as the connection and result are in an
internally consistent state when the handler is called, and the
partial PGresult is stored under PGconn->result.  Even the
connection is in a consistent state, as the row packet is
fully processed.

So we have 3 variants, all work, but which one do we want to support?

1) Exceptions are not allowed.
2) Exceptions are allowed, but when it happens, user must call
   PQfinish() next, to avoid processing incoming data from
   potentially invalid state.
3) Exceptions are allowed, and further row processing can continue
   with PQisBusy() / PQgetResult()
3.1) The problematic row is retried.  (Current behaviour)
3.2) The problematic row is skipped.

No clue if that is ok for handler written in C++, I have no idea
whether you can throw C++ exception when part of the stack
contains raw C calls.

> - It would be better to describe how to determine whether a column
> result is NULL should be described clearly.  Perhaps result value is
> NULL when PGrowValue.len is less than 0, right?

Eh, seems it's documented everywhere except in sgml doc.  Fixed.
[ Is it better to document that it is "-1" or "< 0"? ]

Also I removed one remaining dynamic stack array in dblink.c

Current state of patch attached.

-- 
marko

*** a/src/interfaces/libpq/exports.txt
--- b/src/interfaces/libpq/exports.txt
***************
*** 160,162 **** PQconnectStartParams      157
--- 160,164 ----
  PQping                    158
  PQpingParams              159
  PQlibVersion              160
+ PQsetRowProcessor	  161
+ PQsetRowProcessorErrMsg	  162
*** a/src/interfaces/libpq/fe-connect.c
--- b/src/interfaces/libpq/fe-connect.c
***************
*** 2693,2698 **** makeEmptyPGconn(void)
--- 2693,2701 ----
  	conn->wait_ssl_try = false;
  #endif
  
+ 	/* set default row processor */
+ 	PQsetRowProcessor(conn, NULL, NULL);
+ 
  	/*
  	 * We try to send at least 8K at a time, which is the usual size of pipe
  	 * buffers on Unix systems.  That way, when we are sending a large amount
***************
*** 2711,2718 **** makeEmptyPGconn(void)
--- 2714,2726 ----
  	initPQExpBuffer(&conn->errorMessage);
  	initPQExpBuffer(&conn->workBuffer);
  
+ 	/* set up initial row buffer */
+ 	conn->rowBufLen = 32;
+ 	conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue));
+ 
  	if (conn->inBuffer == NULL ||
  		conn->outBuffer == NULL ||
+ 		conn->rowBuf == NULL ||
  		PQExpBufferBroken(&conn->errorMessage) ||
  		PQExpBufferBroken(&conn->workBuffer))
  	{
***************
*** 2814,2819 **** freePGconn(PGconn *conn)
--- 2822,2829 ----
  		free(conn->inBuffer);
  	if (conn->outBuffer)
  		free(conn->outBuffer);
+ 	if (conn->rowBuf)
+ 		free(conn->rowBuf);
  	termPQExpBuffer(&conn->errorMessage);
  	termPQExpBuffer(&conn->workBuffer);
  
***************
*** 5078,5080 **** PQregisterThreadLock(pgthreadlock_t newhandler)
--- 5088,5091 ----
  
  	return prev;
  }
+ 
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 66,71 **** static PGresult *PQexecFinish(PGconn *conn);
--- 66,72 ----
  static int PQsendDescribe(PGconn *conn, char desc_type,
  			   const char *desc_target);
  static int	check_field_number(const PGresult *res, int field_num);
+ static int	pqAddRow(PGresult *res, void *param, PGrowValue *columns);
  
  
  /* ----------------
***************
*** 160,165 **** PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
--- 161,167 ----
  	result->curBlock = NULL;
  	result->curOffset = 0;
  	result->spaceLeft = 0;
+ 	result->rowProcessorErrMsg = NULL;
  
  	if (conn)
  	{
***************
*** 701,707 **** pqClearAsyncResult(PGconn *conn)
  	if (conn->result)
  		PQclear(conn->result);
  	conn->result = NULL;
- 	conn->curTuple = NULL;
  }
  
  /*
--- 703,708 ----
***************
*** 756,762 **** pqPrepareAsyncResult(PGconn *conn)
  	 */
  	res = conn->result;
  	conn->result = NULL;		/* handing over ownership to caller */
- 	conn->curTuple = NULL;		/* just in case */
  	if (!res)
  		res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
  	else
--- 757,762 ----
***************
*** 828,833 **** pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
--- 828,900 ----
  }
  
  /*
+  * PQsetRowProcessor
+  *   Set function that copies column data out from network buffer.
+  */
+ void
+ PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+ {
+ 	conn->rowProcessor = (func ? func : pqAddRow);
+ 	conn->rowProcessorParam = param;
+ }
+ 
+ /*
+  * PQsetRowProcessorErrMsg
+  *    Set the error message passed back to the caller of RowProcessor.
+  *
+  *    You can replace the previous message by alternative msg, or clear
+  *    it with NULL.
+  */
+ void
+ PQsetRowProcessorErrMsg(PGresult *res, char *msg)
+ {
+ 	if (msg)
+ 		res->rowProcessorErrMsg = pqResultStrdup(res, msg);
+ 	else
+ 		res->rowProcessorErrMsg = NULL;
+ }
+ 
+ /*
+  * pqAddRow
+  *	  add a row to the PGresult structure, growing it if necessary
+  *	  Returns TRUE if OK, FALSE if not enough memory to add the row.
+  */
+ static int
+ pqAddRow(PGresult *res, void *param, PGrowValue *columns)
+ {
+ 	PGresAttValue *tup;
+ 	int			nfields = res->numAttributes;
+ 	int			i;
+ 
+ 	tup = (PGresAttValue *)
+ 		pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+ 	if (tup == NULL)
+ 		return FALSE;
+ 
+ 	for (i = 0 ; i < nfields ; i++)
+ 	{
+ 		tup[i].len = columns[i].len;
+ 		if (tup[i].len == NULL_LEN)
+ 		{
+ 			tup[i].value = res->null_field;
+ 		}
+ 		else
+ 		{
+ 			bool isbinary = (res->attDescs[i].format != 0);
+ 			tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+ 			if (tup[i].value == NULL)
+ 				return FALSE;
+ 
+ 			memcpy(tup[i].value, columns[i].value, tup[i].len);
+ 			/* We have to terminate this ourselves */
+ 			tup[i].value[tup[i].len] = '\0';
+ 		}
+ 	}
+ 
+ 	return pqAddTuple(res, tup);
+ }
+ 
+ /*
   * pqAddTuple
   *	  add a row pointer to the PGresult structure, growing it if necessary
   *	  Returns TRUE if OK, FALSE if not enough memory to add the row
***************
*** 1223,1229 **** PQsendQueryStart(PGconn *conn)
  
  	/* initialize async result-accumulation state */
  	conn->result = NULL;
- 	conn->curTuple = NULL;
  
  	/* ready to send command message */
  	return true;
--- 1290,1295 ----
*** a/src/interfaces/libpq/fe-misc.c
--- b/src/interfaces/libpq/fe-misc.c
***************
*** 219,224 **** pqGetnchar(char *s, size_t len, PGconn *conn)
--- 219,243 ----
  }
  
  /*
+  * pqSkipnchar:
+  *	skip len bytes in input buffer.
+  */
+ int
+ pqSkipnchar(size_t len, PGconn *conn)
+ {
+ 	if (len > (size_t) (conn->inEnd - conn->inCursor))
+ 		return EOF;
+ 
+ 	conn->inCursor += len;
+ 
+ 	if (conn->Pfdebug)
+ 		fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+ 				(unsigned long) len);
+ 
+ 	return 0;
+ }
+ 
+ /*
   * pqPutnchar:
   *	write exactly len bytes to the current message
   */
*** a/src/interfaces/libpq/fe-protocol2.c
--- b/src/interfaces/libpq/fe-protocol2.c
***************
*** 703,721 **** failure:
  
  /*
   * parseInput subroutine to read a 'B' or 'D' (row data) message.
!  * We add another tuple to the existing PGresult structure.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.  We keep a partially constructed
!  * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
   */
  static int
  getAnotherTuple(PGconn *conn, bool binary)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGresAttValue *tup;
  
  	/* the backend sends us a bitmap of which attributes are null */
  	char		std_bitmap[64]; /* used unless it doesn't fit */
--- 703,720 ----
  
  /*
   * parseInput subroutine to read a 'B' or 'D' (row data) message.
!  * It fills rowbuf with column pointers and then calls row processor.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.
   */
  static int
  getAnotherTuple(PGconn *conn, bool binary)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGrowValue  *rowbuf;
  
  	/* the backend sends us a bitmap of which attributes are null */
  	char		std_bitmap[64]; /* used unless it doesn't fit */
***************
*** 727,754 **** getAnotherTuple(PGconn *conn, bool binary)
  	int			bitcnt;			/* number of bits examined in current byte */
  	int			vlen;			/* length of the current field value */
  
  	result->binary = binary;
  
! 	/* Allocate tuple space if first time for this data message */
! 	if (conn->curTuple == NULL)
  	{
! 		conn->curTuple = (PGresAttValue *)
! 			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
! 		if (conn->curTuple == NULL)
! 			goto outOfMemory;
! 		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
! 
! 		/*
! 		 * If it's binary, fix the column format indicators.  We assume the
! 		 * backend will consistently send either B or D, not a mix.
! 		 */
! 		if (binary)
! 		{
! 			for (i = 0; i < nfields; i++)
! 				result->attDescs[i].format = 1;
! 		}
  	}
- 	tup = conn->curTuple;
  
  	/* Get the null-value bitmap */
  	nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
--- 726,752 ----
  	int			bitcnt;			/* number of bits examined in current byte */
  	int			vlen;			/* length of the current field value */
  
+ 	/* resize row buffer if needed */
+ 	if (nfields > conn->rowBufLen)
+ 	{
+ 		rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+ 		if (!rowbuf)
+ 			goto rowProcessError;
+ 		conn->rowBuf = rowbuf;
+ 		conn->rowBufLen = nfields;
+ 	}
+ 	else
+ 	{
+ 		rowbuf = conn->rowBuf;
+ 	}
+ 
  	result->binary = binary;
  
! 	if (binary)
  	{
! 		for (i = 0; i < nfields; i++)
! 			result->attDescs[i].format = 1;
  	}
  
  	/* Get the null-value bitmap */
  	nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
***************
*** 757,763 **** getAnotherTuple(PGconn *conn, bool binary)
  	{
  		bitmap = (char *) malloc(nbytes);
  		if (!bitmap)
! 			goto outOfMemory;
  	}
  
  	if (pqGetnchar(bitmap, nbytes, conn))
--- 755,761 ----
  	{
  		bitmap = (char *) malloc(nbytes);
  		if (!bitmap)
! 			goto rowProcessError;
  	}
  
  	if (pqGetnchar(bitmap, nbytes, conn))
***************
*** 771,804 **** getAnotherTuple(PGconn *conn, bool binary)
  	for (i = 0; i < nfields; i++)
  	{
  		if (!(bmap & 0200))
! 		{
! 			/* if the field value is absent, make it a null string */
! 			tup[i].value = result->null_field;
! 			tup[i].len = NULL_LEN;
! 		}
  		else
  		{
- 			/* get the value length (the first four bytes are for length) */
- 			if (pqGetInt(&vlen, 4, conn))
- 				goto EOFexit;
  			if (!binary)
  				vlen = vlen - 4;
  			if (vlen < 0)
  				vlen = 0;
- 			if (tup[i].value == NULL)
- 			{
- 				tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
- 				if (tup[i].value == NULL)
- 					goto outOfMemory;
- 			}
- 			tup[i].len = vlen;
- 			/* read in the value */
- 			if (vlen > 0)
- 				if (pqGetnchar((char *) (tup[i].value), vlen, conn))
- 					goto EOFexit;
- 			/* we have to terminate this ourselves */
- 			tup[i].value[vlen] = '\0';
  		}
  		/* advance the bitmap stuff */
  		bitcnt++;
  		if (bitcnt == BITS_PER_BYTE)
--- 769,797 ----
  	for (i = 0; i < nfields; i++)
  	{
  		if (!(bmap & 0200))
! 			vlen = NULL_LEN;
! 		else if (pqGetInt(&vlen, 4, conn))
! 				goto EOFexit;
  		else
  		{
  			if (!binary)
  				vlen = vlen - 4;
  			if (vlen < 0)
  				vlen = 0;
  		}
+ 
+ 		/*
+ 		 * rowbuf[i].value always points to the next address of the
+ 		 * length field even if the value is NULL, to allow safe
+ 		 * size estimates and data copy.
+ 		 */
+ 		rowbuf[i].value = conn->inBuffer + conn->inCursor;
+ 		rowbuf[i].len = vlen;
+ 
+ 		/* Skip the value */
+ 		if (vlen > 0 && pqSkipnchar(vlen, conn))
+ 			goto EOFexit;
+ 
  		/* advance the bitmap stuff */
  		bitcnt++;
  		if (bitcnt == BITS_PER_BYTE)
***************
*** 811,827 **** getAnotherTuple(PGconn *conn, bool binary)
  			bmap <<= 1;
  	}
  
! 	/* Success!  Store the completed tuple in the result */
! 	if (!pqAddTuple(result, tup))
! 		goto outOfMemory;
! 	/* and reset for a new message */
! 	conn->curTuple = NULL;
  
  	if (bitmap != std_bitmap)
  		free(bitmap);
  	return 0;
  
! outOfMemory:
  	/* Replace partially constructed result with an error result */
  
  	/*
--- 804,820 ----
  			bmap <<= 1;
  	}
  
! 	/* Success!  Pass the completed row values to rowProcessor */
! 	if (!conn->rowProcessor(result, conn->rowProcessorParam, rowbuf))
! 		goto rowProcessError;
  
  	if (bitmap != std_bitmap)
  		free(bitmap);
+ 
  	return 0;
  
! rowProcessError:
! 
  	/* Replace partially constructed result with an error result */
  
  	/*
***************
*** 829,838 **** outOfMemory:
  	 * there's not enough memory to concatenate messages...
  	 */
  	pqClearAsyncResult(conn);
! 	printfPQExpBuffer(&conn->errorMessage,
! 					  libpq_gettext("out of memory for query result\n"));
  
  	/*
  	 * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
  	 * do to recover...
  	 */
--- 822,838 ----
  	 * there's not enough memory to concatenate messages...
  	 */
  	pqClearAsyncResult(conn);
! 	resetPQExpBuffer(&conn->errorMessage);
  
  	/*
+ 	 * If error message is passed from RowProcessor, set it into
+ 	 * PGconn, assume out of memory if not.
+ 	 */
+ 	appendPQExpBufferStr(&conn->errorMessage,
+ 						 result->rowProcessorErrMsg ?
+ 						 result->rowProcessorErrMsg :
+ 						 libpq_gettext("out of memory for query result\n"));
+ 	/*
  	 * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
  	 * do to recover...
  	 */
*** a/src/interfaces/libpq/fe-protocol3.c
--- b/src/interfaces/libpq/fe-protocol3.c
***************
*** 613,646 **** failure:
  
  /*
   * parseInput subroutine to read a 'D' (row data) message.
!  * We add another tuple to the existing PGresult structure.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.  We keep a partially constructed
!  * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
   */
  static int
  getAnotherTuple(PGconn *conn, int msgLength)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGresAttValue *tup;
  	int			tupnfields;		/* # fields from tuple */
  	int			vlen;			/* length of the current field value */
  	int			i;
  
- 	/* Allocate tuple space if first time for this data message */
- 	if (conn->curTuple == NULL)
- 	{
- 		conn->curTuple = (PGresAttValue *)
- 			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- 		if (conn->curTuple == NULL)
- 			goto outOfMemory;
- 		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
- 	}
- 	tup = conn->curTuple;
- 
  	/* Get the field count and make sure it's what we expect */
  	if (pqGetInt(&tupnfields, 2, conn))
  		return EOF;
--- 613,634 ----
  
  /*
   * parseInput subroutine to read a 'D' (row data) message.
!  * It fills rowbuf with column pointers and then calls row processor.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.
   */
  static int
  getAnotherTuple(PGconn *conn, int msgLength)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGrowValue  *rowbuf;
  	int			tupnfields;		/* # fields from tuple */
  	int			vlen;			/* length of the current field value */
  	int			i;
  
  	/* Get the field count and make sure it's what we expect */
  	if (pqGetInt(&tupnfields, 2, conn))
  		return EOF;
***************
*** 656,661 **** getAnotherTuple(PGconn *conn, int msgLength)
--- 644,663 ----
  		return 0;
  	}
  
+ 	/* resize row buffer if needed */
+ 	if (nfields > conn->rowBufLen)
+ 	{
+ 		rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+ 		if (!rowbuf)
+ 			goto rowProcessError;
+ 		conn->rowBuf = rowbuf;
+ 		conn->rowBufLen = nfields;
+ 	}
+ 	else
+ 	{
+ 		rowbuf = conn->rowBuf;
+ 	}
+ 
  	/* Scan the fields */
  	for (i = 0; i < nfields; i++)
  	{
***************
*** 663,710 **** getAnotherTuple(PGconn *conn, int msgLength)
  		if (pqGetInt(&vlen, 4, conn))
  			return EOF;
  		if (vlen == -1)
! 		{
! 			/* null field */
! 			tup[i].value = result->null_field;
! 			tup[i].len = NULL_LEN;
! 			continue;
! 		}
! 		if (vlen < 0)
  			vlen = 0;
- 		if (tup[i].value == NULL)
- 		{
- 			bool		isbinary = (result->attDescs[i].format != 0);
  
! 			tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
! 			if (tup[i].value == NULL)
! 				goto outOfMemory;
! 		}
! 		tup[i].len = vlen;
! 		/* read in the value */
! 		if (vlen > 0)
! 			if (pqGetnchar((char *) (tup[i].value), vlen, conn))
! 				return EOF;
! 		/* we have to terminate this ourselves */
! 		tup[i].value[vlen] = '\0';
  	}
  
! 	/* Success!  Store the completed tuple in the result */
! 	if (!pqAddTuple(result, tup))
! 		goto outOfMemory;
! 	/* and reset for a new message */
! 	conn->curTuple = NULL;
  
  	return 0;
  
! outOfMemory:
  
  	/*
  	 * Replace partially constructed result with an error result. First
  	 * discard the old result to try to win back some memory.
  	 */
  	pqClearAsyncResult(conn);
! 	printfPQExpBuffer(&conn->errorMessage,
! 					  libpq_gettext("out of memory for query result\n"));
  	pqSaveErrorResult(conn);
  
  	/* Discard the failed message by pretending we read it */
--- 665,710 ----
  		if (pqGetInt(&vlen, 4, conn))
  			return EOF;
  		if (vlen == -1)
! 			vlen = NULL_LEN;
! 		else if (vlen < 0)
  			vlen = 0;
  
! 		/*
! 		 * rowbuf[i].value always points to the next address of the
! 		 * length field even if the value is NULL, to allow safe
! 		 * size estimates and data copy.
! 		 */
! 		rowbuf[i].value = conn->inBuffer + conn->inCursor;
! 		rowbuf[i].len = vlen;
! 
! 		/* Skip to the next length field */
! 		if (vlen > 0 && pqSkipnchar(vlen, conn))
! 			return EOF;
  	}
  
! 	/* Success!  Pass the completed row values to rowProcessor */
! 	if (!conn->rowProcessor(result, conn->rowProcessorParam, rowbuf))
! 		goto rowProcessError;
  
  	return 0;
  
! rowProcessError:
  
  	/*
  	 * Replace partially constructed result with an error result. First
  	 * discard the old result to try to win back some memory.
  	 */
  	pqClearAsyncResult(conn);
! 	resetPQExpBuffer(&conn->errorMessage);
! 
! 	/*
+ 	 * If error message is passed from RowProcessor, set it into
! 	 * PGconn, assume out of memory if not.
! 	 */
! 	appendPQExpBufferStr(&conn->errorMessage,
! 						 result->rowProcessorErrMsg ?
! 						 result->rowProcessorErrMsg :
! 						 libpq_gettext("out of memory for query result\n"));
  	pqSaveErrorResult(conn);
  
  	/* Discard the failed message by pretending we read it */
*** a/src/interfaces/libpq/libpq-fe.h
--- b/src/interfaces/libpq/libpq-fe.h
***************
*** 149,154 **** typedef struct pgNotify
--- 149,165 ----
  	struct pgNotify *next;		/* list link */
  } PGnotify;
  
+ /* PGrowValue points to a column value in the network buffer.
+  * Value is a string of length len, without null termination.
+  * NULL is represented as len < 0; value then points to the place
+  * where the value would have been.
+  */
+ typedef struct pgRowValue
+ {
+ 	int			len;			/* length in bytes of the value */
+ 	char	   *value;			/* actual value, without null termination */
+ } PGrowValue;
+ 
  /* Function types for notice-handling callbacks */
  typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
  typedef void (*PQnoticeProcessor) (void *arg, const char *message);
***************
*** 416,421 **** extern PGPing PQping(const char *conninfo);
--- 427,463 ----
  extern PGPing PQpingParams(const char *const * keywords,
  			 const char *const * values, int expand_dbname);
  
+ /*
+  * Typedef for alternative row processor.
+  *
+  * Columns array will contain PQnfields() entries, each one
+  * pointing to particular column data in network buffer.
+  * This function is supposed to copy data out from there
+  * and store somewhere.  NULL is signified with len<0.
+  *
+  * This function must return 1 for success and must return 0 for
+  * failure and may set error message by PQsetRowProcessorErrMsg.  It
+  * is assumed by caller as out of memory when the error message is not
+  * set on failure. This function is assumed not to throw any
+  * exception.
+  */
+ typedef int (*PQrowProcessor)(PGresult *res, void *param,
+ 								PGrowValue *columns);
+ 
+ /*
+  * Set alternative row data processor for PGconn.
+  *
+  * By registering this function, pg_result disables its own result
+  * store and calls it for rows one by one.
+  *
+  * func is row processor function. See the typedef PQrowProcessor.
+  *
+  * rowProcessorParam is the contextual variable that is passed to
+  * PQrowProcessor.
+  */
+ extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func,
+ 								   void *rowProcessorParam);
+ 
  /* Force the write buffer to be written (or at least try) */
  extern int	PQflush(PGconn *conn);
  
***************
*** 454,459 **** extern char *PQcmdTuples(PGresult *res);
--- 496,502 ----
  extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
  extern int	PQgetlength(const PGresult *res, int tup_num, int field_num);
  extern int	PQgetisnull(const PGresult *res, int tup_num, int field_num);
+ extern void	PQsetRowProcessorErrMsg(PGresult *res, char *msg);
  extern int	PQnparams(const PGresult *res);
  extern Oid	PQparamtype(const PGresult *res, int param_num);
  
*** a/src/interfaces/libpq/libpq-int.h
--- b/src/interfaces/libpq/libpq-int.h
***************
*** 209,214 **** struct pg_result
--- 209,217 ----
  	PGresult_data *curBlock;	/* most recently allocated block */
  	int			curOffset;		/* start offset of free space in block */
  	int			spaceLeft;		/* number of free bytes remaining in block */
+ 
+ 	/* temp storage for message from row processor callback */
+ 	char	   *rowProcessorErrMsg;
  };
  
  /* PGAsyncStatusType defines the state of the query-execution state machine */
***************
*** 398,404 **** struct pg_conn
  
  	/* Status for asynchronous result construction */
  	PGresult   *result;			/* result being constructed */
- 	PGresAttValue *curTuple;	/* tuple currently being read */
  
  #ifdef USE_SSL
  	bool		allow_ssl_try;	/* Allowed to try SSL negotiation */
--- 401,406 ----
***************
*** 443,448 **** struct pg_conn
--- 445,458 ----
  
  	/* Buffer for receiving various parts of messages */
  	PQExpBufferData workBuffer; /* expansible string */
+ 
+ 	/*
+ 	 * Read column data from network buffer.
+ 	 */
+ 	PQrowProcessor rowProcessor;/* Function pointer */
+ 	void *rowProcessorParam;	/* Contextual parameter for rowProcessor */
+ 	PGrowValue *rowBuf;			/* Buffer for passing values to rowProcessor */
+ 	int rowBufLen;				/* Number of columns allocated in rowBuf */
  };
  
  /* PGcancel stores all data necessary to cancel a connection. A copy of this
***************
*** 560,565 **** extern int	pqGets(PQExpBuffer buf, PGconn *conn);
--- 570,576 ----
  extern int	pqGets_append(PQExpBuffer buf, PGconn *conn);
  extern int	pqPuts(const char *s, PGconn *conn);
  extern int	pqGetnchar(char *s, size_t len, PGconn *conn);
+ extern int	pqSkipnchar(size_t len, PGconn *conn);
  extern int	pqPutnchar(const char *s, size_t len, PGconn *conn);
  extern int	pqGetInt(int *result, size_t bytes, PGconn *conn);
  extern int	pqPutInt(int value, size_t bytes, PGconn *conn);
*** a/doc/src/sgml/libpq.sgml
--- b/doc/src/sgml/libpq.sgml
***************
*** 7233,7238 **** int PQisthreadsafe();
--- 7233,7448 ----
   </sect1>
  
  
+  <sect1 id="libpq-altrowprocessor">
+   <title>Alternative row processor</title>
+ 
+   <indexterm zone="libpq-altrowprocessor">
+    <primary>PGresult</primary>
+    <secondary>PGconn</secondary>
+   </indexterm>
+ 
+   <para>
+    As the standard usage, rows are stored into <type>PGresult</type>
+    until full resultset is received.  Then such completely-filled
+    <type>PGresult</type> is passed to user.  This behaviour can be
+    changed by registering alternative row processor function,
+    that will see each row data as soon as it is received
+    from network.  It has the option of processing the data
+    immediately, or storing it into custom container.
+   </para>
+ 
+   <para>
+    Note - as row processor sees rows as they arrive, it cannot know
+    whether the SQL statement actually finishes successfully on server
+    or not.  So some care must be taken to get proper
+    transactionality.
+   </para>
+ 
+   <variablelist>
+    <varlistentry id="libpq-pqsetrowprocessor">
+     <term>
+      <function>PQsetRowProcessor</function>
+      <indexterm>
+       <primary>PQsetRowProcessor</primary>
+      </indexterm>
+     </term>
+ 
+     <listitem>
+      <para>
+        Sets a callback function to process each row.
+ <synopsis>
+ void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+ </synopsis>
+      </para>
+      
+      <para>
+        <variablelist>
+ 	 <varlistentry>
+ 	   <term><parameter>conn</parameter></term>
+ 	   <listitem>
+ 	     <para>
+ 	       The connection object to set the row processor function.
+ 	     </para>
+ 	   </listitem>
+ 	 </varlistentry>
+ 	 <varlistentry>
+ 	   <term><parameter>func</parameter></term>
+ 	   <listitem>
+ 	     <para>
+ 	       Storage handler function to set. NULL means to use the
+ 	       default processor.
+ 	     </para>
+ 	   </listitem>
+ 	 </varlistentry>
+ 	 <varlistentry>
+ 	   <term><parameter>param</parameter></term>
+ 	   <listitem>
+ 	     <para>
+ 	       A pointer to contextual parameter passed
+ 	       to <parameter>func</parameter>.
+ 	     </para>
+ 	   </listitem>
+ 	 </varlistentry>
+        </variablelist>
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+ 
+   <variablelist>
+    <varlistentry id="libpq-pqrowprocessor">
+     <term>
+      <type>PQrowProcessor</type>
+      <indexterm>
+       <primary>PQrowProcessor</primary>
+      </indexterm>
+     </term>
+ 
+     <listitem>
+      <para>
+        The type for the row processor callback function.
+ <synopsis>
+ int (*PQrowProcessor)(PGresult *res, void *param, PGrowValue *columns);
+ 
+ typedef struct
+ {
+     int         len;            /* length in bytes of the value, -1 if NULL */
+     char       *value;          /* actual value, without null termination */
+ } PGrowValue;
+ </synopsis>
+      </para>
+ 
+      <para>
+       The <parameter>columns</parameter> array will have PQnfields()
+       elements, each one pointing to column value in network buffer.
+       The <parameter>len</parameter> field will contain number of
+       bytes in value.  If the field value is NULL then
+       <parameter>len</parameter> will be -1 and value will point
+       to position where the value would have been in buffer.
+       This allows estimating row size by pointer arithmetic.
+      </para>
+ 
+      <para>
+        This function must process or copy row values away from network
+        buffer before it returns, as next row might overwrite them.
+      </para>
+ 
+      <para>
+        This function must return 1 for success, and 0 for failure.
+        On failure this function should set the error message
+        with <function>PQsetRowProcessorErrMsg</function> if the cause
+        is other than out of memory.  This function must not throw any
+        exception.
+      </para>
+      <variablelist>
+        <varlistentry>
+ 
+ 	 <term><parameter>res</parameter></term>
+ 	 <listitem>
+ 	   <para>
+ 	     A pointer to the <type>PGresult</type> object.
+ 	   </para>
+ 	 </listitem>
+        </varlistentry>
+        <varlistentry>
+ 
+ 	 <term><parameter>param</parameter></term>
+ 	 <listitem>
+ 	   <para>
+ 	     Extra parameter that was given to <function>PQsetRowProcessor</function>.
+ 	   </para>
+ 	 </listitem>
+        </varlistentry>
+        <varlistentry>
+ 
+ 	 <term><parameter>columns</parameter></term>
+ 	 <listitem>
+ 	   <para>
+ 	     Column values of the row to process.  Column values
+ 	     are located in network buffer, the processor must
+ 	     copy them out from there.
+ 	   </para>
+ 	   <para>
+ 	     Column values are not null-terminated, so processor cannot
+ 	     use C string functions on them directly.
+ 	   </para>
+ 	 </listitem>
+        </varlistentry>
+      </variablelist>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+ 
+   <variablelist>
+    <varlistentry id="libpq-pqsetrowprocessorerrmsg">
+     <term>
+      <function>PQsetRowProcessorErrMsg</function>
+      <indexterm>
+       <primary>PQsetRowProcessorErrMsg</primary>
+      </indexterm>
+     </term>
+     <listitem>
+       <para>
+ 	Set the message for the error occurred
+ 	in <type>PQrowProcessor</type>.  If this message is not set, the
+ 	caller assumes the error to be out of memory.
+ <synopsis>
+ void PQsetRowProcessorErrMsg(PGresult *res, char *msg);
+ </synopsis>
+       </para>
+       <para>
+ 	<variablelist>
+ 	  <varlistentry>
+ 	    <term><parameter>res</parameter></term>
+ 	    <listitem>
+ 	      <para>
+ 		A pointer to the <type>PGresult</type> object
+ 		passed to <type>PQrowProcessor</type>.
+ 	      </para>
+ 	    </listitem>
+ 	  </varlistentry>
+ 	  <varlistentry>
+ 	    <term><parameter>msg</parameter></term>
+ 	    <listitem>
+ 	      <para>
+ 		Error message. This will be copied internally so there is
+ 		no need to care of the scope.
+ 	      </para>
+ 	      <para>
+ 		If <parameter>res</parameter> already has a message previously
+ 		set, it will be overwritten. Set NULL to cancel the custom
+ 		message.
+ 	      </para>
+ 	    </listitem>
+ 	  </varlistentry>
+ 	</variablelist>
+       </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect1>
+ 
+ 
   <sect1 id="libpq-build">
    <title>Building <application>libpq</application> Programs</title>
  
*** a/contrib/dblink/dblink.c
--- b/contrib/dblink/dblink.c
***************
*** 63,73 **** typedef struct remoteConn
  	bool		newXactForCursor;		/* Opened a transaction for a cursor */
  } remoteConn;
  
  /*
   * Internal declarations
   */
  static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
- static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn *rconn);
--- 63,86 ----
  	bool		newXactForCursor;		/* Opened a transaction for a cursor */
  } remoteConn;
  
+ typedef struct storeInfo
+ {
+ 	Tuplestorestate *tuplestore;
+ 	int nattrs;
+ 	MemoryContext oldcontext;
+ 	AttInMetadata *attinmeta;
+ 	char** valbuf;
+ 	int *valbuflen;
+ 	char **cstrs;
+ 	bool error_occurred;
+ 	bool nummismatch;
+ 	ErrorData *edata;
+ } storeInfo;
+ 
  /*
   * Internal declarations
   */
  static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn *rconn);
***************
*** 90,95 **** static char *escape_param_str(const char *from);
--- 103,112 ----
  static void validate_pkattnums(Relation rel,
  				   int2vector *pkattnums_arg, int32 pknumatts_arg,
  				   int **pkattnums, int *pknumatts);
+ static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+ static void finishStoreInfo(storeInfo *sinfo);
+ static int storeHandler(PGresult *res, void *param, PGrowValue *columns);
+ 
  
  /* Global */
  static remoteConn *pconn = NULL;
***************
*** 503,508 **** dblink_fetch(PG_FUNCTION_ARGS)
--- 520,526 ----
  	char	   *curname = NULL;
  	int			howmany = 0;
  	bool		fail = true;	/* default to backward compatible */
+ 	storeInfo   storeinfo;
  
  	DBLINK_INIT;
  
***************
*** 559,573 **** dblink_fetch(PG_FUNCTION_ARGS)
--- 577,612 ----
  	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
  
  	/*
+ 	 * Result is stored into storeinfo.tuplestore instead of
+ 	 * res->result returned by PQexec below
+ 	 */
+ 	initStoreInfo(&storeinfo, fcinfo);
+ 	PQsetRowProcessor(conn, storeHandler, &storeinfo);
+ 
+ 	/*
  	 * Try to execute the query.  Note that since libpq uses malloc, the
  	 * PGresult will be long-lived even though we are still in a short-lived
  	 * memory context.
  	 */
  	res = PQexec(conn, buf.data);
+ 	finishStoreInfo(&storeinfo);
+ 
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
  	{
+ 		/* finishStoreInfo saves the fields referred to below. */
+ 		if (storeinfo.nummismatch)
+ 		{
+ 			/* This is only for backward compatibility */
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_DATATYPE_MISMATCH),
+ 					 errmsg("remote query result rowtype does not match "
+ 							"the specified FROM clause rowtype")));
+ 		}
+ 		else if (storeinfo.edata)
+ 			ReThrowError(storeinfo.edata);
+ 
  		dblink_res_error(conname, res, "could not fetch from cursor", fail);
  		return (Datum) 0;
  	}
***************
*** 579,586 **** dblink_fetch(PG_FUNCTION_ARGS)
  				(errcode(ERRCODE_INVALID_CURSOR_NAME),
  				 errmsg("cursor \"%s\" does not exist", curname)));
  	}
  
- 	materializeResult(fcinfo, res);
  	return (Datum) 0;
  }
  
--- 618,625 ----
  				(errcode(ERRCODE_INVALID_CURSOR_NAME),
  				 errmsg("cursor \"%s\" does not exist", curname)));
  	}
+ 	PQclear(res);
  
  	return (Datum) 0;
  }
  
***************
*** 640,645 **** dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
--- 679,685 ----
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible */
  	bool		freeconn = false;
+ 	storeInfo   storeinfo;
  
  	/* check to see if caller supports us returning a tuplestore */
  	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
***************
*** 715,878 **** dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
  	rsinfo->setResult = NULL;
  	rsinfo->setDesc = NULL;
  
  	/* synchronous query, or async result retrieval */
  	if (!is_async)
  		res = PQexec(conn, sql);
  	else
- 	{
  		res = PQgetResult(conn);
- 		/* NULL means we're all done with the async results */
- 		if (!res)
- 			return (Datum) 0;
- 	}
  
! 	/* if needed, close the connection to the database and cleanup */
! 	if (freeconn)
! 		PQfinish(conn);
  
! 	if (!res ||
! 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
! 		 PQresultStatus(res) != PGRES_TUPLES_OK))
  	{
! 		dblink_res_error(conname, res, "could not execute query", fail);
! 		return (Datum) 0;
  	}
  
- 	materializeResult(fcinfo, res);
  	return (Datum) 0;
  }
  
- /*
-  * Materialize the PGresult to return them as the function result.
-  * The res will be released in this function.
-  */
  static void
! materializeResult(FunctionCallInfo fcinfo, PGresult *res)
  {
  	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
  
! 	Assert(rsinfo->returnMode == SFRM_Materialize);
  
! 	PG_TRY();
  	{
! 		TupleDesc	tupdesc;
! 		bool		is_sql_cmd = false;
! 		int			ntuples;
! 		int			nfields;
  
! 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
! 		{
! 			is_sql_cmd = true;
  
! 			/*
! 			 * need a tuple descriptor representing one TEXT column to return
! 			 * the command status string as our result tuple
! 			 */
! 			tupdesc = CreateTemplateTupleDesc(1, false);
! 			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
! 							   TEXTOID, -1, 0);
! 			ntuples = 1;
! 			nfields = 1;
! 		}
! 		else
! 		{
! 			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
  
! 			is_sql_cmd = false;
  
! 			/* get a tuple descriptor for our result type */
! 			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
! 			{
! 				case TYPEFUNC_COMPOSITE:
! 					/* success */
! 					break;
! 				case TYPEFUNC_RECORD:
! 					/* failed to determine actual type of RECORD */
! 					ereport(ERROR,
! 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 						errmsg("function returning record called in context "
! 							   "that cannot accept type record")));
! 					break;
! 				default:
! 					/* result type isn't composite */
! 					elog(ERROR, "return type must be a row type");
! 					break;
! 			}
  
! 			/* make sure we have a persistent copy of the tupdesc */
! 			tupdesc = CreateTupleDescCopy(tupdesc);
! 			ntuples = PQntuples(res);
! 			nfields = PQnfields(res);
  		}
  
! 		/*
! 		 * check result and tuple descriptor have the same number of columns
! 		 */
! 		if (nfields != tupdesc->natts)
! 			ereport(ERROR,
! 					(errcode(ERRCODE_DATATYPE_MISMATCH),
! 					 errmsg("remote query result rowtype does not match "
! 							"the specified FROM clause rowtype")));
  
! 		if (ntuples > 0)
! 		{
! 			AttInMetadata *attinmeta;
! 			Tuplestorestate *tupstore;
! 			MemoryContext oldcontext;
! 			int			row;
! 			char	  **values;
! 
! 			attinmeta = TupleDescGetAttInMetadata(tupdesc);
! 
! 			oldcontext = MemoryContextSwitchTo(
! 									rsinfo->econtext->ecxt_per_query_memory);
! 			tupstore = tuplestore_begin_heap(true, false, work_mem);
! 			rsinfo->setResult = tupstore;
! 			rsinfo->setDesc = tupdesc;
! 			MemoryContextSwitchTo(oldcontext);
  
! 			values = (char **) palloc(nfields * sizeof(char *));
  
! 			/* put all tuples into the tuplestore */
! 			for (row = 0; row < ntuples; row++)
! 			{
! 				HeapTuple	tuple;
  
! 				if (!is_sql_cmd)
! 				{
! 					int			i;
  
! 					for (i = 0; i < nfields; i++)
! 					{
! 						if (PQgetisnull(res, row, i))
! 							values[i] = NULL;
! 						else
! 							values[i] = PQgetvalue(res, row, i);
! 					}
! 				}
! 				else
! 				{
! 					values[0] = PQcmdStatus(res);
! 				}
  
! 				/* build the tuple and put it into the tuplestore. */
! 				tuple = BuildTupleFromCStrings(attinmeta, values);
! 				tuplestore_puttuple(tupstore, tuple);
  			}
  
! 			/* clean up and return the tuplestore */
! 			tuplestore_donestoring(tupstore);
! 		}
  
! 		PQclear(res);
  	}
  	PG_CATCH();
  	{
! 		/* be sure to release the libpq result */
! 		PQclear(res);
! 		PG_RE_THROW();
  	}
  	PG_END_TRY();
  }
  
  /*
--- 755,1006 ----
  	rsinfo->setResult = NULL;
  	rsinfo->setDesc = NULL;
  
+ 
+ 	/*
+ 	 * Result is stored into storeinfo.tuplestore instead of
+ 	 * res->result returned by PQexec/PQgetResult below
+ 	 */
+ 	initStoreInfo(&storeinfo, fcinfo);
+ 	PQsetRowProcessor(conn, storeHandler, &storeinfo);
+ 
  	/* synchronous query, or async result retrieval */
  	if (!is_async)
  		res = PQexec(conn, sql);
  	else
  		res = PQgetResult(conn);
  
! 	finishStoreInfo(&storeinfo);
  
! 	/* NULL res from async get means we're all done with the results */
! 	if (res || !is_async)
  	{
! 		if (freeconn)
! 			PQfinish(conn);
! 
! 		if (!res ||
! 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
! 			 PQresultStatus(res) != PGRES_TUPLES_OK))
! 		{
! 			/* finishStoreInfo saves the fields referred to below. */
! 			if (storeinfo.nummismatch)
! 			{
! 				/* This is only for backward compatibility */
! 				ereport(ERROR,
! 						(errcode(ERRCODE_DATATYPE_MISMATCH),
! 						 errmsg("remote query result rowtype does not match "
! 								"the specified FROM clause rowtype")));
! 			}
! 			else if (storeinfo.edata)
! 				ReThrowError(storeinfo.edata);
! 
! 			dblink_res_error(conname, res, "could not execute query", fail);
! 			return (Datum) 0;
! 		}
  	}
+ 	PQclear(res);
  
  	return (Datum) 0;
  }
  
  static void
! initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
  {
  	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ 	TupleDesc	tupdesc;
+ 	int i;
  
! 	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
! 	{
! 		case TYPEFUNC_COMPOSITE:
! 			/* success */
! 			break;
! 		case TYPEFUNC_RECORD:
! 			/* failed to determine actual type of RECORD */
! 			ereport(ERROR,
! 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 					 errmsg("function returning record called in context "
! 							"that cannot accept type record")));
! 			break;
! 		default:
! 			/* result type isn't composite */
! 			elog(ERROR, "return type must be a row type");
! 			break;
! 	}
  
! 	sinfo->oldcontext = MemoryContextSwitchTo(
! 		rsinfo->econtext->ecxt_per_query_memory);
! 
! 	/* make sure we have a persistent copy of the tupdesc */
! 	tupdesc = CreateTupleDescCopy(tupdesc);
! 
! 	sinfo->error_occurred = FALSE;
! 	sinfo->nummismatch = FALSE;
! 	sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
! 	sinfo->edata = NULL;
! 	sinfo->nattrs = tupdesc->natts;
! 	sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
! 	sinfo->valbuf = NULL;
! 	sinfo->valbuflen = NULL;
! 
! 	/* Preallocate memory of same size with c string array for values. */
! 	sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*));
! 	if (sinfo->valbuf)
! 		sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
! 	if (sinfo->valbuflen)
! 		sinfo->cstrs = (char **)malloc(sinfo->nattrs * sizeof(char*));
! 
! 	if (sinfo->cstrs == NULL)
  	{
! 		if (sinfo->valbuf)
! 			free(sinfo->valbuf);
! 		if (sinfo->valbuflen)
! 			free(sinfo->valbuflen);
  
! 		ereport(ERROR,
! 				(errcode(ERRCODE_OUT_OF_MEMORY),
! 				 errmsg("out of memory")));
! 	}
  
! 	for (i = 0 ; i < sinfo->nattrs ; i++)
! 	{
! 		sinfo->valbuf[i] = NULL;
! 		sinfo->valbuflen[i] = -1;
! 	}
  
! 	rsinfo->setResult = sinfo->tuplestore;
! 	rsinfo->setDesc = tupdesc;
! }
  
! static void
! finishStoreInfo(storeInfo *sinfo)
! {
! 	int i;
  
! 	if (sinfo->valbuf)
! 	{
! 		for (i = 0 ; i < sinfo->nattrs ; i++)
! 		{
! 			if (sinfo->valbuf[i])
! 				free(sinfo->valbuf[i]);
  		}
+ 		free(sinfo->valbuf);
+ 		sinfo->valbuf = NULL;
+ 	}
  
! 	if (sinfo->valbuflen)
! 	{
! 		free(sinfo->valbuflen);
! 		sinfo->valbuflen = NULL;
! 	}
  
! 	if (sinfo->cstrs)
! 	{
! 		free(sinfo->cstrs);
! 		sinfo->cstrs = NULL;
! 	}
  
! 	MemoryContextSwitchTo(sinfo->oldcontext);
! }
  
! static int
! storeHandler(PGresult *res, void *param, PGrowValue *columns)
! {
! 	storeInfo *sinfo = (storeInfo *)param;
! 	HeapTuple  tuple;
! 	int        fields = PQnfields(res);
! 	int        i;
! 	char      **cstrs = sinfo->cstrs;
  
! 	if (sinfo->error_occurred)
! 		return FALSE;
  
! 	if (sinfo->nattrs != fields)
! 	{
! 		sinfo->error_occurred = TRUE;
! 		sinfo->nummismatch = TRUE;
! 		finishStoreInfo(sinfo);
! 
! 		/* This error will be processed in
! 		 * dblink_record_internal(). So do not set error message
! 		 * here. */
! 		return FALSE;
! 	}
  
! 	/*
! 	 * Value input functions assume that the input string is
! 	 * zero-terminated, so make a terminated copy of each value.
! 	 */
! 	for(i = 0 ; i < fields ; i++)
! 	{
! 		int len = columns[i].len;
! 		if (len < 0)
! 			cstrs[i] = NULL;
! 		else
! 		{
! 			char *tmp = sinfo->valbuf[i];
! 			int tmplen = sinfo->valbuflen[i];
! 
! 			/*
! 			 * Call malloc and realloc separately so that this works
! 			 * even on systems whose realloc() does not accept NULL
! 			 * as the old memory block.
! 			 *
! 			 * Also try to (re)allocate in bigger steps to
! 			 * avoid flood of allocations on weird data.
! 			 */
! 			if (tmp == NULL)
! 			{
! 				tmplen = len + 1;
! 				if (tmplen < 64)
! 					tmplen = 64;
! 				tmp = (char *)malloc(tmplen);
! 			}
! 			else if (tmplen < len + 1)
! 			{
! 				if (len + 1 > tmplen * 2)
! 					tmplen = len + 1;
! 				else
! 					tmplen = tmplen * 2;
! 				tmp = (char *)realloc(tmp, tmplen);
  			}
  
! 			/*
! 			 * sinfo->valbuf[n] will be freed in finishStoreInfo()
! 			 * when realloc returns NULL.
! 			 */
! 			if (tmp == NULL)
! 				return FALSE;
  
! 			sinfo->valbuf[i] = tmp;
! 			sinfo->valbuflen[i] = tmplen;
! 
! 			cstrs[i] = sinfo->valbuf[i];
! 			memcpy(cstrs[i], columns[i].value, len);
! 			cstrs[i][len] = '\0';
! 		}
! 	}
! done:
! 	PG_TRY();
! 	{
! 		tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
! 		tuplestore_puttuple(sinfo->tuplestore, tuple);
  	}
  	PG_CATCH();
  	{
! 		MemoryContext context;
! 		/*
! 		 * Store exception for later ReThrow and cancel the exception.
! 		 */
! 		sinfo->error_occurred = TRUE;
! 		context = MemoryContextSwitchTo(sinfo->oldcontext);
! 		sinfo->edata = CopyErrorData();
! 		MemoryContextSwitchTo(context);
! 		FlushErrorState();
! 		return FALSE;
  	}
  	PG_END_TRY();
+ 
+ 	return TRUE;
  }
  
  /*
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to