On Thu, Feb 02, 2012 at 04:51:37PM +0900, Kyotaro HORIGUCHI wrote:
> Hello, This is new version of dblink.c
> 
> - Memory is properly freed when realloc returns NULL in storeHandler().
> 
> - The bug that free() in finishStoreInfo() will be fed with
>   garbage pointer when malloc for sinfo->valbuflen fails is
>   fixed.

Thanks, merged.  I also did some minor coding style cleanups.

Tagging it Ready For Committer.  I don't see any notable
issues with the patch anymore.

There is some potential for experimenting with more aggressive
optimizations on dblink side, but I'd like to get a nod from
a committer for libpq changes first.

-- 
marko

*** a/src/interfaces/libpq/exports.txt
--- b/src/interfaces/libpq/exports.txt
***************
*** 160,162 **** PQconnectStartParams      157
--- 160,164 ----
  PQping                    158
  PQpingParams              159
  PQlibVersion              160
+ PQsetRowProcessor	  161
+ PQsetRowProcessorErrMsg	  162
*** a/src/interfaces/libpq/fe-connect.c
--- b/src/interfaces/libpq/fe-connect.c
***************
*** 2693,2698 **** makeEmptyPGconn(void)
--- 2693,2701 ----
  	conn->wait_ssl_try = false;
  #endif
  
+ 	/* set default row processor */
+ 	PQsetRowProcessor(conn, NULL, NULL);
+ 
  	/*
  	 * We try to send at least 8K at a time, which is the usual size of pipe
  	 * buffers on Unix systems.  That way, when we are sending a large amount
***************
*** 2711,2718 **** makeEmptyPGconn(void)
--- 2714,2726 ----
  	initPQExpBuffer(&conn->errorMessage);
  	initPQExpBuffer(&conn->workBuffer);
  
+ 	/* set up initial row buffer */
+ 	conn->rowBufLen = 32;
+ 	conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue));
+ 
  	if (conn->inBuffer == NULL ||
  		conn->outBuffer == NULL ||
+ 		conn->rowBuf == NULL ||
  		PQExpBufferBroken(&conn->errorMessage) ||
  		PQExpBufferBroken(&conn->workBuffer))
  	{
***************
*** 2812,2817 **** freePGconn(PGconn *conn)
--- 2820,2827 ----
  		free(conn->inBuffer);
  	if (conn->outBuffer)
  		free(conn->outBuffer);
+ 	if (conn->rowBuf)
+ 		free(conn->rowBuf);
  	termPQExpBuffer(&conn->errorMessage);
  	termPQExpBuffer(&conn->workBuffer);
  
***************
*** 5076,5078 **** PQregisterThreadLock(pgthreadlock_t newhandler)
--- 5086,5089 ----
  
  	return prev;
  }
+ 
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 66,71 **** static PGresult *PQexecFinish(PGconn *conn);
--- 66,72 ----
  static int PQsendDescribe(PGconn *conn, char desc_type,
  			   const char *desc_target);
  static int	check_field_number(const PGresult *res, int field_num);
+ static int	pqAddRow(PGresult *res, void *param, PGrowValue *columns);
  
  
  /* ----------------
***************
*** 160,165 **** PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
--- 161,167 ----
  	result->curBlock = NULL;
  	result->curOffset = 0;
  	result->spaceLeft = 0;
+ 	result->rowProcessorErrMsg = NULL;
  
  	if (conn)
  	{
***************
*** 701,707 **** pqClearAsyncResult(PGconn *conn)
  	if (conn->result)
  		PQclear(conn->result);
  	conn->result = NULL;
- 	conn->curTuple = NULL;
  }
  
  /*
--- 703,708 ----
***************
*** 756,762 **** pqPrepareAsyncResult(PGconn *conn)
  	 */
  	res = conn->result;
  	conn->result = NULL;		/* handing over ownership to caller */
- 	conn->curTuple = NULL;		/* just in case */
  	if (!res)
  		res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
  	else
--- 757,762 ----
***************
*** 828,833 **** pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
--- 828,900 ----
  }
  
  /*
+  * PQsetRowProcessor
+  *   Set function that copies column data out from network buffer.
+  */
+ void
+ PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+ {
+ 	conn->rowProcessor = (func ? func : pqAddRow);
+ 	conn->rowProcessorParam = param;
+ }
+ 
+ /*
+  * PQsetRowProcessorErrMsg
+  *    Set the error message pass back to the caller of RowProcessor.
+  *
+  *    You can replace the previous message by alternative mes, or clear
+  *    it with NULL.
+  */
+ void
+ PQsetRowProcessorErrMsg(PGresult *res, char *msg)
+ {
+ 	if (msg)
+ 		res->rowProcessorErrMsg = pqResultStrdup(res, msg);
+ 	else
+ 		res->rowProcessorErrMsg = NULL;
+ }
+ 
+ /*
+  * pqAddRow
+  *	  add a row to the PGresult structure, growing it if necessary
+  *	  Returns TRUE if OK, FALSE if not enough memory to add the row.
+  */
+ static int
+ pqAddRow(PGresult *res, void *param, PGrowValue *columns)
+ {
+ 	PGresAttValue *tup;
+ 	int			nfields = res->numAttributes;
+ 	int			i;
+ 
+ 	tup = (PGresAttValue *)
+ 		pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+ 	if (tup == NULL)
+ 		return FALSE;
+ 
+ 	for (i = 0 ; i < nfields ; i++)
+ 	{
+ 		tup[i].len = columns[i].len;
+ 		if (tup[i].len == NULL_LEN)
+ 		{
+ 			tup[i].value = res->null_field;
+ 		}
+ 		else
+ 		{
+ 			bool isbinary = (res->attDescs[i].format != 0);
+ 			tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+ 			if (tup[i].value == NULL)
+ 				return FALSE;
+ 
+ 			memcpy(tup[i].value, columns[i].value, tup[i].len);
+ 			/* We have to terminate this ourselves */
+ 			tup[i].value[tup[i].len] = '\0';
+ 		}
+ 	}
+ 
+ 	return pqAddTuple(res, tup);
+ }
+ 
+ /*
   * pqAddTuple
   *	  add a row pointer to the PGresult structure, growing it if necessary
   *	  Returns TRUE if OK, FALSE if not enough memory to add the row
***************
*** 1223,1229 **** PQsendQueryStart(PGconn *conn)
  
  	/* initialize async result-accumulation state */
  	conn->result = NULL;
