Hello,  This is a revised and rebased version of the patch.

a. The old term `Add Tuple Function' is changed to `Store
   Handler'. The reason for `store' rather than `storage' is simply
   the length of the symbols.

b. I couldn't find a suitable place for PGgetAsCString(). It is
   removed, and storeHandler() in dblink.c touches PGresAttValue
   directly in this new patch. The definition of PGresAttValue stays
   in libpq-fe.h and is provided with a comment.

c. Error handling in dblink.c is refined. I think it preserves the
   previous behavior for column-number mismatches and type
   conversion exceptions.

d. The documentation is revised.

> It jumped from 332K tuples/sec to 450K, a 35% gain, and had a
> lower memory footprint too.  Test methodology and those results
> are at
> http://archives.postgresql.org/pgsql-hackers/2011-12/msg00008.php

Disappointingly, re-measuring showed that the gain has become
lower than that.

On CentOS 6.2, with the other conditions the same as in the previous
testing, the overall performance became higher; the loss with the
libpq patch alone was 1.8% and the gain with the full patch fell
to 5.6%. The reduction in memory usage, however, was unchanged.

Original             : 3.96s  100.0%
w/libpq patch        : 4.03s  101.8%
w/libpq+dblink patch : 3.74s   94.4%


The attachments are listed below.

libpq_altstore_20120117.patch
  - Allow alternative storage for libpq.

dblink_perf_20120117.patch
  - Modify dblink to use alternative storage mechanism.
 
libpq_altstore_doc_20120117.patch
  - Documentation for libpq_altstore. Appears as "31.19. Alternative result storage"


regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..83525e1 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQregisterStoreHandler	  161
+PQgetStoreHandlerParam	  163
+PQsetStoreHandlerErrMes	  164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index d454538..5559f0b 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)
 	conn->allow_ssl_try = true;
 	conn->wait_ssl_try = false;
 #endif
+	conn->storeHandler = NULL;
 
 	/*
 	 * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5076,3 +5077,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
 
 	return prev;
 }
+
+void
+PQregisterStoreHandler(PGconn *conn, StoreHandler func, void *param)
+{
+	conn->storeHandler = func;
+	conn->storeHandlerParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..96e5974 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -67,6 +67,10 @@ static int PQsendDescribe(PGconn *conn, char desc_type,
 			   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
 
+static void *pqDefaultStoreHandler(PGresult *res, PQStoreFunc func,
+								   int id, size_t len);
+static void *pqAddTuple(PGresult *res, PGresAttValue *tup);
+
 
 /* ----------------
  * Space management for PGresult.
@@ -160,6 +164,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
 	result->curBlock = NULL;
 	result->curOffset = 0;
 	result->spaceLeft = 0;
+	result->storeHandler = pqDefaultStoreHandler;
+	result->storeHandlerParam = NULL;
+	result->storeHandlerErrMes = NULL;
 
 	if (conn)
 	{
@@ -194,6 +201,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
 			}
 			result->nEvents = conn->nEvents;
 		}
+
+		if (conn->storeHandler)
+		{
+			result->storeHandler = conn->storeHandler;
+			result->storeHandlerParam = conn->storeHandlerParam;
+		}
 	}
 	else
 	{
@@ -487,6 +500,33 @@ PQresultAlloc(PGresult *res, size_t nBytes)
 	return pqResultAlloc(res, nBytes, TRUE);
 }
 
+void *
+pqDefaultStoreHandler(PGresult *res, PQStoreFunc func, int id, size_t len)
+{
+	void *p;
+
+	switch (func)
+	{
+		case PQSF_ALLOC_TEXT:
+			return pqResultAlloc(res, len, TRUE);
+
+		case PQSF_ALLOC_BINARY:
+			p = pqResultAlloc(res, len, FALSE);
+
+			if (id == -1)
+				res->storeHandlerParam = p;
+
+			return p;
+
+		case PQSF_ADD_TUPLE:
+			return pqAddTuple(res, res->storeHandlerParam);
+
+		default:
+			/* Ignore */
+			break;
+	}
+	return NULL;
+}
 /*
  * pqResultAlloc -
  *		Allocate subsidiary storage for a PGresult.
@@ -830,9 +870,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /*
  * pqAddTuple
  *	  add a row pointer to the PGresult structure, growing it if necessary
- *	  Returns TRUE if OK, FALSE if not enough memory to add the row
+ *	  Returns tup if OK, NULL if not enough memory to add the row.
  */
-int
+static void *
 pqAddTuple(PGresult *res, PGresAttValue *tup)
 {
 	if (res->ntups >= res->tupArrSize)
@@ -858,13 +898,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
 			newTuples = (PGresAttValue **)
 				realloc(res->tuples, newSize * sizeof(PGresAttValue *));
 		if (!newTuples)
-			return FALSE;		/* malloc or realloc failed */
+			return NULL;		/* malloc or realloc failed */
 		res->tupArrSize = newSize;
 		res->tuples = newTuples;
 	}
 	res->tuples[res->ntups] = tup;
 	res->ntups++;
-	return TRUE;
+	return tup;
 }
 
 /*
@@ -2822,6 +2862,35 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
 		return 0;
 }
 
+/* PQgetStoreHandlerParam
+ *	Get the pointer to the contextual parameter from PGresult which is
+ *	registered to PGconn by PQregisterStoreHandler
+ */
+void *
+PQgetStoreHandlerParam(const PGresult *res)
+{
+	if (!res)
+		return NULL;
+	return res->storeHandlerParam;
+}
+
+/* PQsetStoreHandlerErrMes
+ *	Set the error message pass back to the caller of StoreHandler.
+ *
+ *  mes must be a malloc'ed memory block and it will be released by
+ *  the caller of StoreHandler.  You can replace the previous message
+ *  by alternative mes, or clear it with NULL. The previous one will
+ *  be freed internally.
+ */
+void
+PQsetStoreHandlerErrMes(PGresult *res, char *mes)
+{
+	/* Free existing message */
+	if (res->storeHandlerErrMes)
+		free(res->storeHandlerErrMes);
+	res->storeHandlerErrMes = mes;
+}
+
 /* PQnparams:
  *	returns the number of input parameters of a prepared statement.
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..205502b 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary)
 	if (conn->curTuple == NULL)
 	{
 		conn->curTuple = (PGresAttValue *)
-			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+			result->storeHandler(result, PQSF_ALLOC_BINARY, -1,
+								 nfields * sizeof(PGresAttValue));
 		if (conn->curTuple == NULL)
-			goto outOfMemory;
+			goto addTupleError;
 		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
 
 		/*
@@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary)
 	{
 		bitmap = (char *) malloc(nbytes);
 		if (!bitmap)
-			goto outOfMemory;
+			goto addTupleError;
 	}
 
 	if (pqGetnchar(bitmap, nbytes, conn))
@@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary)
 				vlen = 0;
 			if (tup[i].value == NULL)
 			{
-				tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
+				PQStoreFunc func = 
+					(binary ? PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT);
+				tup[i].value =
+					(char *) result->storeHandler(result, func, i, vlen + 1);
 				if (tup[i].value == NULL)
-					goto outOfMemory;
+					goto addTupleError;
 			}
 			tup[i].len = vlen;
 			/* read in the value */
@@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary)
 	}
 
 	/* Success!  Store the completed tuple in the result */
-	if (!pqAddTuple(result, tup))
-		goto outOfMemory;
+	if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0))
+		goto addTupleError;
+
 	/* and reset for a new message */
 	conn->curTuple = NULL;
 
@@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary)
 		free(bitmap);
 	return 0;
 
-outOfMemory:
+addTupleError:
 	/* Replace partially constructed result with an error result */
 
 	/*
@@ -829,8 +834,21 @@ outOfMemory:
 	 * there's not enough memory to concatenate messages...
 	 */
 	pqClearAsyncResult(conn);
-	printfPQExpBuffer(&conn->errorMessage,
-					  libpq_gettext("out of memory for query result\n"));
+	resetPQExpBuffer(&conn->errorMessage);
+
+	/*
+	 * If an error message was passed from the store handler, set it
+	 * into PGconn; assume out of memory if not.
+	 */
+	appendPQExpBufferStr(&conn->errorMessage,
+						 libpq_gettext(result->storeHandlerErrMes ?
+									   result->storeHandlerErrMes :
+									   "out of memory for query result\n"));
+	if (result->storeHandlerErrMes)
+	{
+		free(result->storeHandlerErrMes);
+		result->storeHandlerErrMes = NULL;
+	}
 
 	/*
 	 * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..117c38a 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength)
 	if (conn->curTuple == NULL)
 	{
 		conn->curTuple = (PGresAttValue *)
-			pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+			result->storeHandler(result, PQSF_ALLOC_BINARY, -1,
+								 nfields * sizeof(PGresAttValue));
 		if (conn->curTuple == NULL)
-			goto outOfMemory;
+			goto addTupleError;
 		MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
 	}
 	tup = conn->curTuple;
@@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength)
 			vlen = 0;
 		if (tup[i].value == NULL)
 		{
-			bool		isbinary = (result->attDescs[i].format != 0);
-
-			tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
+			PQStoreFunc func = (result->attDescs[i].format != 0 ?
+								PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT);
+			tup[i].value =
+				(char *) result->storeHandler(result, func, i, vlen + 1);
 			if (tup[i].value == NULL)
-				goto outOfMemory;
+				goto addTupleError;
 		}
 		tup[i].len = vlen;
 		/* read in the value */
@@ -689,22 +691,36 @@ getAnotherTuple(PGconn *conn, int msgLength)
 	}
 
 	/* Success!  Store the completed tuple in the result */
-	if (!pqAddTuple(result, tup))
-		goto outOfMemory;
+	if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0))
+		goto addTupleError;
+	
 	/* and reset for a new message */
 	conn->curTuple = NULL;
 
 	return 0;
 
-outOfMemory:
+addTupleError:
 
 	/*
 	 * Replace partially constructed result with an error result. First
 	 * discard the old result to try to win back some memory.
 	 */
 	pqClearAsyncResult(conn);
-	printfPQExpBuffer(&conn->errorMessage,
-					  libpq_gettext("out of memory for query result\n"));
+	resetPQExpBuffer(&conn->errorMessage);
+
+	/*
+	 * If an error message was passed from the store handler, set it
+	 * into PGconn; assume out of memory if not.
+	 */
+	appendPQExpBufferStr(&conn->errorMessage,
+						 libpq_gettext(result->storeHandlerErrMes ?
+									   result->storeHandlerErrMes : 
+									   "out of memory for query result\n"));
+	if (result->storeHandlerErrMes)
+	{
+		free(result->storeHandlerErrMes);
+		result->storeHandlerErrMes = NULL;
+	}
 	pqSaveErrorResult(conn);
 
 	/* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..6d86fa0 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -116,6 +116,16 @@ typedef enum
 	PQPING_NO_ATTEMPT			/* connection not attempted (bad params) */
 } PGPing;
 
+/* PQStoreFunc is the enum for one of the parameters of storeHandler
+ * that decides what to do. See the typedef StoreHandler for
+ * details */
+typedef enum 
+{
+	PQSF_ALLOC_TEXT,          /* Requested non-aligned memory for text value */
+	PQSF_ALLOC_BINARY,        /* Requested aligned memory for binary value */
+	PQSF_ADD_TUPLE            /* Requested to add tuple data into store */
+} PQStoreFunc;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -149,6 +159,15 @@ typedef struct pgNotify
 	struct pgNotify *next;		/* list link */
 } PGnotify;
 
+/* PGresAttValue represents a value of one tuple field in string form.
+   NULL is represented as len < 0. Otherwise value points to a null
+   terminated C string with the length of len. */
+typedef struct pgresAttValue
+{
+	int			len;			/* length in bytes of the value */
+	char	   *value;			/* actual value, plus terminating zero byte */
+} PGresAttValue;
+
 /* Function types for notice-handling callbacks */
 typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
 typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -416,6 +435,52 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
 			 const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for alternative result store handler.
+ *
+ * This function pointer is used for alternative result store handler
+ * callback in PGresult and PGconn.
+ *
+ * StoreHandler is called for three functions designated by the enum
+ * PQStoreFunc.
+ *
+ * id is the identifier for allocated memory block. The caller sets -1
+ * for PGresAttValue array, and 0 to number of cols - 1 for each
+ * column.
+ *
+ * PQSF_ALLOC_TEXT requests the size bytes memory block for a text
+ * value which may not be aligned to the word boundary.
+ *
+ * PQSF_ALLOC_BINARY requests the size bytes memory block for a binary
+ * value which is aligned to the word boundary.
+ *
+ * PQSF_ADD_TUPLE requests to add tuple data into the result store,
+ * and free the memory blocks allocated by this function if necessary.
+ * id and size are to be ignored for this function.
+ *
+ * This function must return non-NULL value for success and must
+ * return NULL for failure and may set error message by
+ * PQsetStoreHandlerErrMes. It is assumed by caller as out of memory
+ * when the error message is NULL on failure. This function is assumed
+ * not to throw any exception.
+ */
+typedef void *(*StoreHandler)(PGresult *res, PQStoreFunc func,
+							  int id, size_t size);
+
+/*
+ * Register alternative result store function to PGconn.
+ * 
+ * By registering this function, pg_result disables its own result
+ * store and calls it to append rows one by one.
+ *
+ * func is tuple store function. See the typedef StoreHandler.
+ * 
+ * storeHandlerParam is the contextual variable that can be get with
+ * PQgetStoreHandlerParam in StoreHandler.
+ */
+extern void PQregisterStoreHandler(PGconn *conn, StoreHandler func,
+								   void *storeHandlerParam);
+
 /* Force the write buffer to be written (or at least try) */
 extern int	PQflush(PGconn *conn);
 
@@ -454,6 +519,8 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int	PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int	PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern void *PQgetStoreHandlerParam(const PGresult *res);
+extern void	PQsetStoreHandlerErrMes(PGresult *res, char *mes);
 extern int	PQnparams(const PGresult *res);
 extern Oid	PQparamtype(const PGresult *res, int param_num);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d967d60..e28e712 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,12 +134,6 @@ typedef struct pgresParamDesc
 
 #define NULL_LEN		(-1)	/* pg_result len for NULL value */
 
-typedef struct pgresAttValue
-{
-	int			len;			/* length in bytes of the value */
-	char	   *value;			/* actual value, plus terminating zero byte */
-} PGresAttValue;
-
 /* Typedef for message-field list entries */
 typedef struct pgMessageField
 {
@@ -209,6 +203,11 @@ struct pg_result
 	PGresult_data *curBlock;	/* most recently allocated block */
 	int			curOffset;		/* start offset of free space in block */
 	int			spaceLeft;		/* number of free bytes remaining in block */
+
+	StoreHandler storeHandler;  /* Result store handler. See
+								 * StoreHandler for details. */
+	void *storeHandlerParam;    /* Contextual parameter for storeHandler */
+	char *storeHandlerErrMes;   /* Error message from storeHandler */
 };
 
 /* PGAsyncStatusType defines the state of the query-execution state machine */
@@ -443,6 +442,13 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+    /* Tuple store handler. The two fields below are copied to each newly
+	 * created PGresult if storeHandler is not NULL. The default
+	 * function is used if NULL. */
+	StoreHandler storeHandler;   /* Result store handler. See
+								  * StoreHandler for details. */
+	void *storeHandlerParam;  /* Contextual parameter for storeHandler */
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -507,7 +513,6 @@ extern void
 pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /* This lets gcc check the format string for consistency. */
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
-extern int	pqAddTuple(PGresult *res, PGresAttValue *tup);
 extern void pqSaveMessageField(PGresult *res, char code,
 				   const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..a8685a9 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,24 @@ typedef struct remoteConn
 	bool		newXactForCursor;		/* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+	Tuplestorestate *tuplestore;
+	int nattrs;
+	AttInMetadata *attinmeta;
+	MemoryContext oldcontext;
+	char *attrvalbuf;
+	void **valbuf;
+	size_t *valbufsize;
+	bool error_occurred;
+	bool nummismatch;
+	ErrorData *edata;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +103,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
 				   int2vector *pkattnums_arg, int32 pknumatts_arg,
 				   int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *storeHandler(PGresult *res, PQStoreFunc func, int id, size_t size);
+
 
 /* Global */
 static remoteConn *pconn = NULL;
@@ -503,6 +520,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	char	   *curname = NULL;
 	int			howmany = 0;
 	bool		fail = true;	/* default to backward compatible */
+	storeInfo   storeinfo;
 
 	DBLINK_INIT;
 
@@ -559,15 +577,36 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
 	/*
+	 * Result is stored into storeinfo.tuplestore instead of
+	 * res->result returned by PQexec below
+	 */
+	initStoreInfo(&storeinfo, fcinfo);
+	PQregisterStoreHandler(conn, storeHandler, &storeinfo);
+
+	/*
 	 * Try to execute the query.  Note that since libpq uses malloc, the
 	 * PGresult will be long-lived even though we are still in a short-lived
 	 * memory context.
 	 */
 	res = PQexec(conn, buf.data);
+	finishStoreInfo(&storeinfo);
+
 	if (!res ||
 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
 		 PQresultStatus(res) != PGRES_TUPLES_OK))
 	{
+		/* finishStoreInfo saves the fields referred to below. */
+		if (storeinfo.nummismatch)
+		{
+			/* This is only for backward compatibility */
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("remote query result rowtype does not match "
+							"the specified FROM clause rowtype")));
+		}
+		else if (storeinfo.edata)
+			ReThrowError(storeinfo.edata);
+
 		dblink_res_error(conname, res, "could not fetch from cursor", fail);
 		return (Datum) 0;
 	}
@@ -579,8 +618,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
 				(errcode(ERRCODE_INVALID_CURSOR_NAME),
 				 errmsg("cursor \"%s\" does not exist", curname)));
 	}
+	PQclear(res);
 
-	materializeResult(fcinfo, res);
 	return (Datum) 0;
 }
 
@@ -640,6 +679,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible */
 	bool		freeconn = false;
+	storeInfo   storeinfo;
 
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -715,164 +755,213 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	rsinfo->setResult = NULL;
 	rsinfo->setDesc = NULL;
 
+
+	/*
+	 * Result is stored into storeinfo.tuplestore instead of
+	 * res->result returned by PQexec/PQgetResult below
+	 */
+	initStoreInfo(&storeinfo, fcinfo);
+	PQregisterStoreHandler(conn, storeHandler, &storeinfo);
+
 	/* synchronous query, or async result retrieval */
 	if (!is_async)
 		res = PQexec(conn, sql);
 	else
-	{
 		res = PQgetResult(conn);
-		/* NULL means we're all done with the async results */
-		if (!res)
-			return (Datum) 0;
-	}
 
-	/* if needed, close the connection to the database and cleanup */
-	if (freeconn)
-		PQfinish(conn);
+	finishStoreInfo(&storeinfo);
 
-	if (!res ||
-		(PQresultStatus(res) != PGRES_COMMAND_OK &&
-		 PQresultStatus(res) != PGRES_TUPLES_OK))
+	/* NULL res from async get means we're all done with the results */
+	if (res || !is_async)
 	{
-		dblink_res_error(conname, res, "could not execute query", fail);
-		return (Datum) 0;
+		if (freeconn)
+			PQfinish(conn);
+
+		if (!res ||
+			(PQresultStatus(res) != PGRES_COMMAND_OK &&
+			 PQresultStatus(res) != PGRES_TUPLES_OK))
+		{
+			/* finishStoreInfo saves the fields referred to below. */
+			if (storeinfo.nummismatch)
+			{
+				/* This is only for backward compatibility */
+				ereport(ERROR,
+						(errcode(ERRCODE_DATATYPE_MISMATCH),
+						 errmsg("remote query result rowtype does not match "
+								"the specified FROM clause rowtype")));
+			}
+			else if (storeinfo.edata)
+				ReThrowError(storeinfo.edata);
+
+			dblink_res_error(conname, res, "could not execute query", fail);
+			return (Datum) 0;
+		}
 	}
+	PQclear(res);
 
-	materializeResult(fcinfo, res);
 	return (Datum) 0;
 }
 
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-	Assert(rsinfo->returnMode == SFRM_Materialize);
-
-	PG_TRY();
+	TupleDesc	tupdesc;
+	int i;
+	
+	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
 	{
-		TupleDesc	tupdesc;
-		bool		is_sql_cmd = false;
-		int			ntuples;
-		int			nfields;
+		case TYPEFUNC_COMPOSITE:
+			/* success */
+			break;
+		case TYPEFUNC_RECORD:
+			/* failed to determine actual type of RECORD */
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("function returning record called in context "
+							"that cannot accept type record")));
+			break;
+		default:
+			/* result type isn't composite */
+			elog(ERROR, "return type must be a row type");
+			break;
+	}
+	
+	sinfo->oldcontext = MemoryContextSwitchTo(
+		rsinfo->econtext->ecxt_per_query_memory);
+
+	/* make sure we have a persistent copy of the tupdesc */
+	tupdesc = CreateTupleDescCopy(tupdesc);
+
+	sinfo->error_occurred = FALSE;
+	sinfo->nummismatch = FALSE;
+	sinfo->edata = NULL;
+	sinfo->nattrs = tupdesc->natts;
+	sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+	sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+	sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *));
+	sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t));
+	for (i = 0 ; i < sinfo->nattrs ; i++)
+	{
+		sinfo->valbuf[i] = NULL;
+		sinfo->valbufsize[i] = 0;
+	}
 
-		if (PQresultStatus(res) == PGRES_COMMAND_OK)
-		{
-			is_sql_cmd = true;
-
-			/*
-			 * need a tuple descriptor representing one TEXT column to return
-			 * the command status string as our result tuple
-			 */
-			tupdesc = CreateTemplateTupleDesc(1, false);
-			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-							   TEXTOID, -1, 0);
-			ntuples = 1;
-			nfields = 1;
-		}
-		else
-		{
-			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+	/* Preallocate a buffer the size of a PGresAttValue array for values. */
+	sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue));
 
-			is_sql_cmd = false;
+	rsinfo->setResult = sinfo->tuplestore;
+	rsinfo->setDesc = tupdesc;
+}
 
-			/* get a tuple descriptor for our result type */
-			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-			{
-				case TYPEFUNC_COMPOSITE:
-					/* success */
-					break;
-				case TYPEFUNC_RECORD:
-					/* failed to determine actual type of RECORD */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("function returning record called in context "
-							   "that cannot accept type record")));
-					break;
-				default:
-					/* result type isn't composite */
-					elog(ERROR, "return type must be a row type");
-					break;
-			}
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+	int i;
 
-			/* make sure we have a persistent copy of the tupdesc */
-			tupdesc = CreateTupleDescCopy(tupdesc);
-			ntuples = PQntuples(res);
-			nfields = PQnfields(res);
+	for (i = 0 ; i < sinfo->nattrs ; i++)
+	{
+		if (sinfo->valbuf[i])
+		{
+			free(sinfo->valbuf[i]);
+			sinfo->valbuf[i] = NULL;
 		}
+	}
+	if (sinfo->attrvalbuf)
+		free(sinfo->attrvalbuf);
+	sinfo->attrvalbuf = NULL;
+	MemoryContextSwitchTo(sinfo->oldcontext);
+}
 
-		/*
-		 * check result and tuple descriptor have the same number of columns
-		 */
-		if (nfields != tupdesc->natts)
-			ereport(ERROR,
-					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("remote query result rowtype does not match "
-							"the specified FROM clause rowtype")));
+static void *
+storeHandler(PGresult *res, PQStoreFunc  func, int id, size_t size)
+{
+	storeInfo *sinfo = (storeInfo *)PQgetStoreHandlerParam(res);
+	HeapTuple	tuple;
+	int fields = PQnfields(res);
+	int i;
+	PGresAttValue *attval;
+	char        **cstrs;
 
-		if (ntuples > 0)
-		{
-			AttInMetadata *attinmeta;
-			Tuplestorestate *tupstore;
-			MemoryContext oldcontext;
-			int			row;
-			char	  **values;
-
-			attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-			oldcontext = MemoryContextSwitchTo(
-									rsinfo->econtext->ecxt_per_query_memory);
-			tupstore = tuplestore_begin_heap(true, false, work_mem);
-			rsinfo->setResult = tupstore;
-			rsinfo->setDesc = tupdesc;
-			MemoryContextSwitchTo(oldcontext);
+	if (sinfo->error_occurred)
+		return NULL;
+
+	switch (func)
+	{
+		case PQSF_ALLOC_TEXT:
+		case PQSF_ALLOC_BINARY:
+			if (id == -1)
+				return sinfo->attrvalbuf;
 
-			values = (char **) palloc(nfields * sizeof(char *));
+			if (id < 0 || id >= sinfo->nattrs)
+				return NULL;
 
-			/* put all tuples into the tuplestore */
-			for (row = 0; row < ntuples; row++)
+			if (sinfo->valbufsize[id] < size)
 			{
-				HeapTuple	tuple;
+				if (sinfo->valbuf[id] == NULL)
+					sinfo->valbuf[id] = malloc(size);
+				else
+					sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size);
+				sinfo->valbufsize[id] = size;
+			}
+			return sinfo->valbuf[id];
 
-				if (!is_sql_cmd)
-				{
-					int			i;
+		case PQSF_ADD_TUPLE:
+			break;   /* Go through */
+		default:
+			/* Ignore */
+			break;
+	}
 
-					for (i = 0; i < nfields; i++)
-					{
-						if (PQgetisnull(res, row, i))
-							values[i] = NULL;
-						else
-							values[i] = PQgetvalue(res, row, i);
-					}
-				}
-				else
-				{
-					values[0] = PQcmdStatus(res);
-				}
+	if (sinfo->nattrs != fields)
+	{
+		sinfo->error_occurred = TRUE;
+		sinfo->nummismatch = TRUE;
+		finishStoreInfo(sinfo);
 
-				/* build the tuple and put it into the tuplestore. */
-				tuple = BuildTupleFromCStrings(attinmeta, values);
-				tuplestore_puttuple(tupstore, tuple);
-			}
+		/* This error will be processed in
+		 * dblink_record_internal(). So do not set error message
+		 * here. */
+		return NULL;
+	}
 
-			/* clean up and return the tuplestore */
-			tuplestore_donestoring(tupstore);
-		}
+	/*
+	 * Rewrite PGresAttValue[] to char(*)[] in-place.
+	 */
+	Assert(sizeof(char*) <= sizeof(PGresAttValue));
 
-		PQclear(res);
+	attval = (PGresAttValue *)sinfo->attrvalbuf;
+	cstrs   = (char **)sinfo->attrvalbuf;
+	for(i = 0 ; i < fields ; i++)
+	{
+		if (attval->len < 0)
+			cstrs[i] = NULL;
+		else
+			cstrs[i] = attval->value;
+	}
+
+	PG_TRY();
+	{
+		tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+		tuplestore_puttuple(sinfo->tuplestore, tuple);
 	}
 	PG_CATCH();
 	{
-		/* be sure to release the libpq result */
-		PQclear(res);
-		PG_RE_THROW();
+		MemoryContext context;
+		/*
+		 * Store exception for later ReThrow and cancel the exception.
+		 */
+		sinfo->error_occurred = TRUE;
+		context = MemoryContextSwitchTo(sinfo->oldcontext);
+		sinfo->edata = CopyErrorData();
+		MemoryContextSwitchTo(context);
+		FlushErrorState();
+
+		return NULL;
 	}
 	PG_END_TRY();
+
+	return sinfo->attrvalbuf;
 }
 
 /*
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..8803999 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,293 @@ int PQisthreadsafe();
  </sect1>
 
 
+ <sect1 id="libpq-alterstorage">
+  <title>Alternative result storage</title>
+
+  <indexterm zone="libpq-alterstorage">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   In standard usage, users get the result of command
+   execution from a <structname>PGresult</structname> acquired
+   with <function>PQgetResult</function>
+   from a <structname>PGconn</structname>. The memory areas for
+   the PGresult are allocated with malloc() internally within calls of
+   command execution functions such as <function>PQexec</function>
+   and <function>PQgetResult</function>. If it is difficult to
+   handle the result records in the form of PGresult, you can instruct
+   PGconn to store them into your own storage instead of PGresult.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-registerstorehandler">
+    <term>
+     <function>PQregisterStoreHandler</function>
+     <indexterm>
+      <primary>PQregisterStoreHandler</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a callback function to allocate memory for each tuple and
+       column values, and add the complete tuple into the alternative
+       result storage.
+<synopsis>
+void PQregisterStoreHandler(PGconn *conn,
+                            StoreHandler func,
+                            void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+	 <varlistentry>
+	   <term><parameter>conn</parameter></term>
+	   <listitem>
+	     <para>
+	       The connection object to set the storage handler
+	       function. PGresult created from this connection calls this
+	       function to store the result instead of storing into its
+	       internal storage.
+	     </para>
+	   </listitem>
+	 </varlistentry>
+	 <varlistentry>
+	   <term><parameter>func</parameter></term>
+	   <listitem>
+	     <para>
+	       Storage handler function to set. NULL means to use the
+	       default storage.
+	     </para>
+	   </listitem>
+	 </varlistentry>
+	 <varlistentry>
+	   <term><parameter>param</parameter></term>
+	   <listitem>
+	     <para>
+	       A pointer to a contextual parameter passed
+	       to <parameter>func</parameter>. You can get this pointer
+	       in <type>StoreHandler</type>
+	       by calling <function>PQgetStoreHandlerParam</function>.
+	     </para>
+	   </listitem>
+	 </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-storehandler">
+    <term>
+     <type>StoreHandler</type>
+     <indexterm>
+      <primary>StoreHandler</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type for the storage handler callback function.
+<synopsis>
+typedef enum 
+{
+  PQSF_ALLOC_TEXT,
+  PQSF_ALLOC_BINARY,
+  PQSF_ADD_TUPLE
+} PQStoreFunc;
+
+void *(*StoreHandler)(PGresult *res,
+                      PQStoreFunc func,
+                      int id,
+                      size_t size);
+</synopsis>
+     </para>
+
+     <para>
+       Generally this function must return NULL on failure and should
+       set the error message
+       with <function>PQsetStoreHandlerErrMes</function> if the cause
+       is other than out of memory. This function must not throw any
+       exception. This function is called in the following sequence.
+
+       <itemizedlist spacing="compact">
+	 <listitem>
+	   <simpara>Call with <parameter>func</parameter>
+	   = <firstterm>PQSF_ALLOC_BINARY</firstterm>
+	   and <parameter>id</parameter> = -1 to request the memory
+	   for a tuple to be used as an array
+	   of <type>PGresAttValue</type>. </simpara>
+	 </listitem>
+	 <listitem>
+	   <simpara>Call with <parameter>func</parameter>
+	   = <firstterm>PQSF_ALLOC_TEXT</firstterm>
+	   or <firstterm>PQSF_ALLOC_BINARY</firstterm>
+	   and <parameter>id</parameter> from zero to the number of columns
+	   - 1 to request the memory for each column value in the current
+	   tuple.</simpara>
+	 </listitem>
+	 <listitem>
+	   <simpara>Call with <parameter>func</parameter>
+	   = <firstterm>PQSF_ADD_TUPLE</firstterm> to request the
+	   constructed tuple to be stored.</simpara>
+	 </listitem>
+       </itemizedlist>
+     </para>
+     <para>
+       Calling <type>StoreHandler</type>
+       with <parameter>func</parameter> =
+       <firstterm>PQSF_ALLOC_TEXT</firstterm> asks it to return a
+       memory block of at least <parameter>size</parameter> bytes,
+       which need not be aligned to a word boundary.
+       <parameter>id</parameter> is a zero or positive number that
+       identifies the usage of the requested memory block, that is, the
+       position of the column for which the memory block is used.
+     </para>
+     <para>
+       When <parameter>func</parameter>
+       = <firstterm>PQSF_ALLOC_BINARY</firstterm>, this function is
+       told to return a memory block of at
+       least <parameter>size</parameter> bytes which is aligned to a
+       word boundary.
+       <parameter>id</parameter> is the identifier that distinguishes the
+       usage of the requested memory block. -1 means that it is used as an
+       array of <type>PGresAttValue</type> to store the tuple. Zero or
+       positive numbers have the same meaning as for
+       <firstterm>PQSF_ALLOC_TEXT</firstterm>.
+     </para>
+     <para>When <parameter>func</parameter>
+       = <firstterm>PQSF_ADD_TUPLE</firstterm>, this function is
+       told to store the <type>PGresAttValue</type> structure
+       constructed by the caller into your storage. The pointer to the
+       tuple structure is not passed, so you should remember the
+       pointer to the memory block passed back to the caller when
+       <parameter>func</parameter>
+       = <firstterm>PQSF_ALLOC_BINARY</firstterm>
+       and <parameter>id</parameter> is -1. This function must return
+       any non-NULL value for success. You must properly release the
+       memory blocks passed to the caller in this function if needed.
+     </para>
+     <variablelist>
+       <varlistentry>
+	 <term><parameter>res</parameter></term>
+	 <listitem>
+	   <para>
+	     A pointer to the <type>PGresult</type> object.
+	   </para>
+	 </listitem>
+       </varlistentry>
+       <varlistentry>
+	 <term><parameter>func</parameter></term>
+	 <listitem>
+	   <para>
+	     An <type>enum</type> value telling the function to perform.
+	   </para>
+	 </listitem>
+       </varlistentry>
+       <varlistentry>
+	 <term><parameter>param</parameter></term>
+	 <listitem>
+	   <para>
+	     A pointer to contextual parameter passed to func.
+	   </para>
+	 </listitem>
+       </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqgetstorehandlerparam">
+    <term>
+     <function>PQgetStoreHandlerParam</function>
+     <indexterm>
+      <primary>PQgetStoreHandlerParam</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+	Get the pointer passed to <function>PQregisterStoreHandler</function>
+	as <parameter>param</parameter>.
+<synopsis>
+void *PQgetStoreHandlerParam(PGresult *res)
+</synopsis>
+      </para>
+      <para>
+	<variablelist>
+	  <varlistentry>
+	    <term><parameter>res</parameter></term>
+	    <listitem>
+	      <para>
+		A pointer to the <type>PGresult</type> object.
+	      </para>
+	    </listitem>
+	  </varlistentry>
+	</variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetstorehandlererrmes">
+    <term>
+     <function>PQsetStoreHandlerErrMes</function>
+     <indexterm>
+      <primary>PQsetStoreHandlerErrMes</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+	Set the message for an error that occurred
+	in <type>StoreHandler</type>.  If this message is not set, the
+	caller assumes the error to be out of memory.
+<synopsis>
+void PQsetStoreHandlerErrMes(PGresult *res, char *mes)
+</synopsis>
+      </para>
+      <para>
+	<variablelist>
+	  <varlistentry>
+	    <term><parameter>res</parameter></term>
+	    <listitem>
+	      <para>
+		A pointer to the <type>PGresult</type> object
+		passed to <type>StoreHandler</type>.
+	      </para>
+	    </listitem>
+	  </varlistentry>
+	  <varlistentry>
+	    <term><parameter>mes</parameter></term>
+	    <listitem>
+	      <para>
+		A pointer to the memory block containing the error
+		message, which is allocated
+		by <function>malloc()</function>. The memory block
+		will be freed with <function>free()</function> in the
+		caller of
+		<type>StoreHandler</type> only if it returns NULL.
+	      </para>
+	      <para>
+		If <parameter>res</parameter> already has a message previously
+		set, it is freed and then the given message is set. Set NULL
+		to cancel the custom message.
+	      </para>
+	    </listitem>
+	  </varlistentry>
+	</variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </sect1>
+
+
  <sect1 id="libpq-build">
   <title>Building <application>libpq</application> Programs</title>
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to