On Fri, Sep 4, 2015 at 2:45 AM, Kyotaro HORIGUCHI <horiguchi.kyot...@lab.ntt.co.jp> wrote:

> Hello,
>
> > > > @@ -915,6 +933,23 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
> ...
> > > > +     def = get_option(table->options, "fetch_size");
> >
> > > I don't think it's a good idea to make such checks at runtime - and
> > > either way it's something that should be reported back using an
> > > ereport(), not an elog.
> >
> > > Also, it seems somewhat wrong to determine this at execution
> > > time. Shouldn't this rather be done when creating the foreign scan
> > > node?
> > > And be a part of the scan state?
> >
> > I agree, that was my original plan, but I wasn't familiar enough with the
> > FDW architecture to know where the table struct and the scan struct were
> > both exposed in the same function.
> >
> > What I submitted incorporated some of Kyotaro's feedback (and working
> > patch) to my original incomplete patch.
>
> Sorry, that certainly isn't a good place to do such a thing. I
> picked it without much thought, to avoid adding a new, similar
> member to multiple existing structs (PgFdwRelationInfo and
> PgFdwScanState).
>
> With a new member fetch_size added to PgFdwRelationInfo and
> PgFdwScanState, I think postgresGetForeignRelSize is the best
> place to do that, since it collects the basic information
> needed to calculate scan costs.
>
> # Fetch sizes for foreign joins will be a future issue.
>
> > typedef struct PgFdwRelationInfo
> > {
>   ...
> + int    fetch_size;   /* fetch size for this remote table */
>
> ====================
> > postgresGetForeignRelSize()
> > {
>   ...
> >     fpinfo->table = GetForeignTable(foreigntableid);
> >     fpinfo->server = GetForeignServer(fpinfo->table->serverid);
> >
> +     def = get_option(table->options, "fetch_size");
> +     ..
> +     fpinfo->fetch_size = strtod(defGetString...
>
> It is also doable in postgresGetForeignPlan, and doing it there
> removes the redundant copy of fetch_size from PgFdwRelationInfo to
> PgFdwScanState, but the theoretical basis would be weak.
>
> regards,
>
> > > On 2015-02-27 13:50:22 -0500, Corey Huinker wrote:
> > > > +static DefElem*
> > > > +get_option(List *options, char *optname)
> > > > +{
> > > > +     ListCell *lc;
> > > > +
> > > > +     foreach(lc, options)
> > > > +     {
> > > > +             DefElem *def = (DefElem *) lfirst(lc);
> > > > +
> > > > +             if (strcmp(def->defname, optname) == 0)
> > > > +                     return def;
> > > > +     }
> > > > +     return NULL;
> > > > +}
> > >
> > >
> > > >       /*
> > > >        * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
> > > > @@ -915,6 +933,23 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
> > > >       server = GetForeignServer(table->serverid);
> > > >       user = GetUserMapping(userid, server->serverid);
> > > >
> > > > +     /* Reading table options */
> > > > +     fsstate->fetch_size = -1;
> > > > +
> > > > +     def = get_option(table->options, "fetch_size");
> > > > +     if (!def)
> > > > +             def = get_option(server->options, "fetch_size");
> > > > +
> > > > +     if (def)
> > > > +     {
> > > > +             fsstate->fetch_size = strtod(defGetString(def), NULL);
> > > > +             if (fsstate->fetch_size < 0)
> > > > +                     elog(ERROR, "invalid fetch size for foreign table \"%s\"",
> > > > +                              get_rel_name(table->relid));
> > > > +     }
> > > > +     else
> > > > +             fsstate->fetch_size = 100;
> > >
> > > Have you thought about how this option should cooperate with join
> > > pushdown once implemented?
> > >
> >
> > I hadn't until now, but I think the only sensible thing would be to
> > disregard table-specific settings once a second foreign table is detected,
> > and instead consider only the server-level setting.
> >
> > I suppose one could argue that if ALL the tables in the join had the same
> > table-level setting, we should go with that, but I think that would be
> > complicated, expensive, and generally a good argument for changing the
> > server setting instead.
>
> --
> Kyotaro Horiguchi
> NTT Open Source Software Center
>
>
Ok, with some guidance from RhodiumToad (thanks!) I was able to get the
proper RelOptInfo->Plan->Scan handoff.

What I *don't* know how to do is show that the proper fetch sizes are being
used on the remote server with the resources available in the regression
test. *Suggestions welcome.*

This patch works for my originally added test cases, and it also works when
connecting to a Redshift cluster that we have; the queries show up in the
console like this:
     FETCH 101 FROM c1
     FETCH 30 FROM c1
     FETCH 50 FROM c1

The (redacted) source of that test is as follows:


begin;
create extension if not exists postgres_fdw;

create server redshift foreign data wrapper postgres_fdw
options (host 'REDACTED', port '5439', dbname 'REDACTED', fetch_size '101');

select * from pg_foreign_server;

create user mapping for public server redshift
options ( user 'REDACTED', password 'REDACTED');

select * from pg_user_mappings;

create foreign table test_table ( date date, tval text )
server redshift
options (table_name 'REDACTED');

select count(*) from ( select tval from test_table where date = 'REDACTED'
) x;

alter server redshift options ( set fetch_size '30' );

select count(*) from ( select tval from test_table where date = 'REDACTED'
) x;

alter foreign table test_table options ( fetch_size '50' );

select count(*) from ( select tval from test_table where date = 'REDACTED'
) x;

rollback;



Attached is the patch / diff against current master.
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 1f417b3..51edc24 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -106,7 +106,8 @@ ALTER SERVER testserver1 OPTIONS (
 	sslcert 'value',
 	sslkey 'value',
 	sslrootcert 'value',
-	sslcrl 'value'
+	sslcrl 'value',
+	fetch_size '101'
 	--requirepeer 'value',
 	-- krbsrvname 'value',
 	-- gsslib 'value',
@@ -114,18 +115,34 @@ ALTER SERVER testserver1 OPTIONS (
 );
 ALTER USER MAPPING FOR public SERVER testserver1
 	OPTIONS (DROP user, DROP password);
-ALTER FOREIGN TABLE ft1 OPTIONS (schema_name 'S 1', table_name 'T 1');
+ALTER FOREIGN TABLE ft1 OPTIONS (schema_name 'S 1', table_name 'T 1', fetch_size '102');
 ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1');
 ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
 ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
 \det+
-                             List of foreign tables
- Schema | Table |  Server  |              FDW Options              | Description 
---------+-------+----------+---------------------------------------+-------------
- public | ft1   | loopback | (schema_name 'S 1', table_name 'T 1') | 
- public | ft2   | loopback | (schema_name 'S 1', table_name 'T 1') | 
+                                      List of foreign tables
+ Schema | Table |  Server  |                       FDW Options                       | Description 
+--------+-------+----------+---------------------------------------------------------+-------------
+ public | ft1   | loopback | (schema_name 'S 1', table_name 'T 1', fetch_size '102') | 
+ public | ft2   | loopback | (schema_name 'S 1', table_name 'T 1')                   | 
 (2 rows)
 
+-- Test what options made it into pg_foreign_server.
+-- Filter for just the server we created.
+SELECT srvoptions FROM pg_foreign_server WHERE srvname = 'testserver1';
+                                                                                                                                                                                     srvoptions                                                                                                                                                                                     
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ {use_remote_estimate=false,updatable=true,fdw_startup_cost=123.456,fdw_tuple_cost=0.123,service=value,connect_timeout=value,dbname=value,host=value,hostaddr=value,port=value,application_name=value,keepalives=value,keepalives_idle=value,keepalives_interval=value,sslcompression=value,sslmode=value,sslcert=value,sslkey=value,sslrootcert=value,sslcrl=value,fetch_size=101}
+(1 row)
+
+-- Test what options made it into pg_foreign_table.
+-- Filter this heavily because we cannot specify which foreign server.
+SELECT ftoptions FROM pg_foreign_table WHERE ftoptions @> array['table_name=T 1','fetch_size=102'];
+                      ftoptions                      
+-----------------------------------------------------
+ {"schema_name=S 1","table_name=T 1",fetch_size=102}
+(1 row)
+
 -- Now we should be able to run ANALYZE.
 -- To exercise multiple code paths, we use local stats on ft1
 -- and remote-estimate mode on ft2.
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 7547ec2..2a3ab7d 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -153,6 +153,12 @@ InitPgFdwOptions(void)
 		/* updatable is available on both server and table */
 		{"updatable", ForeignServerRelationId, false},
 		{"updatable", ForeignTableRelationId, false},
+		/*
+		 * fetch_size is available on both server and table; the table setting
+		 * overrides the server setting.
+		 */
+		{"fetch_size", ForeignServerRelationId, false},
+		{"fetch_size", ForeignTableRelationId, false},
 		{NULL, InvalidOid, false}
 	};
 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index e4d799c..11abd23 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -79,6 +79,8 @@ typedef struct PgFdwRelationInfo
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;			/* only set in use_remote_estimate mode */
+
+	int			fetch_size;		/* fetch size for this remote table */
 } PgFdwRelationInfo;
 
 /*
@@ -99,7 +101,9 @@ enum FdwScanPrivateIndex
 	/* SQL statement to execute remotely (as a String node) */
 	FdwScanPrivateSelectSql,
 	/* Integer list of attribute numbers retrieved by the SELECT */
-	FdwScanPrivateRetrievedAttrs
+	FdwScanPrivateRetrievedAttrs,
+	/* Integer representing the desired fetch_size */
+	FdwScanPrivateFetchSize
 };
 
 /*
@@ -135,6 +139,7 @@ typedef struct PgFdwScanState
 	/* extracted fdw_private data */
 	char	   *query;			/* text of SELECT command */
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
+	int			fetch_size;		/* number of tuples per fetch */
 
 	/* for remote query execution */
 	PGconn	   *conn;			/* connection for the scan */
@@ -329,6 +334,8 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   List *retrieved_attrs,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
+static int get_fetch_size(ForeignTable	*table,
+					ForeignServer *server);
 
 
 /*
@@ -397,6 +404,8 @@ postgresGetForeignRelSize(PlannerInfo *root,
 	/* Look up foreign-table catalog info. */
 	fpinfo->table = GetForeignTable(foreigntableid);
 	fpinfo->server = GetForeignServer(fpinfo->table->serverid);
+	/* Look up the fetch size; a table setting overrides a server setting */
+	fpinfo->fetch_size = get_fetch_size(fpinfo->table, fpinfo->server);
 
 	/*
 	 * Extract user-settable option values.  Note that per-table setting of
@@ -858,8 +867,9 @@ postgresGetForeignPlan(PlannerInfo *root,
 	 * Build the fdw_private list that will be available to the executor.
 	 * Items in the list must match enum FdwScanPrivateIndex, above.
 	 */
-	fdw_private = list_make2(makeString(sql.data),
-							 retrieved_attrs);
+	fdw_private = list_make3(makeString(sql.data),
+							 retrieved_attrs,
+							 makeInteger(fpinfo->fetch_size));
 
 	/*
 	 * Create the ForeignScan node from target list, local filtering
@@ -877,6 +887,22 @@ postgresGetForeignPlan(PlannerInfo *root,
 							NIL /* no custom tlist */ );
 }
 