- 	conn->curTuple = NULL;
  
  	/* ready to send command message */
  	return true;
--- 1290,1295 ----
*** a/src/interfaces/libpq/fe-misc.c
--- b/src/interfaces/libpq/fe-misc.c
***************
*** 219,224 **** pqGetnchar(char *s, size_t len, PGconn *conn)
--- 219,243 ----
  }
  
  /*
+  * pqGetnchar:
+  *	skip len bytes in input buffer.
+  */
+ int
+ pqSkipnchar(size_t len, PGconn *conn)
+ {
+ 	if (len > (size_t) (conn->inEnd - conn->inCursor))
+ 		return EOF;
+ 
+ 	conn->inCursor += len;
+ 
+ 	if (conn->Pfdebug)
+ 		fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+ 				(unsigned long) len);
+ 
+ 	return 0;
+ }
+ 
+ /*
   * pqPutnchar:
   *	write exactly len bytes to the current message
   */
*** a/src/interfaces/libpq/fe-protocol2.c
--- b/src/interfaces/libpq/fe-protocol2.c
***************
*** 703,721 **** failure:
  
  /*
   * parseInput subroutine to read a 'B' or 'D' (row data) message.
!  * We add another tuple to the existing PGresult structure.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.  We keep a partially constructed
!  * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
   */
  static int
  getAnotherTuple(PGconn *conn, bool binary)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGresAttValue *tup;
  
  	/* the backend sends us a bitmap of which attributes are null */
  	char		std_bitmap[64]; /* used unless it doesn't fit */
--- 703,720 ----
  
  /*
   * parseInput subroutine to read a 'B' or 'D' (row data) message.
!  * It fills rowbuf with column pointers and then calls row processor.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.
   */
  static int
  getAnotherTuple(PGconn *conn, bool binary)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGrowValue  *rowbuf;
  
  	/* the backend sends us a bitmap of which attributes are null */
  	char		std_bitmap[64]; /* used unless it doesn't fit */
***************
*** 727,754 **** getAnotherTuple(PGconn *conn, bool binary)
  	int			bitcnt;			/* number of bits examined in current byte */
  	int			vlen;			/* length of the current field value */
  
  	result->binary = binary;
  
! 	/* Allocate tuple space if first time for this data message */
! 	if (conn->curTuple == NULL)
  	{
! 		conn->curTuple = (PGresAttValue *)
! 			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
! 		if (conn->curTuple == NULL)
! 			goto outOfMemory;
! 		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
! 
! 		/*
! 		 * If it's binary, fix the column format indicators.  We assume the
! 		 * backend will consistently send either B or D, not a mix.
! 		 */
! 		if (binary)
! 		{
! 			for (i = 0; i < nfields; i++)
! 				result->attDescs[i].format = 1;
! 		}
  	}
- 	tup = conn->curTuple;
  
  	/* Get the null-value bitmap */
  	nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
--- 726,752 ----
  	int			bitcnt;			/* number of bits examined in current byte */
  	int			vlen;			/* length of the current field value */
  
+ 	/* resize row buffer if needed */
+ 	if (nfields > conn->rowBufLen)
+ 	{
+ 		rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+ 		if (!rowbuf)
+ 			goto rowProcessError;
+ 		conn->rowBuf = rowbuf;
+ 		conn->rowBufLen = nfields;
+ 	}
+ 	else
+ 	{
+ 		rowbuf = conn->rowBuf;
+ 	}
+ 
  	result->binary = binary;
  
! 	if (binary)
  	{
! 		for (i = 0; i < nfields; i++)
! 			result->attDescs[i].format = 1;
  	}
  
  	/* Get the null-value bitmap */
  	nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
***************
*** 757,763 **** getAnotherTuple(PGconn *conn, bool binary)
  	{
  		bitmap = (char *) malloc(nbytes);
  		if (!bitmap)
! 			goto outOfMemory;
  	}
  
  	if (pqGetnchar(bitmap, nbytes, conn))
--- 755,761 ----
  	{
  		bitmap = (char *) malloc(nbytes);
  		if (!bitmap)
! 			goto rowProcessError;
  	}
  
  	if (pqGetnchar(bitmap, nbytes, conn))
