Hello. As came up in the discussion on asynchronous fetching in
postgres_fdw, a FETCH with a data-size limitation would be useful
for keeping postgres_fdw's memory usage stable.

Would such a feature and syntax be acceptable to add?

== 

postgres_fdw fetches tuples from remote servers using a cursor.
The transfer gets faster as the number of fetches decreases. On
the other hand, the buffer size needed for the fetched tuples
varies widely with their average length. 100 tuples per fetch is
quite small for short tuples, but a larger fetch size can easily
cause memory exhaustion, and there is no way to know the tuple
length in advance.
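
For reference, what postgres_fdw effectively issues today is a
fixed-count fetch regardless of tuple width (the table name below
is made up; 100 is the per-batch row count postgres_fdw currently
uses):

  DECLARE c1 CURSOR FOR SELECT * FROM remote_tab;
  FETCH 100 FROM c1;   -- roughly 100 * average tuple length bytes
  FETCH 100 FROM c1;   -- repeated until no more rows come back
  CLOSE c1;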

One way to resolve this tension would be a FETCH that limits the
result by data size rather than by the number of tuples, so I'd
like to propose that.
 

This patch is a POC for the feature. For example,

  FETCH 10000 LIMIT 1000000 FROM c1;

This FETCH retrieves up to 10000 tuples, but cuts off just after
the total tuple length exceeds 1MB. (So it is not literally a
"LIMIT" in that sense.)

The syntax added by this patch is as follows:

 FETCH [FORWARD|BACKWARD] <ALL|SignedIconst> LIMIT Iconst [FROM|IN] curname
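
For example, all of the following are accepted by the added
grammar rules (c1 is an already opened cursor):

  FETCH 1000 LIMIT 1000000 FROM c1;        -- up to 1000 rows or ~1MB
  FETCH ALL LIMIT 1000000 FROM c1;         -- all remaining rows, ~1MB cap
  FETCH FORWARD 1000 LIMIT 1000000 IN c1;
  FETCH BACKWARD ALL LIMIT 1000000 FROM c1;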

The "data size" to be compared with the LIMIT size is the
summation of the result of the following expression. The
appropriateness of it should be arguable.

[if tupleslot has tts_tuple]
   HEAPTUPLESIZE + slot->tts_tuple->t_len
[else]
   HEAPTUPLESIZE + 
     heap_compute_data_size(slot->tts_tupleDescriptor,
                            slot->tts_values,
                            slot->tts_isnull);

========================

This patch makes the following changes:

- Adds the parameter "size" to the following functions:
      (standard_)ExecutorRun / ExecutePlan / RunFromStore
      PortalRun / PortalRunSelect / PortalRunFetch / DoPortalRunFetch

- The core is in standard_ExecutorRun and RunFromStore: simply
  sum up the lengths of the tuples sent and compare the total
  against the given limit.

- struct FetchStmt and EState each have a new member.

- The modifications in gram.y also affect the ecpg parser. I
  think I have fixed them, but with no confidence :(

- Modified the corresponding parts of auto_explain and
  pg_stat_statements to match the changes above (parameter lists
  only); a sketch of the required hook change follows this list.
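
For extension authors, an installed ExecutorRun hook needs the
matching signature change. A minimal sketch, mirroring the
auto_explain hunk in the patch below (names are illustrative):

    static void
    my_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
                   long count, long size)
    {
        /* delegate, passing the new size limit through unchanged */
        if (prev_ExecutorRun)
            prev_ExecutorRun(queryDesc, direction, count, size);
        else
            standard_ExecutorRun(queryDesc, direction, count, size);
    }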

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 6f1dd6998ba312c3552f137365e3a3118b7935be Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 21 Jan 2015 17:18:09 +0900
Subject: [PATCH] Size limitation feature of FETCH v0

---
 contrib/auto_explain/auto_explain.c             |  8 +--
 contrib/pg_stat_statements/pg_stat_statements.c |  8 +--
 src/backend/commands/copy.c                     |  2 +-
 src/backend/commands/createas.c                 |  2 +-
 src/backend/commands/explain.c                  |  2 +-
 src/backend/commands/extension.c                |  2 +-
 src/backend/commands/matview.c                  |  2 +-
 src/backend/commands/portalcmds.c               |  3 +-
 src/backend/commands/prepare.c                  |  2 +-
 src/backend/executor/execMain.c                 | 35 +++++++--
 src/backend/executor/execUtils.c                |  1 +
 src/backend/executor/functions.c                |  2 +-
 src/backend/executor/spi.c                      |  4 +-
 src/backend/parser/gram.y                       | 59 +++++++++++++++
 src/backend/tcop/postgres.c                     |  2 +
 src/backend/tcop/pquery.c                       | 95 +++++++++++++++++--------
 src/include/executor/executor.h                 |  8 +--
 src/include/nodes/execnodes.h                   |  1 +
 src/include/nodes/parsenodes.h                  |  1 +
 src/include/tcop/pquery.h                       |  3 +-
 src/interfaces/ecpg/preproc/Makefile            |  2 +-
 src/interfaces/ecpg/preproc/ecpg.addons         | 63 ++++++++++++++++
 22 files changed, 248 insertions(+), 59 deletions(-)

diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c
index 2a184ed..f121a33 100644
--- a/contrib/auto_explain/auto_explain.c
+++ b/contrib/auto_explain/auto_explain.c
@@ -57,7 +57,7 @@ void		_PG_fini(void);
 static void explain_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void explain_ExecutorRun(QueryDesc *queryDesc,
 					ScanDirection direction,
-					long count);
+					long count, long size);
 static void explain_ExecutorFinish(QueryDesc *queryDesc);
 static void explain_ExecutorEnd(QueryDesc *queryDesc);
 
@@ -232,15 +232,15 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size)
 {
 	nesting_level++;
 	PG_TRY();
 	{
 		if (prev_ExecutorRun)
-			prev_ExecutorRun(queryDesc, direction, count);
+			prev_ExecutorRun(queryDesc, direction, count, size);
 		else
-			standard_ExecutorRun(queryDesc, direction, count);
+			standard_ExecutorRun(queryDesc, direction, count, size);
 		nesting_level--;
 	}
 	PG_CATCH();
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 2629bfc..a68c11d 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -282,7 +282,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query);
 static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags);
 static void pgss_ExecutorRun(QueryDesc *queryDesc,
 				 ScanDirection direction,
-				 long count);
+				 long count, long size);
 static void pgss_ExecutorFinish(QueryDesc *queryDesc);
 static void pgss_ExecutorEnd(QueryDesc *queryDesc);
 static void pgss_ProcessUtility(Node *parsetree, const char *queryString,
@@ -863,15 +863,15 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
  * ExecutorRun hook: all we need do is track nesting depth
  */
 static void
-pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size)
 {
 	nested_level++;
 	PG_TRY();
 	{
 		if (prev_ExecutorRun)
-			prev_ExecutorRun(queryDesc, direction, count);
+			prev_ExecutorRun(queryDesc, direction, count, size);
 		else
-			standard_ExecutorRun(queryDesc, direction, count);
+			standard_ExecutorRun(queryDesc, direction, count, size);
 		nested_level--;
 	}
 	PG_CATCH();
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 0e604b7..b6e6523 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1915,7 +1915,7 @@ CopyTo(CopyState cstate)
 	else
 	{
 		/* run the plan --- the dest receiver will send tuples */
-		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, 0L);
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
 
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index abc0fe8..c5c4478 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -192,7 +192,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
 		dir = ForwardScanDirection;
 
 	/* run the plan */
-	ExecutorRun(queryDesc, dir, 0L);
+	ExecutorRun(queryDesc, dir, 0L, 0L);
 
 	/* save the rowcount if we're given a completionTag to fill */
 	if (completionTag)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 7cfc9bb..2c23e9b 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -489,7 +489,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 			dir = ForwardScanDirection;
 
 		/* run the plan */
-		ExecutorRun(queryDesc, dir, 0L);
+		ExecutorRun(queryDesc, dir, 0L, 0L);
 
 		/* run cleanup too */
 		ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/extension.c b/src/backend/commands/extension.c
index 3b95552..f624567 100644
--- a/src/backend/commands/extension.c
+++ b/src/backend/commands/extension.c
@@ -736,7 +736,7 @@ execute_sql_string(const char *sql, const char *filename)
 										dest, NULL, 0);
 
 				ExecutorStart(qdesc, 0);
-				ExecutorRun(qdesc, ForwardScanDirection, 0);
+				ExecutorRun(qdesc, ForwardScanDirection, 0, 0);
 				ExecutorFinish(qdesc);
 				ExecutorEnd(qdesc);
 
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 74415b8..6530ecb 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -360,7 +360,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 	ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
 
 	/* run the plan */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 	/* and clean up */
 	ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 2794537..255c86e 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -177,6 +177,7 @@ PerformPortalFetch(FetchStmt *stmt,
 	nprocessed = PortalRunFetch(portal,
 								stmt->direction,
 								stmt->howMany,
+								stmt->howLarge,
 								dest);
 
 	/* Return command status if wanted */
@@ -375,7 +376,7 @@ PersistHoldablePortal(Portal portal)
 										true);
 
 		/* Fetch the result set into the tuplestore */
-		ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+		ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 		(*queryDesc->dest->rDestroy) (queryDesc->dest);
 		queryDesc->dest = NULL;
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index 71b08f0..31799f5 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -291,7 +291,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause,
 	 */
 	PortalStart(portal, paramLI, eflags, GetActiveSnapshot());
 
-	(void) PortalRun(portal, count, false, dest, dest, completionTag);
+	(void) PortalRun(portal, count, 0L, false, dest, dest, completionTag);
 
 	PortalDrop(portal, false);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index b9f21c5..9cecc1d 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -78,6 +78,7 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
+			long sizeTuples,
 			ScanDirection direction,
 			DestReceiver *dest);
 static bool ExecCheckRTEPerms(RangeTblEntry *rte);
@@ -248,17 +249,17 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
  */
 void
 ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count)
+			ScanDirection direction, long count, long size)
 {
 	if (ExecutorRun_hook)
-		(*ExecutorRun_hook) (queryDesc, direction, count);
+		(*ExecutorRun_hook) (queryDesc, direction, count, size);
 	else
-		standard_ExecutorRun(queryDesc, direction, count);
+		standard_ExecutorRun(queryDesc, direction, count, size);
 }
 
 void
 standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count)
+					 ScanDirection direction, long count, long size)
 {
 	EState	   *estate;
 	CmdType		operation;
@@ -310,6 +311,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 					operation,
 					sendTuples,
 					count,
+					size,
 					direction,
 					dest);
 
@@ -1450,22 +1452,26 @@ ExecutePlan(EState *estate,
 			CmdType operation,
 			bool sendTuples,
 			long numberTuples,
+			long sizeTuples,
 			ScanDirection direction,
 			DestReceiver *dest)
 {
 	TupleTableSlot *slot;
 	long		current_tuple_count;
+	long		sent_size;
 
 	/*
 	 * initialize local variables
 	 */
 	current_tuple_count = 0;
-
+	sent_size = 0;
 	/*
 	 * Set the direction.
 	 */
 	estate->es_direction = direction;
 
+	estate->es_stoppedbysize = false;
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
 	 */
@@ -1520,6 +1526,25 @@ ExecutePlan(EState *estate,
 		current_tuple_count++;
 		if (numberTuples && numberTuples == current_tuple_count)
 			break;
+
+		/* Count the size of tuples we've sent */
+		if (slot->tts_tuple)
+			sent_size += HEAPTUPLESIZE + slot->tts_tuple->t_len;
+		else
+		{
+			sent_size += HEAPTUPLESIZE +
+				heap_compute_data_size(slot->tts_tupleDescriptor,
+									   slot->tts_values,
+									   slot->tts_isnull);
+		}
+
+		/* Quit when the size limit will be exceeded by this tuple */
+		if (sizeTuples > 0 && sizeTuples < sent_size)
+		{
+			estate->es_stoppedbysize = true;
+			break;
+		}
+
 	}
 }
 
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 32697dd..ff2c395 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -133,6 +133,7 @@ CreateExecutorState(void)
 	estate->es_rowMarks = NIL;
 
 	estate->es_processed = 0;
+	estate->es_stoppedbysize = false;
 	estate->es_lastoid = InvalidOid;
 
 	estate->es_top_eflags = 0;
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 84be37c..d64e908 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -850,7 +850,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
 		/* Run regular commands to completion unless lazyEval */
 		long		count = (es->lazyEval) ? 1L : 0L;
 
-		ExecutorRun(es->qd, ForwardScanDirection, count);
+		ExecutorRun(es->qd, ForwardScanDirection, count, 0L);
 
 		/*
 		 * If we requested run to completion OR there was no tuple returned,
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 4b86e91..cb30cfb 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -2369,7 +2369,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount)
 
 	ExecutorStart(queryDesc, eflags);
 
-	ExecutorRun(queryDesc, ForwardScanDirection, tcount);
+	ExecutorRun(queryDesc, ForwardScanDirection, tcount, 0L);
 
 	_SPI_current->processed = queryDesc->estate->es_processed;
 	_SPI_current->lastoid = queryDesc->estate->es_lastoid;
@@ -2447,7 +2447,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
 	/* Run the cursor */
 	nfetched = PortalRunFetch(portal,
 							  direction,
-							  count,
+							  count, 0L,
 							  dest);
 
 	/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 36dac29..3a139ed 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -520,6 +520,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <str>		opt_existing_window_name
 %type <boolean> opt_if_not_exists
 
+%type <ival>	opt_fetch_limit
+
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
  * They must be listed first so that their numeric codes do not depend on
@@ -6021,6 +6023,15 @@ fetch_args:	cursor_name
 					n->howMany = $1;
 					$$ = (Node *)n;
 				}
+			| SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $5;
+					n->direction = FETCH_FORWARD;
+					n->howMany = $1;
+					n->howLarge = $3;
+					$$ = (Node *)n;
+				}
 			| ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6029,6 +6040,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $5;
+					n->direction = FETCH_FORWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $3;
+					$$ = (Node *)n;
+				}
 			| FORWARD opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6045,6 +6065,15 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
+			| FORWARD SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_FORWARD;
+					n->howMany = $2;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| FORWARD ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6053,6 +6082,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| FORWARD ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_FORWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| BACKWARD opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6069,6 +6107,15 @@ fetch_args:	cursor_name
 					n->howMany = $2;
 					$$ = (Node *)n;
 				}
+			| BACKWARD SignedIconst LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_BACKWARD;
+					n->howMany = $2;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 			| BACKWARD ALL opt_from_in cursor_name
 				{
 					FetchStmt *n = makeNode(FetchStmt);
@@ -6077,6 +6124,15 @@ fetch_args:	cursor_name
 					n->howMany = FETCH_ALL;
 					$$ = (Node *)n;
 				}
+			| BACKWARD ALL LIMIT Iconst opt_from_in cursor_name
+				{
+					FetchStmt *n = makeNode(FetchStmt);
+					n->portalname = $6;
+					n->direction = FETCH_BACKWARD;
+					n->howMany = FETCH_ALL;
+					n->howLarge = $4;
+					$$ = (Node *)n;
+				}
 		;
 
 from_in:	FROM									{}
@@ -6087,6 +6143,9 @@ opt_from_in:	from_in								{}
 			| /* EMPTY */							{}
 		;
 
+opt_fetch_limit:	LIMIT Iconst					{ $$ = $2;}
+			| /* EMPTY */							{ $$ = 0; }
+		;
 
 /*****************************************************************************
  *
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 8f74353..55f062b 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -1043,6 +1043,7 @@ exec_simple_query(const char *query_string)
 		 */
 		(void) PortalRun(portal,
 						 FETCH_ALL,
+						 0,
 						 isTopLevel,
 						 receiver,
 						 receiver,
@@ -1928,6 +1929,7 @@ exec_execute_message(const char *portal_name, long max_rows)
 
 	completed = PortalRun(portal,
 						  max_rows,
+						  0,
 						  true, /* always top level */
 						  receiver,
 						  receiver,
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 9c14e8a..5f68cc7 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "access/htup_details.h"
 #include "commands/prepare.h"
 #include "executor/tstoreReceiver.h"
 #include "miscadmin.h"
@@ -39,9 +40,10 @@ static void ProcessQuery(PlannedStmt *plan,
 			 DestReceiver *dest,
 			 char *completionTag);
 static void FillPortalStore(Portal portal, bool isTopLevel);
-static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
+static uint32 RunFromStore(Portal portal, ScanDirection direction,
+		     long count, long size, bool *stoppedbysize,
 			 DestReceiver *dest);
-static long PortalRunSelect(Portal portal, bool forward, long count,
+static long PortalRunSelect(Portal portal, bool forward, long count, long size,
 				DestReceiver *dest);
 static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
 				 DestReceiver *dest, char *completionTag);
@@ -51,6 +53,7 @@ static void PortalRunMulti(Portal portal, bool isTopLevel,
 static long DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
+				 long size,
 				 DestReceiver *dest);
 static void DoPortalRewind(Portal portal);
 
@@ -182,7 +185,7 @@ ProcessQuery(PlannedStmt *plan,
 	/*
 	 * Run the plan to completion.
 	 */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L);
 
 	/*
 	 * Build command completion status string, if caller wants one.
@@ -703,7 +706,7 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
  * suspended due to exhaustion of the count parameter.
  */
 bool
-PortalRun(Portal portal, long count, bool isTopLevel,
+PortalRun(Portal portal, long count, long size, bool isTopLevel,
 		  DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag)
 {
@@ -787,7 +790,7 @@ PortalRun(Portal portal, long count, bool isTopLevel,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				nprocessed = PortalRunSelect(portal, true, count, dest);
+				nprocessed = PortalRunSelect(portal, true, count, size, dest);
 
 				/*
 				 * If the portal result contains a command tag and the caller
@@ -892,11 +895,13 @@ static long
 PortalRunSelect(Portal portal,
 				bool forward,
 				long count,
+				long size,
 				DestReceiver *dest)
 {
 	QueryDesc  *queryDesc;
 	ScanDirection direction;
 	uint32		nprocessed;
+	bool		stoppedbysize;
 
 	/*
 	 * NB: queryDesc will be NULL if we are fetching from a held cursor or a
@@ -939,12 +944,14 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count, dest);
+			nprocessed = RunFromStore(portal, direction, count,
+									  size, &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count);
+			ExecutorRun(queryDesc, direction, count, size);
 			nprocessed = queryDesc->estate->es_processed;
+			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
 		}
 
@@ -954,8 +961,9 @@ PortalRunSelect(Portal portal,
 
 			if (nprocessed > 0)
 				portal->atStart = false;		/* OK to go backward now */
-			if (count == 0 ||
-				(unsigned long) nprocessed < (unsigned long) count)
+			if ((count == 0 ||
+				 (unsigned long) nprocessed < (unsigned long) count) &&
+				!stoppedbysize)
 				portal->atEnd = true;	/* we retrieved 'em all */
 			oldPos = portal->portalPos;
 			portal->portalPos += nprocessed;
@@ -982,12 +990,14 @@ PortalRunSelect(Portal portal,
 			count = 0;
 
 		if (portal->holdStore)
-			nprocessed = RunFromStore(portal, direction, count, dest);
+			nprocessed = RunFromStore(portal, direction, count,
+									  size, &stoppedbysize, dest);
 		else
 		{
 			PushActiveSnapshot(queryDesc->snapshot);
-			ExecutorRun(queryDesc, direction, count);
+			ExecutorRun(queryDesc, direction, count, size);
 			nprocessed = queryDesc->estate->es_processed;
+			stoppedbysize = queryDesc->estate->es_stoppedbysize;
 			PopActiveSnapshot();
 		}
 
@@ -998,8 +1008,9 @@ PortalRunSelect(Portal portal,
 				portal->atEnd = false;	/* OK to go forward now */
 				portal->portalPos++;	/* adjust for endpoint case */
 			}
-			if (count == 0 ||
-				(unsigned long) nprocessed < (unsigned long) count)
+			if ((count == 0 ||
+				 (unsigned long) nprocessed < (unsigned long) count) &&
+				!stoppedbysize)
 			{
 				portal->atStart = true; /* we retrieved 'em all */
 				portal->portalPos = 0;
@@ -1089,10 +1100,13 @@ FillPortalStore(Portal portal, bool isTopLevel)
  */
 static uint32
 RunFromStore(Portal portal, ScanDirection direction, long count,
-			 DestReceiver *dest)
+			 long size_limit, bool *stoppedbysize, DestReceiver *dest)
 {
 	long		current_tuple_count = 0;
 	TupleTableSlot *slot;
+	long			sent_size = 0;
+
+	*stoppedbysize = false;
 
 	slot = MakeSingleTupleTableSlot(portal->tupDesc);
 
@@ -1133,6 +1147,25 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
 			current_tuple_count++;
 			if (count && count == current_tuple_count)
 				break;
+
+			/* Count the size of tuples we've sent */
+			if (slot->tts_tuple)
+				sent_size += HEAPTUPLESIZE + slot->tts_tuple->t_len;
+			else
+			{
+				sent_size += HEAPTUPLESIZE +
+					heap_compute_data_size(slot->tts_tupleDescriptor,
+										   slot->tts_values,
+										   slot->tts_isnull);
+			}
+
+			/* Quit when the size limit will be exceeded by this tuple */
+			if (current_tuple_count > 0 &&
+				size_limit > 0 && size_limit < sent_size)
+			{
+				*stoppedbysize = true;
+				break;
+			}
 		}
 	}
 
@@ -1385,6 +1418,7 @@ long
 PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
+			   long size,
 			   DestReceiver *dest)
 {
 	long		result;
@@ -1422,7 +1456,7 @@ PortalRunFetch(Portal portal,
 		switch (portal->strategy)
 		{
 			case PORTAL_ONE_SELECT:
-				result = DoPortalRunFetch(portal, fdirection, count, dest);
+				result = DoPortalRunFetch(portal, fdirection, count, size, dest);
 				break;
 
 			case PORTAL_ONE_RETURNING:
@@ -1439,7 +1473,7 @@ PortalRunFetch(Portal portal,
 				/*
 				 * Now fetch desired portion of results.
 				 */
-				result = DoPortalRunFetch(portal, fdirection, count, dest);
+				result = DoPortalRunFetch(portal, fdirection, count, size, dest);
 				break;
 
 			default:
@@ -1484,6 +1518,7 @@ static long
 DoPortalRunFetch(Portal portal,
 				 FetchDirection fdirection,
 				 long count,
+				 long size,
 				 DestReceiver *dest)
 {
 	bool		forward;
@@ -1526,7 +1561,7 @@ DoPortalRunFetch(Portal portal,
 				{
 					DoPortalRewind(portal);
 					if (count > 1)
-						PortalRunSelect(portal, true, count - 1,
+						PortalRunSelect(portal, true, count - 1, 0L,
 										None_Receiver);
 				}
 				else
@@ -1536,13 +1571,13 @@ DoPortalRunFetch(Portal portal,
 					if (portal->atEnd)
 						pos++;	/* need one extra fetch if off end */
 					if (count <= pos)
-						PortalRunSelect(portal, false, pos - count + 1,
+						PortalRunSelect(portal, false, pos - count + 1, 0L,
 										None_Receiver);
 					else if (count > pos + 1)
-						PortalRunSelect(portal, true, count - pos - 1,
+						PortalRunSelect(portal, true, count - pos - 1, 0L,
 										None_Receiver);
 				}
-				return PortalRunSelect(portal, true, 1L, dest);
+				return PortalRunSelect(portal, true, 1L, 0L, dest);
 			}
 			else if (count < 0)
 			{
@@ -1553,17 +1588,17 @@ DoPortalRunFetch(Portal portal,
 				 * (Is it worth considering case where count > half of size of
 				 * query?  We could rewind once we know the size ...)
 				 */
-				PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
+				PortalRunSelect(portal, true, FETCH_ALL, 0L, None_Receiver);
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, dest);
+					PortalRunSelect(portal, false, -count - 1, 0, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, dest);
 			}
 			else
 			{
 				/* count == 0 */
 				/* Rewind to start, return zero rows */
 				DoPortalRewind(portal);
-				return PortalRunSelect(portal, true, 0L, dest);
+				return PortalRunSelect(portal, true, 0L, 0L, dest);
 			}
 			break;
 		case FETCH_RELATIVE:
@@ -1573,8 +1608,8 @@ DoPortalRunFetch(Portal portal,
 				 * Definition: advance count-1 rows, return next row (if any).
 				 */
 				if (count > 1)
-					PortalRunSelect(portal, true, count - 1, None_Receiver);
-				return PortalRunSelect(portal, true, 1L, dest);
+					PortalRunSelect(portal, true, count - 1, 0L, None_Receiver);
+				return PortalRunSelect(portal, true, 1L, 0L, dest);
 			}
 			else if (count < 0)
 			{
@@ -1583,8 +1618,8 @@ DoPortalRunFetch(Portal portal,
 				 * any).
 				 */
 				if (count < -1)
-					PortalRunSelect(portal, false, -count - 1, None_Receiver);
-				return PortalRunSelect(portal, false, 1L, dest);
+					PortalRunSelect(portal, false, -count - 1, 0L, None_Receiver);
+				return PortalRunSelect(portal, false, 1L, 0L, dest);
 			}
 			else
 			{
@@ -1630,7 +1665,7 @@ DoPortalRunFetch(Portal portal,
 			 */
 			if (on_row)
 			{
-				PortalRunSelect(portal, false, 1L, None_Receiver);
+				PortalRunSelect(portal, false, 1L, 0L, None_Receiver);
 				/* Set up to fetch one row forward */
 				count = 1;
 				forward = true;
@@ -1652,7 +1687,7 @@ DoPortalRunFetch(Portal portal,
 		return result;
 	}
 
-	return PortalRunSelect(portal, forward, count, dest);
+	return PortalRunSelect(portal, forward, count, size, dest);
 }
 
 /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 40fde83..64a02c3 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -80,8 +80,8 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook;
 
 /* Hook for plugins to get control in ExecutorRun() */
 typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc,
-												   ScanDirection direction,
-												   long count);
+									   ScanDirection direction,
+									   long count, long size);
 extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook;
 
 /* Hook for plugins to get control in ExecutorFinish() */
@@ -176,9 +176,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter,
 extern void ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void ExecutorRun(QueryDesc *queryDesc,
-			ScanDirection direction, long count);
+			ScanDirection direction, long count, long size);
 extern void standard_ExecutorRun(QueryDesc *queryDesc,
-					 ScanDirection direction, long count);
+		    ScanDirection direction, long count, long size);
 extern void ExecutorFinish(QueryDesc *queryDesc);
 extern void standard_ExecutorFinish(QueryDesc *queryDesc);
 extern void ExecutorEnd(QueryDesc *queryDesc);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 41288ed..d963286 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -376,6 +376,7 @@ typedef struct EState
 	List	   *es_rowMarks;	/* List of ExecRowMarks */
 
 	uint32		es_processed;	/* # of tuples processed */
+	bool		es_stoppedbysize; /* true if processing stopped by size */
 	Oid			es_lastoid;		/* last oid processed (by INSERT) */
 
 	int			es_top_eflags;	/* eflags passed to ExecutorStart */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b1dfa85..9e18331 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2223,6 +2223,7 @@ typedef struct FetchStmt
 	NodeTag		type;
 	FetchDirection direction;	/* see above */
 	long		howMany;		/* number of rows, or position argument */
+	long		howLarge;		/* total bytes of rows */
 	char	   *portalname;		/* name of portal (cursor) */
 	bool		ismove;			/* TRUE if MOVE */
 } FetchStmt;
diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h
index 8073a6e..afffe86 100644
--- a/src/include/tcop/pquery.h
+++ b/src/include/tcop/pquery.h
@@ -33,13 +33,14 @@ extern void PortalStart(Portal portal, ParamListInfo params,
 extern void PortalSetResultFormat(Portal portal, int nFormats,
 					  int16 *formats);
 
-extern bool PortalRun(Portal portal, long count, bool isTopLevel,
+extern bool PortalRun(Portal portal, long count, long size, bool isTopLevel,
 		  DestReceiver *dest, DestReceiver *altdest,
 		  char *completionTag);
 
 extern long PortalRunFetch(Portal portal,
 			   FetchDirection fdirection,
 			   long count,
+			   long size,
 			   DestReceiver *dest);
 
 #endif   /* PQUERY_H */
diff --git a/src/interfaces/ecpg/preproc/Makefile b/src/interfaces/ecpg/preproc/Makefile
index 1ecc405..b492fa7 100644
--- a/src/interfaces/ecpg/preproc/Makefile
+++ b/src/interfaces/ecpg/preproc/Makefile
@@ -48,7 +48,7 @@ ecpg: $(OBJS) | submake-libpgport
 preproc.o: pgc.c
 
 preproc.h: preproc.c ;
-preproc.c: BISONFLAGS += -d
+preproc.c: BISONFLAGS += -r all -d
 
 preproc.y: ../../../backend/parser/gram.y parse.pl ecpg.addons ecpg.header ecpg.tokens ecpg.trailer ecpg.type
 	$(PERL) $(srcdir)/parse.pl $(srcdir) < $< > $@
diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons
index b3b36cf..bdccb68 100644
--- a/src/interfaces/ecpg/preproc/ecpg.addons
+++ b/src/interfaces/ecpg/preproc/ecpg.addons
@@ -220,13 +220,46 @@ ECPG: fetch_argsNEXTopt_from_incursor_name addon
 ECPG: fetch_argsPRIORopt_from_incursor_name addon
 ECPG: fetch_argsFIRST_Popt_from_incursor_name addon
 ECPG: fetch_argsLAST_Popt_from_incursor_name addon
+		add_additional_variables($3, false);
+		if ($3[0] == ':')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
 ECPG: fetch_argsALLopt_from_incursor_name addon
+ECPG: fetch_argsFORWARDopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
 		{
 			free($3);
 			$3 = mm_strdup("$0");
 		}
+ECPG: fetch_argsALLLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($5, false);
+		if ($5[0] == ':')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
+		if ($3[0] == '$')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDALLLIMITIconstopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDALLLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
+		{
+			free($6);
+			$6 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
 ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 		add_additional_variables($3, false);
 		if ($3[0] == ':')
@@ -234,11 +267,41 @@ ECPG: fetch_argsSignedIconstopt_from_incursor_name addon
 			free($3);
 			$3 = mm_strdup("$0");
 		}
+ECPG: fetch_argsSignedIconstLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($5, false);
+		if ($5[0] == ':')
+		{
+			free($5);
+			$5 = mm_strdup("$0");
+		}
 		if ($1[0] == '$')
 		{
 			free($1);
 			$1 = mm_strdup("$0");
 		}
+		if ($3[0] == '$')
+		{
+			free($3);
+			$3 = mm_strdup("$0");
+		}
+ECPG: fetch_argsFORWARDSignedIconstLIMITIconstopt_from_incursor_name addon
+ECPG: fetch_argsBACKWARDSignedIconstLIMITIconstopt_from_incursor_name addon
+		add_additional_variables($6, false);
+		if ($6[0] == ':')
+		{
+			free($6);
+			$6 = mm_strdup("$0");
+		}
+		if ($2[0] == '$')
+		{
+			free($2);
+			$2 = mm_strdup("$0");
+		}
+		if ($4[0] == '$')
+		{
+			free($4);
+			$4 = mm_strdup("$0");
+		}
 ECPG: fetch_argsFORWARDALLopt_from_incursor_name addon
 ECPG: fetch_argsBACKWARDALLopt_from_incursor_name addon
 		add_additional_variables($4, false);
-- 
2.1.0.GIT