+static DefElem*
+get_option(List *options, char *optname)
+{
+	ListCell *lc;
+
+	foreach(lc, options)
+	{
+		DefElem	*def = (DefElem *) lfirst(lc);
+
+		if (strcmp(def->defname, optname) == 0)
+			return def;
+	}
+	return NULL;
+}
+
+
 /*
  * postgresBeginForeignScan
  *		Initiate an executor scan of a foreign PostgreSQL table.
@@ -895,6 +921,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	int			numParams;
 	int			i;
 	ListCell   *lc;
+	DefElem	   *def;
 
 	/*
 	 * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
@@ -921,6 +948,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(userid, server->serverid);
 
+
+
 	/*
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
@@ -936,6 +965,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 									 FdwScanPrivateSelectSql));
 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
 											   FdwScanPrivateRetrievedAttrs);
+	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
+								 FdwScanPrivateFetchSize));
 
 	/* Create contexts for batches of tuples and per-tuple temp workspace. */
 	fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
@@ -2045,15 +2076,11 @@ fetch_more_data(ForeignScanState *node)
 	{
 		PGconn	   *conn = fsstate->conn;
 		char		sql[64];
-		int			fetch_size;
 		int			numrows;
 		int			i;
 
-		/* The fetch size is arbitrary, but shouldn't be enormous. */
-		fetch_size = 100;
-
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-				 fetch_size, fsstate->cursor_number);
+				 fsstate->fetch_size, fsstate->cursor_number);
 
 		res = PQexec(conn, sql);
 		/* On error, report the original query, not the FETCH. */