***************
*** 771,804 **** getAnotherTuple(PGconn *conn, bool binary)
  	for (i = 0; i < nfields; i++)
  	{
  		if (!(bmap & 0200))
! 		{
! 			/* if the field value is absent, make it a null string */
! 			tup[i].value = result->null_field;
! 			tup[i].len = NULL_LEN;
! 		}
  		else
  		{
- 			/* get the value length (the first four bytes are for length) */
- 			if (pqGetInt(&vlen, 4, conn))
- 				goto EOFexit;
  			if (!binary)
  				vlen = vlen - 4;
  			if (vlen < 0)
  				vlen = 0;
- 			if (tup[i].value == NULL)
- 			{
- 				tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
- 				if (tup[i].value == NULL)
- 					goto outOfMemory;
- 			}
- 			tup[i].len = vlen;
- 			/* read in the value */
- 			if (vlen > 0)
- 				if (pqGetnchar((char *) (tup[i].value), vlen, conn))
- 					goto EOFexit;
- 			/* we have to terminate this ourselves */
- 			tup[i].value[vlen] = '\0';
  		}
  		/* advance the bitmap stuff */
  		bitcnt++;
  		if (bitcnt == BITS_PER_BYTE)
--- 769,797 ----
  	for (i = 0; i < nfields; i++)
  	{
  		if (!(bmap & 0200))
! 			vlen = NULL_LEN;
! 		else if (pqGetInt(&vlen, 4, conn))
! 				goto EOFexit;
  		else
  		{
  			if (!binary)
  				vlen = vlen - 4;
  			if (vlen < 0)
  				vlen = 0;
  		}
+ 
+ 		/*
+ 		 * rowbuf[i].value always points to the next address of the
+ 		 * length field even if the value is NULL, to allow safe
+ 		 * size estimates and data copy.
+ 		 */
+ 		rowbuf[i].value = conn->inBuffer + conn->inCursor;
+ 		rowbuf[i].len = vlen;
+ 
+ 		/* Skip the value */
+ 		if (vlen > 0 && pqSkipnchar(vlen, conn))
+ 			goto EOFexit;
+ 
  		/* advance the bitmap stuff */
  		bitcnt++;
  		if (bitcnt == BITS_PER_BYTE)
***************
*** 811,827 **** getAnotherTuple(PGconn *conn, bool binary)
  			bmap <<= 1;
  	}
  
! 	/* Success!  Store the completed tuple in the result */
! 	if (!pqAddTuple(result, tup))
! 		goto outOfMemory;
! 	/* and reset for a new message */
! 	conn->curTuple = NULL;
  
  	if (bitmap != std_bitmap)
  		free(bitmap);
  	return 0;
  
! outOfMemory:
  	/* Replace partially constructed result with an error result */
  
  	/*
--- 804,820 ----
  			bmap <<= 1;
  	}
  
! 	/* Success!  Pass the completed row values to rowProcessor */
! 	if (!conn->rowProcessor(result, conn->rowProcessorParam, rowbuf))
! 		goto rowProcessError;
  
  	if (bitmap != std_bitmap)
  		free(bitmap);
+ 
  	return 0;
  
! rowProcessError:
! 
  	/* Replace partially constructed result with an error result */
  
  	/*
***************
*** 829,838 **** outOfMemory:
  	 * there's not enough memory to concatenate messages...
  	 */
  	pqClearAsyncResult(conn);
! 	printfPQExpBuffer(&conn->errorMessage,
! 					  libpq_gettext("out of memory for query result\n"));
  
  	/*
  	 * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
  	 * do to recover...
  	 */
--- 822,838 ----
  	 * there's not enough memory to concatenate messages...
  	 */
  	pqClearAsyncResult(conn);
! 	resetPQExpBuffer(&conn->errorMessage);
  
  	/*
+ 	 * If error message is passed from RowProcessor, set it into
+ 	 * PGconn, assume out of memory if not.
+ 	 */
+ 	appendPQExpBufferStr(&conn->errorMessage,
+ 						 result->rowProcessorErrMsg ?
+ 						 result->rowProcessorErrMsg :
+ 						 libpq_gettext("out of memory for query result\n"));
+ 	/*
  	 * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
  	 * do to recover...
  	 */
*** a/src/interfaces/libpq/fe-protocol3.c
--- b/src/interfaces/libpq/fe-protocol3.c
***************
*** 613,646 **** failure:
  
  /*
   * parseInput subroutine to read a 'D' (row data) message.
!  * We add another tuple to the existing PGresult structure.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.  We keep a partially constructed
!  * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
   */
  static int
  getAnotherTuple(PGconn *conn, int msgLength)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGresAttValue *tup;
  	int			tupnfields;		/* # fields from tuple */
  	int			vlen;			/* length of the current field value */
  	int			i;
  
- 	/* Allocate tuple space if first time for this data message */
- 	if (conn->curTuple == NULL)
- 	{
- 		conn->curTuple = (PGresAttValue *)
- 			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
- 		if (conn->curTuple == NULL)
- 			goto outOfMemory;
- 		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
- 	}
- 	tup = conn->curTuple;
- 
  	/* Get the field count and make sure it's what we expect */
  	if (pqGetInt(&tupnfields, 2, conn))
  		return EOF;
--- 613,634 ----
  
  /*
   * parseInput subroutine to read a 'D' (row data) message.
!  * It fills rowbuf with column pointers and then calls row processor.
   * Returns: 0 if completed message, EOF if error or not enough data yet.
   *
   * Note that if we run out of data, we have to suspend and reprocess
!  * the message after more data is received.
   */
  static int
  getAnotherTuple(PGconn *conn, int msgLength)
  {
  	PGresult   *result = conn->result;
  	int			nfields = result->numAttributes;
! 	PGrowValue  *rowbuf;
  	int			tupnfields;		/* # fields from tuple */
  	int			vlen;			/* length of the current field value */
  	int			i;
  
  	/* Get the field count and make sure it's what we expect */
  	if (pqGetInt(&tupnfields, 2, conn))
  		return EOF;
***************
*** 656,661 **** getAnotherTuple(PGconn *conn, int msgLength)
--- 644,663 ----
  		return 0;
  	}
  
+ 	/* resize row buffer if needed */
+ 	if (nfields > conn->rowBufLen)
+ 	{
+ 		rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+ 		if (!rowbuf)
+ 			goto rowProcessError;
+ 		conn->rowBuf = rowbuf;
+ 		conn->rowBufLen = nfields;
+ 	}
+ 	else
+ 	{
+ 		rowbuf = conn->rowBuf;
+ 	}
+ 
  	/* Scan the fields */
  	for (i = 0; i < nfields; i++)
  	{
***************
*** 663,710 **** getAnotherTuple(PGconn *conn, int msgLength)
  		if (pqGetInt(&vlen, 4, conn))
  			return EOF;
  		if (vlen == -1)
! 		{
! 			/* null field */
! 			tup[i].value = result->null_field;
! 			tup[i].len = NULL_LEN;
! 			continue;
! 		}
! 		if (vlen < 0)
  			vlen = 0;
- 		if (tup[i].value == NULL)
- 		{
- 			bool		isbinary = (result->attDescs[i].format != 0);
  
! 			tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
! 			if (tup[i].value == NULL)
! 				goto outOfMemory;
! 		}
! 		tup[i].len = vlen;
! 		/* read in the value */
! 		if (vlen > 0)
! 			if (pqGetnchar((char *) (tup[i].value), vlen, conn))
! 				return EOF;
! 		/* we have to terminate this ourselves */
! 		tup[i].value[vlen] = '\0';
  	}
  
! 	/* Success!  Store the completed tuple in the result */
! 	if (!pqAddTuple(result, tup))
! 		goto outOfMemory;
! 	/* and reset for a new message */
! 	conn->curTuple = NULL;
  
  	return 0;
  
! outOfMemory:
  
  	/*
  	 * Replace partially constructed result with an error result. First
  	 * discard the old result to try to win back some memory.
  	 */
  	pqClearAsyncResult(conn);
! 	printfPQExpBuffer(&conn->errorMessage,
! 					  libpq_gettext("out of memory for query result\n"));
  	pqSaveErrorResult(conn);
  
  	/* Discard the failed message by pretending we read it */
--- 665,710 ----
  		if (pqGetInt(&vlen, 4, conn))
  			return EOF;
  		if (vlen == -1)
! 			vlen = NULL_LEN;
! 		else if (vlen < 0)
  			vlen = 0;
  
! 		/*
! 		 * rowbuf[i].value always points to the next address of the
! 		 * length field even if the value is NULL, to allow safe
! 		 * size estimates and data copy.
! 		 */
! 		rowbuf[i].value = conn->inBuffer + conn->inCursor;
! 		rowbuf[i].len = vlen;
! 
! 		/* Skip to the next length field */
! 		if (vlen > 0 && pqSkipnchar(vlen, conn))
! 			return EOF;
  	}
  
! 	/* Success!  Pass the completed row values to rowProcessor */
! 	if (!conn->rowProcessor(result, conn->rowProcessorParam, rowbuf))
! 		goto rowProcessError;
  
  	return 0;
  
! rowProcessError:
  
  	/*
  	 * Replace partially constructed result with an error result. First
  	 * discard the old result to try to win back some memory.
  	 */
  	pqClearAsyncResult(conn);
! 	resetPQExpBuffer(&conn->errorMessage);
! 
! 	/*
! 	 * If error message is passed from addTupleFunc, set it into
! 	 * PGconn, assume out of memory if not.
! 	 */
! 	appendPQExpBufferStr(&conn->errorMessage,
! 						 result->rowProcessorErrMsg ?
! 						 result->rowProcessorErrMsg :
! 						 libpq_gettext("out of memory for query result\n"));
  	pqSaveErrorResult(conn);
  
  	/* Discard the failed message by pretending we read it */
*** a/src/interfaces/libpq/libpq-fe.h
--- b/src/interfaces/libpq/libpq-fe.h
***************
*** 149,154 **** typedef struct pgNotify
--- 149,165 ----
  	struct pgNotify *next;		/* list link */
  } PGnotify;
  
+ /* PGrowValue points a column value of in network buffer.
+  * Value is a string without null termination and length len.
+  * NULL is represented as len < 0, value points then to place
+  * where value would have been.
+  */
+ typedef struct pgRowValue
+ {
+ 	int			len;			/* length in bytes of the value */
+ 	char	   *value;			/* actual value, without null termination */
+ } PGrowValue;
+ 
  /* Function types for notice-handling callbacks */
  typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
  typedef void (*PQnoticeProcessor) (void *arg, const char *message);
***************
*** 416,421 **** extern PGPing PQping(const char *conninfo);
--- 427,463 ----
  extern PGPing PQpingParams(const char *const * keywords,
  			 const char *const * values, int expand_dbname);
  
+ /*
+  * Typedef for alternative row processor.
+  *
+  * Columns array will contain PQnfields() entries, each one
+  * pointing to particular column data in network buffer.
+  * This function is supposed to copy data out from there
+  * and store somewhere.  NULL is signified with len<0.
+  *
+  * This function must return 1 for success and must return 0 for
+  * failure and may set error message by PQsetRowProcessorErrMsg.  It
+  * is assumed by caller as out of memory when the error message is not
+  * set on failure. This function is assumed not to throw any
+  * exception.
+  */
+ typedef int (*PQrowProcessor)(PGresult *res, void *param,
+ 								PGrowValue *columns);
+ 
+ /*
+  * Set alternative row data processor for PGconn.
+  *
+  * By registering this function, pg_result disables its own result
+  * store and calls it for rows one by one.
+  *
+  * func is row processor function. See the typedef RowProcessor.
+  *
+  * rowProcessorParam is the contextual variable that passed to
+  * RowProcessor.
+  */
+ extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func,
+ 								   void *rowProcessorParam);
+ 
  /* Force the write buffer to be written (or at least try) */
  extern int	PQflush(PGconn *conn);
  
***************
*** 454,459 **** extern char *PQcmdTuples(PGresult *res);
--- 496,502 ----
  extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
  extern int	PQgetlength(const PGresult *res, int tup_num, int field_num);
  extern int	PQgetisnull(const PGresult *res, int tup_num, int field_num);
+ extern void	PQsetRowProcessorErrMsg(PGresult *res, char *msg);
  extern int	PQnparams(const PGresult *res);
  extern Oid	PQparamtype(const PGresult *res, int param_num);
  
*** a/src/interfaces/libpq/libpq-int.h
--- b/src/interfaces/libpq/libpq-int.h
***************
*** 209,214 **** struct pg_result
--- 209,217 ----
  	PGresult_data *curBlock;	/* most recently allocated block */
  	int			curOffset;		/* start offset of free space in block */
  	int			spaceLeft;		/* number of free bytes remaining in block */
+ 
+ 	/* temp etorage for message from row processor callback */
+ 	char	   *rowProcessorErrMsg;
  };
  
  /* PGAsyncStatusType defines the state of the query-execution state machine */
***************
*** 398,404 **** struct pg_conn
  
  	/* Status for asynchronous result construction */
  	PGresult   *result;			/* result being constructed */
- 	PGresAttValue *curTuple;	/* tuple currently being read */
  
  #ifdef USE_SSL
  	bool		allow_ssl_try;	/* Allowed to try SSL negotiation */