@@ -2081,7 +2108,7 @@ fetch_more_data(ForeignScanState *node)
 			fsstate->fetch_ct_2++;
 
 		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fetch_size);
+		fsstate->eof_reached = (numrows < fsstate->fetch_size);
 
 		PQclear(res);
 		res = NULL;
@@ -2465,8 +2492,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			 * then just adjust rowstoskip and samplerows appropriately.
 			 */
 
-			/* The fetch size is arbitrary, but shouldn't be enormous. */
-			fetch_size = 100;
+			fetch_size = get_fetch_size(table, server);
 
 			/* Fetch some rows */
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
@@ -2621,7 +2647,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
-					 errmsg("invalid option \"%s\"", def->defname)));
+						errmsg("invalid option \"%s\"", def->defname)));
 	}
 
 	/*
@@ -2994,3 +3020,40 @@ conversion_error_callback(void *arg)
 				   NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
 				   RelationGetRelationName(errpos->rel));
 }
+
+/*
+ * Scan the foreign server and foreign table definitions for any explicit
+ * fetch_size options. Prefer the table-specific option to the server-wide one.
+ * If none are found, keep the previous default of 100.
+ */
+static int
+get_fetch_size(ForeignTable	*table,
+			   ForeignServer *server)
+{
+	DefElem    *def;
+	int			fetch_size;
+
+	def = get_option(table->options, "fetch_size");
+	if (!def)
+	{
+		/*
+		 * In the absence of table-specific fetch size,
+		 * look for a server-specific one
+		 */
+		def = get_option(server->options, "fetch_size");
+	}
+
+	if (def)
+	{
+		fetch_size = strtol(defGetString(def), NULL, 10);
+		if (fetch_size <= 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+					 errmsg("invalid fetch size for foreign table \"%s\"",
+							get_rel_name(table->relid)),
+					 errhint("fetch_size must be > 0.")));
+	}
+	else
+		fetch_size = 100;
+	return fetch_size;
+}
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index fcdd92e..8521fab 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -115,7 +115,8 @@ ALTER SERVER testserver1 OPTIONS (
 	sslcert 'value',
 	sslkey 'value',
 	sslrootcert 'value',
-	sslcrl 'value'
+	sslcrl 'value',
+	fetch_size '101'
 	--requirepeer 'value',
 	-- krbsrvname 'value',
 	-- gsslib 'value',
@@ -123,12 +124,20 @@ ALTER SERVER testserver1 OPTIONS (
 );
 ALTER USER MAPPING FOR public SERVER testserver1
 	OPTIONS (DROP user, DROP password);
-ALTER FOREIGN TABLE ft1 OPTIONS (schema_name 'S 1', table_name 'T 1');
+ALTER FOREIGN TABLE ft1 OPTIONS (schema_name 'S 1', table_name 'T 1', fetch_size '102');
 ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1');
 ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
 ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
 \det+
 
+-- Test what options made it into pg_foreign_server.
+-- Filter for just the server we created.
+SELECT srvoptions FROM pg_foreign_server WHERE srvname = 'testserver1';
+
+-- Test what options made it into pg_foreign_table.
+-- Filter this heavily because we cannot specify which foreign server.
+SELECT ftoptions FROM pg_foreign_table WHERE ftoptions @> array['table_name=T 1','fetch_size=102'];
+
 -- Now we should be able to run ANALYZE.
 -- To exercise multiple code paths, we use local stats on ft1
 -- and remote-estimate mode on ft2.
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index 7c92282..a96f9a2 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -302,6 +302,34 @@
   </sect3>
 
   <sect3>
+   <title>Fetch Size Options</title>
+
+   <para>
+    By default, rows are fetched from the remote server 100 at a time.
+    This may be overridden using the following option:
+   </para>
+
+   <variablelist>
+
+    <varlistentry>
+     <term><literal>fetch_size</literal></term>
+     <listitem>
+      <para>
+       This option specifies the number of rows <filename>postgres_fdw</>
+       should get in each fetch operation. It can be specified for a foreign
+       table or a foreign server. The option specified on a table overrides
+       an option specified for the server.
+       The default is <literal>100</>.
+      </para>
+
+     </listitem>
+    </varlistentry>
+
+   </variablelist>
+  </sect3>
+
+
+  <sect3>
    <title>Importing Options</title>
 
    <para>