--- 401,406 ----
***************
*** 443,448 **** struct pg_conn
--- 445,458 ----
  
  	/* Buffer for receiving various parts of messages */
  	PQExpBufferData workBuffer; /* expansible string */
+ 
+ 	/*
+ 	 * Read column data from network buffer.
+ 	 */
+ 	PQrowProcessor rowProcessor;/* Function pointer */
+ 	void *rowProcessorParam;	/* Contextual parameter for rowProcessor */
+ 	PGrowValue *rowBuf;			/* Buffer for passing values to rowProcessor */
+ 	int rowBufLen;				/* Number of columns allocated in rowBuf */
  };
  
  /* PGcancel stores all data necessary to cancel a connection. A copy of this
***************
*** 560,565 **** extern int	pqGets(PQExpBuffer buf, PGconn *conn);
--- 570,576 ----
  extern int	pqGets_append(PQExpBuffer buf, PGconn *conn);
  extern int	pqPuts(const char *s, PGconn *conn);
  extern int	pqGetnchar(char *s, size_t len, PGconn *conn);
+ extern int	pqSkipnchar(size_t len, PGconn *conn);
  extern int	pqPutnchar(const char *s, size_t len, PGconn *conn);
  extern int	pqGetInt(int *result, size_t bytes, PGconn *conn);
  extern int	pqPutInt(int value, size_t bytes, PGconn *conn);
*** a/doc/src/sgml/libpq.sgml
--- b/doc/src/sgml/libpq.sgml
***************
*** 7233,7238 **** int PQisthreadsafe();
--- 7233,7443 ----
   </sect1>
  
  
+  <sect1 id="libpq-altrowprocessor">
+   <title>Alternative row processor</title>
+ 
+   <indexterm zone="libpq-altrowprocessor">
+    <primary>PGresult</primary>
+    <secondary>PGconn</secondary>
+   </indexterm>
+ 
+   <para>
+    As the standard usage, rows are stored into <type>PQresult</type>
+    until full resultset is received.  Then such completely-filled
+    <type>PQresult</type> is passed to user.  This behaviour can be
+    changed by registering alternative row processor function,
+    that will see each row data as soon as it is received
+    from network.  It has the option of processing the data
+    immediately, or storing it into custom container.
+   </para>
+ 
+   <para>
+    Note - as row processor sees rows as they arrive, it cannot know
+    whether the SQL statement actually finishes successfully on server
+    or not.  So some care must be taken to get proper
+    transactionality.
+   </para>
+ 
+   <variablelist>
+    <varlistentry id="libpq-pqsetrowprocessor">
+     <term>
+      <function>PQsetRowProcessor</function>
+      <indexterm>
+       <primary>PQsetRowProcessor</primary>
+      </indexterm>
+     </term>
+ 
+     <listitem>
+      <para>
+        Sets a callback function to process each row.
+ <synopsis>
+ void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+ </synopsis>
+      </para>
+      
+      <para>
+        <variablelist>
+ 	 <varlistentry>
+ 	   <term><parameter>conn</parameter></term>
+ 	   <listitem>
+ 	     <para>
+ 	       The connection object to set the row processor function.
+ 	     </para>
+ 	   </listitem>
+ 	 </varlistentry>
+ 	 <varlistentry>
+ 	   <term><parameter>func</parameter></term>
+ 	   <listitem>
+ 	     <para>
+ 	       Storage handler function to set. NULL means to use the
+ 	       default processor.
+ 	     </para>
+ 	   </listitem>
+ 	 </varlistentry>
+ 	 <varlistentry>
+ 	   <term><parameter>param</parameter></term>
+ 	   <listitem>
+ 	     <para>
+ 	       A pointer to contextual parameter passed
+ 	       to <parameter>func</parameter>.
+ 	     </para>
+ 	   </listitem>
+ 	 </varlistentry>
+        </variablelist>
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+ 
+   <variablelist>
+    <varlistentry id="libpq-pqrowprocessor">
+     <term>
+      <type>PQrowProcessor</type>
+      <indexterm>
+       <primary>PQrowProcessor</primary>
+      </indexterm>
+     </term>
+ 
+     <listitem>
+      <para>
+        The type for the row processor callback function.
+ <synopsis>
+ int (*PQrowProcessor)(PGresult *res, void *param, PGrowValue *columns);
+ 
+ typedef struct
+ {
+     int         len;            /* length in bytes of the value */
+     char       *value;          /* actual value, without null termination */
+ } PGrowValue;
+ </synopsis>
+      </para>
+ 
+      <para>
+       The <parameter>columns</parameter> array will have PQnfields()
+       elements, each one pointing to column value in network buffer.
+      </para>
+ 
+      <para>
+        This function must process or copy row values away from network
+        buffer before it returns, as next row might overwrite them.
+      </para>
+ 
+      <para>
+        This function must return 1 for success, and 0 for failure.
+        On failure this function should set the error message
+        with <function>PGsetRowProcessorErrMsg</function> if the cause
+        is other than out of memory.  This funcion must not throw any
+        exception.
+      </para>
+      <variablelist>
+        <varlistentry>
+ 
+ 	 <term><parameter>res</parameter></term>
+ 	 <listitem>
+ 	   <para>
+ 	     A pointer to the <type>PGresult</type> object.
+ 	   </para>
+ 	 </listitem>
+        </varlistentry>
+        <varlistentry>
+ 
+ 	 <term><parameter>param</parameter></term>
+ 	 <listitem>
+ 	   <para>
+ 	     Extra parameter that was given to <function>PQsetRowProcessor</function>.
+ 	   </para>
+ 	 </listitem>
+        </varlistentry>
+        <varlistentry>
+ 
+ 	 <term><parameter>columns</parameter></term>
+ 	 <listitem>
+ 	   <para>
+ 	     Column values of the row to process.  Column values
+ 	     are located in network buffer, the processor must
+ 	     copy them out from there.
+ 	   </para>
+ 	   <para>
+ 	     Column values are not null-terminated, so processor cannot
+ 	     use C string functions on them directly.
+ 	   </para>
+ 	 </listitem>
+        </varlistentry>
+      </variablelist>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+ 
+   <variablelist>
+    <varlistentry id="libpq-pqsetrowprocessorerrmsg">
+     <term>
+      <function>PQsetRowProcessorErrMsg</function>
+      <indexterm>
+       <primary>PQsetRowProcessorErrMsg</primary>
+      </indexterm>
+     </term>
+     <listitem>
+       <para>
+ 	Set the message for the error occurred
+ 	in <type>PQrowProcessor</type>.  If this message is not set, the
+ 	caller assumes the error to be out of memory.
+ <synopsis>
+ void PQsetRowProcessorErrMsg(PGresult *res, char *msg)
+ </synopsis>
+       </para>
+       <para>
+ 	<variablelist>
+ 	  <varlistentry>
+ 	    <term><parameter>res</parameter></term>
+ 	    <listitem>
+ 	      <para>
+ 		A pointer to the <type>PGresult</type> object
+ 		passed to <type>PQrowProcessor</type>.
+ 	      </para>
+ 	    </listitem>
+ 	  </varlistentry>
+ 	  <varlistentry>
+ 	    <term><parameter>mes</parameter></term>
+ 	    <listitem>
+ 	      <para>
+ 		Error message. This will be copied internally so there is
+ 		no need to care of the scope.
+ 	      </para>
+ 	      <para>
+ 		If <parameter>res</parameter> already has a message previously
+ 		set, it will be overritten. Set NULL to cancel the the costom
+ 		message.
+ 	      </para>
+ 	    </listitem>
+ 	  </varlistentry>
+ 	</variablelist>
+       </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect1>
+ 
+ 
   <sect1 id="libpq-build">
    <title>Building <application>libpq</application> Programs</title>
  
*** a/contrib/dblink/dblink.c
--- b/contrib/dblink/dblink.c
***************
*** 63,73 **** typedef struct remoteConn
  	bool		newXactForCursor;		/* Opened a transaction for a cursor */
  } remoteConn;
  
  /*
   * Internal declarations
   */
  static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
- static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn *rconn);
--- 63,85 ----
  	bool		newXactForCursor;		/* Opened a transaction for a cursor */
  } remoteConn;
  
+ typedef struct storeInfo
+ {
+ 	Tuplestorestate *tuplestore;
+ 	int nattrs;
+ 	MemoryContext oldcontext;
+ 	AttInMetadata *attinmeta;
+ 	char** valbuf;
+ 	int *valbuflen;
+ 	bool error_occurred;
+ 	bool nummismatch;
+ 	ErrorData *edata;
+ } storeInfo;
+ 
  /*
   * Internal declarations
   */
  static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn *rconn);
***************
*** 90,95 **** static char *escape_param_str(const char *from);
--- 102,111 ----
  static void validate_pkattnums(Relation rel,
  				   int2vector *pkattnums_arg, int32 pknumatts_arg,
  				   int **pkattnums, int *pknumatts);
+ static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+ static void finishStoreInfo(storeInfo *sinfo);
+ static int storeHandler(PGresult *res, void *param, PGrowValue *columns);
+ 
  
  /* Global */
  static remoteConn *pconn = NULL;
***************
*** 503,508 **** dblink_fetch(PG_FUNCTION_ARGS)
--- 519,525 ----
  	char	   *curname = NULL;
  	int			howmany = 0;
  	bool		fail = true;	/* default to backward compatible */
+ 	storeInfo   storeinfo;
  
  	DBLINK_INIT;
  
***************
*** 559,573 **** dblink_fetch(PG_FUNCTION_ARGS)
--- 576,611 ----
  	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
  
  	/*
+ 	 * Result is stored into storeinfo.tuplestore instead of
+ 	 * res->result retuned by PQexec below
+ 	 */
+ 	initStoreInfo(&storeinfo, fcinfo);
+ 	PQsetRowProcessor(conn, storeHandler, &storeinfo);
+ 
+ 	/*
  	 * Try to execute the query.  Note that since libpq uses malloc, the
  	 * PGresult will be long-lived even though we are still in a short-lived
  	 * memory context.
  	 */
  	res = PQexec(conn, buf.data);
+ 	finishStoreInfo(&storeinfo);
+ 
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
  	{
+ 		/* finishStoreInfo saves the fields referred to below. */
+ 		if (storeinfo.nummismatch)
+ 		{
+ 			/* This is only for backward compatibility */
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_DATATYPE_MISMATCH),
+ 					 errmsg("remote query result rowtype does not match "
+ 							"the specified FROM clause rowtype")));
+ 		}
+ 		else if (storeinfo.edata)
+ 			ReThrowError(storeinfo.edata);
+ 
  		dblink_res_error(conname, res, "could not fetch from cursor", fail);
  		return (Datum) 0;
  	}
***************
*** 579,586 **** dblink_fetch(PG_FUNCTION_ARGS)
  				(errcode(ERRCODE_INVALID_CURSOR_NAME),
  				 errmsg("cursor \"%s\" does not exist", curname)));
  	}
  
- 	materializeResult(fcinfo, res);
  	return (Datum) 0;
  }
  
--- 617,624 ----
  				(errcode(ERRCODE_INVALID_CURSOR_NAME),
  				 errmsg("cursor \"%s\" does not exist", curname)));
  	}
+ 	PQclear(res);
  
  	return (Datum) 0;
  }
  
***************
*** 640,645 **** dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
--- 678,684 ----
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible */
  	bool		freeconn = false;
+ 	storeInfo   storeinfo;
  
  	/* check to see if caller supports us returning a tuplestore */
  	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
***************
*** 715,878 **** dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
  	rsinfo->setResult = NULL;
  	rsinfo->setDesc = NULL;
  
  	/* synchronous query, or async result retrieval */
  	if (!is_async)
  		res = PQexec(conn, sql);
  	else
- 	{
  		res = PQgetResult(conn);
- 		/* NULL means we're all done with the async results */
- 		if (!res)
- 			return (Datum) 0;
- 	}
  
! 	/* if needed, close the connection to the database and cleanup */
! 	if (freeconn)
! 		PQfinish(conn);
  
! 	if (!res ||
! 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
! 		 PQresultStatus(res) != PGRES_TUPLES_OK))
  	{
! 		dblink_res_error(conname, res, "could not execute query", fail);
! 		return (Datum) 0;
  	}
  
- 	materializeResult(fcinfo, res);
  	return (Datum) 0;
  }
  
- /*
-  * Materialize the PGresult to return them as the function result.
-  * The res will be released in this function.
-  */
  static void
! materializeResult(FunctionCallInfo fcinfo, PGresult *res)
  {
  	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
  
! 	Assert(rsinfo->returnMode == SFRM_Materialize);
  
! 	PG_TRY();
  	{
! 		TupleDesc	tupdesc;
! 		bool		is_sql_cmd = false;
! 		int			ntuples;
! 		int			nfields;
  
! 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
! 		{
! 			is_sql_cmd = true;
  
! 			/*
! 			 * need a tuple descriptor representing one TEXT column to return
! 			 * the command status string as our result tuple
! 			 */
! 			tupdesc = CreateTemplateTupleDesc(1, false);
! 			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
! 							   TEXTOID, -1, 0);
! 			ntuples = 1;
! 			nfields = 1;
! 		}
! 		else
! 		{
! 			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
  
! 			is_sql_cmd = false;
  
! 			/* get a tuple descriptor for our result type */
! 			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
! 			{
! 				case TYPEFUNC_COMPOSITE:
! 					/* success */
! 					break;
! 				case TYPEFUNC_RECORD:
! 					/* failed to determine actual type of RECORD */
! 					ereport(ERROR,
! 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 						errmsg("function returning record called in context "
! 							   "that cannot accept type record")));
! 					break;
! 				default:
! 					/* result type isn't composite */
! 					elog(ERROR, "return type must be a row type");
! 					break;
! 			}
  
! 			/* make sure we have a persistent copy of the tupdesc */
! 			tupdesc = CreateTupleDescCopy(tupdesc);
! 			ntuples = PQntuples(res);
! 			nfields = PQnfields(res);
  		}
  
! 		/*
! 		 * check result and tuple descriptor have the same number of columns
! 		 */
! 		if (nfields != tupdesc->natts)
! 			ereport(ERROR,
! 					(errcode(ERRCODE_DATATYPE_MISMATCH),
! 					 errmsg("remote query result rowtype does not match "
! 							"the specified FROM clause rowtype")));
  
! 		if (ntuples > 0)
! 		{
! 			AttInMetadata *attinmeta;
! 			Tuplestorestate *tupstore;
! 			MemoryContext oldcontext;
! 			int			row;
! 			char	  **values;
! 
! 			attinmeta = TupleDescGetAttInMetadata(tupdesc);
! 
! 			oldcontext = MemoryContextSwitchTo(
! 									rsinfo->econtext->ecxt_per_query_memory);
! 			tupstore = tuplestore_begin_heap(true, false, work_mem);
! 			rsinfo->setResult = tupstore;
! 			rsinfo->setDesc = tupdesc;
! 			MemoryContextSwitchTo(oldcontext);
  
! 			values = (char **) palloc(nfields * sizeof(char *));
  
! 			/* put all tuples into the tuplestore */
! 			for (row = 0; row < ntuples; row++)
! 			{
! 				HeapTuple	tuple;
  
! 				if (!is_sql_cmd)
! 				{
! 					int			i;
  
! 					for (i = 0; i < nfields; i++)
! 					{
! 						if (PQgetisnull(res, row, i))
! 							values[i] = NULL;
! 						else
! 							values[i] = PQgetvalue(res, row, i);
! 					}
! 				}
  				else
! 				{
! 					values[0] = PQcmdStatus(res);
! 				}
! 
! 				/* build the tuple and put it into the tuplestore. */
! 				tuple = BuildTupleFromCStrings(attinmeta, values);
! 				tuplestore_puttuple(tupstore, tuple);
  			}
  
! 			/* clean up and return the tuplestore */
! 			tuplestore_donestoring(tupstore);
  		}
  
! 		PQclear(res);
  	}
  	PG_CATCH();
  	{
! 		/* be sure to release the libpq result */
! 		PQclear(res);
! 		PG_RE_THROW();
  	}
  	PG_END_TRY();
  }
  
  /*
--- 754,993 ----
  	rsinfo->setResult = NULL;
  	rsinfo->setDesc = NULL;
  
+ 
+ 	/*
+ 	 * Result is stored into storeinfo.tuplestore instead of
+ 	 * res->result retuned by PQexec/PQgetResult below
+ 	 */
+ 	initStoreInfo(&storeinfo, fcinfo);
+ 	PQsetRowProcessor(conn, storeHandler, &storeinfo);
+ 
  	/* synchronous query, or async result retrieval */
  	if (!is_async)
  		res = PQexec(conn, sql);
  	else
  		res = PQgetResult(conn);
  
! 	finishStoreInfo(&storeinfo);
  
! 	/* NULL res from async get means we're all done with the results */
! 	if (res || !is_async)
  	{
! 		if (freeconn)
! 			PQfinish(conn);
! 
! 		if (!res ||
! 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
! 			 PQresultStatus(res) != PGRES_TUPLES_OK))
! 		{
! 			/* finishStoreInfo saves the fields referred to below. */
! 			if (storeinfo.nummismatch)
! 			{
! 				/* This is only for backward compatibility */
! 				ereport(ERROR,
! 						(errcode(ERRCODE_DATATYPE_MISMATCH),
! 						 errmsg("remote query result rowtype does not match "
! 								"the specified FROM clause rowtype")));
! 			}
! 			else if (storeinfo.edata)
! 				ReThrowError(storeinfo.edata);
! 
! 			dblink_res_error(conname, res, "could not execute query", fail);
! 			return (Datum) 0;
! 		}
  	}
+ 	PQclear(res);
  
  	return (Datum) 0;
  }
  
  static void
! initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
  {
  	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ 	TupleDesc	tupdesc;
+ 	int i;
  
! 	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
! 	{
! 		case TYPEFUNC_COMPOSITE:
! 			/* success */
! 			break;
! 		case TYPEFUNC_RECORD:
! 			/* failed to determine actual type of RECORD */
! 			ereport(ERROR,
! 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 					 errmsg("function returning record called in context "
! 							"that cannot accept type record")));
! 			break;
! 		default:
! 			/* result type isn't composite */
! 			elog(ERROR, "return type must be a row type");
! 			break;
! 	}
  
! 	sinfo->oldcontext = MemoryContextSwitchTo(
! 		rsinfo->econtext->ecxt_per_query_memory);
! 
! 	/* make sure we have a persistent copy of the tupdesc */
! 	tupdesc = CreateTupleDescCopy(tupdesc);
! 
! 	sinfo->error_occurred = FALSE;
! 	sinfo->nummismatch = FALSE;
! 	sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
! 	sinfo->edata = NULL;
! 	sinfo->nattrs = tupdesc->natts;
! 	sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
! 	sinfo->valbuf = NULL;
! 	sinfo->valbuflen = NULL;
! 
! 	/* Preallocate memory of same size with c string array for values. */
! 	sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*));
! 	if (sinfo->valbuf)
! 		sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
! 	if (sinfo->valbuflen == NULL)
  	{
! 		if (sinfo->valbuf)
! 			free(sinfo->valbuf);
  
! 		ereport(ERROR,
! 				(errcode(ERRCODE_OUT_OF_MEMORY),
! 				 errmsg("out of memory")));
! 	}
  
! 	for (i = 0 ; i < sinfo->nattrs ; i++)
! 	{
! 		sinfo->valbuf[i] = NULL;
! 		sinfo->valbuflen[i] = -1;
! 	}
  
! 	rsinfo->setResult = sinfo->tuplestore;
! 	rsinfo->setDesc = tupdesc;
! }
  
! static void
! finishStoreInfo(storeInfo *sinfo)
! {
! 	int i;
  
! 	if (sinfo->valbuf)
! 	{
! 		for (i = 0 ; i < sinfo->nattrs ; i++)
! 		{
! 			if (sinfo->valbuf[i])
! 				free(sinfo->valbuf[i]);
  		}
+ 		free(sinfo->valbuf);
+ 		sinfo->valbuf = NULL;
+ 	}
  
! 	if (sinfo->valbuflen)
! 	{
! 		free(sinfo->valbuflen);
! 		sinfo->valbuflen = NULL;
! 	}
! 	MemoryContextSwitchTo(sinfo->oldcontext);
! }
  
! static int
! storeHandler(PGresult *res, void *param, PGrowValue *columns)
! {
! 	storeInfo *sinfo = (storeInfo *)param;
! 	HeapTuple  tuple;
! 	int        fields = PQnfields(res);
! 	int        i;
! 	char      *cstrs[PQnfields(res)];
  
! 	if (sinfo->error_occurred)
! 		return FALSE;
  
! 	if (sinfo->nattrs != fields)
! 	{
! 		sinfo->error_occurred = TRUE;
! 		sinfo->nummismatch = TRUE;
! 		finishStoreInfo(sinfo);
! 
! 		/* This error will be processed in
! 		 * dblink_record_internal(). So do not set error message
! 		 * here. */
! 		return FALSE;
! 	}
  
! 	/*
! 	 * value input functions assumes that the input string is
! 	 * terminated by zero. We should make the values to be so.
! 	 */
! 	for(i = 0 ; i < fields ; i++)
! 	{
! 		int len = columns[i].len;
! 		if (len < 0)
! 			cstrs[i] = NULL;
! 		else
! 		{
! 			char *tmp = sinfo->valbuf[i];
! 			int tmplen = sinfo->valbuflen[i];
  
! 			/*
! 			 * Divide calls to malloc and realloc so that things will
! 			 * go fine even on the systems of which realloc() does not
! 			 * accept NULL as old memory block.
! 			 *
! 			 * Also try to (re)allocate in bigger steps to
! 			 * avoid flood of allocations on weird data.
! 			 */
! 			if (tmp == NULL)
! 			{
! 				tmplen = len + 1;
! 				if (tmplen < 64)
! 					tmplen = 64;
! 				tmp = (char *)malloc(tmplen);
! 			}
! 			else if (tmplen < len + 1)
! 			{
! 				if (len + 1 > tmplen * 2)
! 					tmplen = len + 1;
  				else
! 					tmplen = tmplen * 2;
! 				tmp = (char *)realloc(tmp, tmplen);
  			}
  
! 			/*
! 			 * sinfo->valbuf[n] will be freed in finishStoreInfo()
! 			 * when realloc returns NULL.
! 			 */
! 			if (tmp == NULL)
! 				return FALSE;
! 
! 			sinfo->valbuf[i] = tmp;
! 			sinfo->valbuflen[i] = tmplen;
! 
! 			cstrs[i] = sinfo->valbuf[i];
! 			memcpy(cstrs[i], columns[i].value, len);
! 			cstrs[i][len] = '\0';
  		}
+ 	}
  
! 	PG_TRY();
! 	{
! 		tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
! 		tuplestore_puttuple(sinfo->tuplestore, tuple);
  	}
  	PG_CATCH();
  	{
! 		MemoryContext context;
! 		/*
! 		 * Store exception for later ReThrow and cancel the exception.
! 		 */
! 		sinfo->error_occurred = TRUE;
! 		context = MemoryContextSwitchTo(sinfo->oldcontext);
! 		sinfo->edata = CopyErrorData();
! 		MemoryContextSwitchTo(context);
! 		FlushErrorState();
! 		return FALSE;
  	}
  	PG_END_TRY();
+ 
+ 	return TRUE;
  }
  
  /*
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to