I needed something to test the FDW API patch with, and didn't want to get involved in the COPY API changes, and also wanted to have something that needs real connection management and can push down quals. So I updated the postgresql_fdw patch to work with the latest FDW patch.

Here. It's a bit of a mess, but it works for simple queries..

It requires a small change to the FDW api (fdw-api-add-serverid-userid.patch). I added server oid and user oid fields to the FdwPlan - that seems like basic information that most FDW's will need, so it seems awkward to require the FDW to wrap them in Const nodes and a List.

These are also available in my git repository at git://git.postgresql.org/git/users/heikki/postgres.git, branches fdw2 and postgresql_fdw.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/contrib/postgresql_fdw/postgresql_fdw.c b/contrib/postgresql_fdw/postgresql_fdw.c
index cd7902d..17e3856 100644
--- a/contrib/postgresql_fdw/postgresql_fdw.c
+++ b/contrib/postgresql_fdw/postgresql_fdw.c
@@ -3,17 +3,19 @@
  * postgresql_fdw.c
  *		  foreign-data wrapper for PostgreSQL
  *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
- *		  $PostgreSQL$
+ *		  contrib/postgresql_fdw/postgresql_fdw.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include "catalog/pg_namespace.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
+#include "foreign/foreign.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
 #include "mb/pg_wchar.h"
@@ -40,25 +42,21 @@ extern Datum postgresql_fdw_handler(PG_FUNCTION_ARGS);
 /*
  * FDW routines
  */
-static FSConnection* pgConnectServer(ForeignServer *server, UserMapping *user);
-static void pgFreeFSConnection(FSConnection *conn);
-static void pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel);
-static void pgOpen(ForeignScanState *scanstate);
-static void pgBeginScan(ForeignScanState *scanstate);
-static void pgIterate(ForeignScanState *scanstate);
-static void pgClose(ForeignScanState *scanstate);
-static void pgReOpen(ForeignScanState *scanstate);
+static FdwPlan *pgPlanRelScan(Oid foreigntableid, PlannerInfo *root,
+			  RelOptInfo *baserel);
+static FdwExecutionState *pgBeginScan(FdwPlan *plan, ParamListInfo params);
+static void pgIterate(FdwExecutionState *state, TupleTableSlot *slot);
+static void pgReScan(FdwExecutionState *state);
+static void pgEndScan(FdwExecutionState *state);
 
 /* helper for deparsing a request into SQL statement */
-static bool is_foreign_qual(Expr *expr);
+
+static bool is_foreign_qual(PlannerInfo *root, RelOptInfo *baserel, Expr *expr);
 static bool foreign_qual_walker(Node *node, void *context);
 static void deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, const char *aliasname, bool prefix);
-static void deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix);
-static char *deparseSql(ForeignScanState *scanstate);
+static char *deparseSql(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel);
 
-/* helper for handling result tuples */
-static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd,
-						TupleDesc tupdesc, PGresult *res);
+static void storeResult(TupleTableSlot *slot, PGresult *res, int rowno);
 
 /*
  * Connection management
@@ -75,25 +73,23 @@ static void cleanup_connection(ResourceReleasePhase phase,
 /*
  * PostgreSQL specific portion of a foreign query request
  */
-typedef struct pgFdwReply
+typedef struct pgFdwExecutionState
 {
-	char			   *sql;		/* SQL text to be sent to foreign server */
-	Tuplestorestate	   *tupstore;	/* store all of result tuples */
-} pgFdwReply;
+	PGconn *conn;
+	PGresult *res;
+	int		nextrow;
+} pgFdwExecutionState;
 
 /*
  * FdwRoutine of PostgreSQL wrapper
  */
 static FdwRoutine postgresql_fdw_routine =
 {
-	pgConnectServer,
-	pgFreeFSConnection,
-	pgEstimateCosts,
-	pgOpen,
+	pgPlanRelScan,
 	pgBeginScan,
 	pgIterate,
-	pgClose,
-	pgReOpen,
+	pgReScan,
+	pgEndScan
 };
 
 /*
@@ -107,58 +103,18 @@ postgresql_fdw_handler(PG_FUNCTION_ARGS)
 	PG_RETURN_POINTER(&postgresql_fdw_routine);
 }
 
-/*
- * Connect to a foreign PostgreSQL server with libpq if necessary.
- */
-static FSConnection *
-pgConnectServer(ForeignServer *server, UserMapping *user)
+static FdwPlan *
+pgPlanRelScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel)
 {
-	elog(DEBUG3, "%s() called for \"%s\"", __FUNCTION__, server->servername);
-
-	return (FSConnection *) GetConnection(server, user);
-}
+	FdwPlan *plan = makeNode(FdwPlan);
+	char *sql;
 
+	sql = deparseSql(foreigntableid, root, baserel);
 
-/*
- * Disconnect from the foreign server if the connection is not referenced by
- * any other scan.
- */
-static void
-pgFreeFSConnection(FSConnection *conn)
-{
-	elog(DEBUG3, "%s() called", __FUNCTION__);
-
-	if (conn == NULL)
-		return;
-
-	ReleaseConnection((PGconn *)conn);
-}
+	plan->explainInfo = sql;
+	plan->fdw_private = (void *) sql;
 
-/*
- * Check whether the ExprState node can be evaluated in foreign server.
- *
- * An expression which consists of expressions below can be evaluated in
- * the foreign server.
- *  - constant value
- *  - variable (foreign table column)
- *  - external parameter (parameter of prepared statement)
- *  - array
- *  - bool expression (AND/OR/NOT)
- *  - NULL test (IS [NOT] NULL)
- *  - operator
- *    - IMMUTABLE only
- *    - It is required that the meaning of the operator be the same as the
- *      local server in the foreign server. 
- *  - function
- *    - IMMUTABLE only
- *    - It is required that the meaning of the operator be the same as the
- *      local server in the foreign server. 
- *  - scalar array operator (ANY/ALL)
- */
-static bool
-is_foreign_qual(Expr *expr)
-{
-	return !foreign_qual_walker((Node *) expr, NULL);
+	return plan;
 }
 
 /*
@@ -237,8 +193,10 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
 	first = true;
 	for (i = 0; i < tupdesc->natts; i++)
 	{
+#ifdef NOT_USED
 		List	   *options;
 		ListCell   *lc;
+#endif
 		char	   *colname = NULL;
 
 		/* skip dropped attributes */
@@ -246,6 +204,7 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
 			continue;
 
 		/* Determine column name to be used */
+#ifdef NOT_USED /* XXX: What was this all about? */
 		options = GetGenericOptionsPerColumn(table->relid, i + 1);
 		foreach (lc, options)
 		{
@@ -256,6 +215,7 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
 				break;
 			}
 		}
+#endif
 		if (!colname)
 			colname = tupdesc->attrs[i]->attname.data;
 
@@ -278,51 +238,141 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
 		pfree((char *) aliasname_q);
 }
 
-/*
- * Deparse the passed information into FROM clauses and append to the buffer
- * 'sql'.
- */
 static void
-deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix)
+deparse_var(PlannerInfo *root, StringInfo buf, Var *var)
 {
-	char		   *nspname = NULL;
-	char		   *relname = NULL;
-	const char	   *nspname_q;
-	const char	   *relname_q;
-	const char	   *aliasname_q;
-	ListCell	   *lc;
+	RangeTblEntry *rte;
+	char *attname;
 
-	/* The alias of relation is used in both SELECT clause and FROM clause. */
-	aliasname_q = quote_identifier(aliasname);
+	if (var->varlevelsup != 0)
+		elog(ERROR, "unexpected varlevelsup %d in remote query",
+			 var->varlevelsup);
 
-	/*
-	 * If the foreign table has generic option "nspname" and/or "relname", use
-	 * them in the foreign query.  Otherwise, use local catalog names.
-	 * Each identifier should be quoted because they might be case sensitive.
-	 */
-	foreach(lc, table->options)
+	if (var->varno < 1 || var->varno > list_length(root->parse->rtable))
+		elog(ERROR, "unexpected varno %d in remote query", var->varno);
+	rte = rt_fetch(var->varno, root->parse->rtable);
+
+	attname  = get_rte_attribute_name(rte, var->varattno);
+	appendStringInfoString(buf, quote_identifier(attname));
+}
+
+typedef struct
+{
+	PlannerInfo *root;
+	RelOptInfo *foreignrel;
+} remotely_executable_cxt;
+
+static bool
+is_proc_remotely_executable(Oid procid)
+{
+	/* assume that only built-in functions can be pushed down */
+	if (get_func_namespace(procid) != PG_CATALOG_NAMESPACE)
+		return false;
+	/* we don't check volatility here, that's done once at the top-level */
+
+	return true;
+}
+
+static bool
+is_not_remotely_executable_walker(Node *node, remotely_executable_cxt *context)
+{
+	if (node == NULL)
+		return false;
+
+	if (IsA(node, Query))
+		return true;
+
+	switch (nodeTag(node))
 	{
-		DefElem *opt = lfirst(lc);
-		if (strcmp(opt->defname, "nspname") == 0)
-			nspname = pstrdup(strVal(opt->arg));
-		else if (strcmp(opt->defname, "relname") == 0)
-			relname = pstrdup(strVal(opt->arg));
+		case T_Query:
+			/* give up on subqueries */
+			return true;
+		case T_Param:
+			/* TODO: pass internal parameters to the foreign server */
+			{
+				ParamKind	paramkind = ((Param *) node)->paramkind;
+				elog(DEBUG1, "%s() param=%s", __FUNCTION__,
+	  				paramkind == PARAM_EXTERN ? "PARAM_EXTERN" :
+	  				paramkind == PARAM_EXEC ? "PARAM_EXEC" :
+	  				paramkind == PARAM_SUBLINK ? "PARAM_SUBLINK" : "unkown");
+			}
+			if (((Param *) node)->paramkind != PARAM_EXTERN)
+				return true;
+			break;
+		case T_OpExpr:
+			{
+				OpExpr *op = (OpExpr *) node;
+				if (!is_proc_remotely_executable(op->opfuncid))
+					return true;
+				else
+					return false;
+			}
+		case T_FuncExpr:
+			{
+				FuncExpr *fe = (FuncExpr *) node;
+				if (!is_proc_remotely_executable(fe->funcid))
+					return true;
+				else
+					return false;
+			}
+
+		case T_TargetEntry:
+		case T_PlaceHolderVar:
+		case T_AppendRelInfo:
+		case T_PlaceHolderInfo:
+			/* TODO: research whether those complex nodes are evaluatable. */
+			return true;
+		case T_Var:
+			{
+				Var *var = (Var *) node;
+				if (var->varno != context->foreignrel->relid || var->varlevelsup != 0)
+					return true;
+				else
+					return false;
+			}
+
+		default:
+			break;
 	}
-	if (nspname == NULL)
-		nspname = get_namespace_name(get_rel_namespace(table->relid));
-	if (relname == NULL)
-		relname = get_rel_name(table->relid);
-	nspname_q = quote_identifier(nspname);
-	relname_q = quote_identifier(relname);
-	appendStringInfo(sql, " FROM %s.%s %s", nspname_q, relname_q, aliasname_q);
-	pfree(nspname);
-	pfree(relname);
-	if (nspname_q != nspname)
-		pfree((char *) nspname_q);
-	if (relname_q != relname)
-		pfree((char * ) relname_q);
-	if (aliasname_q != aliasname)
-		pfree((char *) aliasname_q);
+
+	return expression_tree_walker(node, foreign_qual_walker, context);
+}
+
+
+/*
+ * Check whether the ExprState node can be evaluated in foreign server.
+ *
+ * An expression which consists of expressions below can be evaluated in
+ * the foreign server.
+ *  - constant value
+ *  - variable (foreign table column)
+ *  - external parameter (parameter of prepared statement)
+ *  - array
+ *  - bool expression (AND/OR/NOT)
+ *  - NULL test (IS [NOT] NULL)
+ *  - operator
+ *    - IMMUTABLE only
+ *    - It is required that the meaning of the operator be the same as the
+ *      local server in the foreign server. 
+ *  - function
+ *    - IMMUTABLE only
+ *    - It is required that the meaning of the operator be the same as the
+ *      local server in the foreign server. 
+ *  - scalar array operator (ANY/ALL)
+ */
+static bool
+is_foreign_qual(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
+{
+	remotely_executable_cxt context;
+
+	context.root = root;
+	context.foreignrel = baserel;
+	if (is_not_remotely_executable_walker((Node *) expr, &context))
+		return false;
+	if (contain_volatile_functions((Node *) expr))
+		return false;
+
+	return true;
 }
 
 /*
@@ -336,39 +386,58 @@ deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bo
  *     remote side and local side
  */
 static char *
-deparseSql(ForeignScanState *scanstate)
+deparseSql(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel)
 {
-	EState		   *estate = scanstate->ss.ps.state;
-	bool			prefix;
 	List		   *context;
 	StringInfoData	sql;
-	ForeignScan	   *scan;
-	RangeTblEntry  *rte;
-	ForeignTable   *table = scanstate->table;
+	ForeignTable   *table = GetForeignTable(foreigntableid);
+	ListCell   *lc;
+	bool		first;
+	char	   *nspname = NULL;
+	char	   *relname = NULL;
 
 	/* extract ForeignScan and RangeTblEntry */
-	scan = (ForeignScan *)scanstate->ss.ps.plan;
-	rte = list_nth(estate->es_range_table, scan->scan.scanrelid - 1);
 
 	/* prepare to deparse plan */
 	initStringInfo(&sql);
-	context = deparse_context_for_planstate((Node *)&scanstate->ss.ps, NULL,
-									   estate->es_range_table);
+
+	context = deparse_context_for("foreigntable", foreigntableid);
+
+	/* deparse SELECT target list */
+	appendStringInfoString(&sql, "SELECT ");
+	first = true;
+	foreach (lc, baserel->reltargetlist)
+	{
+		Var *var = lfirst(lc);
+
+		if (!first)
+			appendStringInfoString(&sql, ", ");
+		first = false;
+		deparse_var(root, &sql, var);
+	}
 
 	/*
-	 * XXX: Prefix is set to false always because setting prefix to true makes
-	 * the SQL invalid when this is a child scan for an inherited table and qual
-	 * is not empty (need to generate WHERE clause).  It might be needed to fix
-	 * deparse_expression() to deparse column references in the qual into
-	 * name of the child table, instead of name of the parent table, or table
-	 * alias.
+	 * Deparse FROM
+	 *
+	 * If the foreign table has generic option "nspname" and/or "relname", use
+	 * them in the foreign query.  Otherwise, use local catalog names.
 	 */
-	prefix = false;
-
-	/* deparse SELECT and FROM clauses */
-	deparseSelectClause(&sql, table, scanstate->ss.ss_currentRelation->rd_att,
-						rte->eref->aliasname, prefix);
-	deparseFromClause(&sql, table, rte->eref->aliasname, prefix);
+	foreach(lc, table->options)
+	{
+		DefElem *opt = lfirst(lc);
+		if (strcmp(opt->defname, "nspname") == 0)
+			nspname = pstrdup(strVal(opt->arg));
+		else if (strcmp(opt->defname, "relname") == 0)
+			relname = pstrdup(strVal(opt->arg));
+	}
+	if (nspname == NULL)
+		nspname = get_namespace_name(get_rel_namespace(foreigntableid));
+	if (relname == NULL)
+		relname = get_rel_name(foreigntableid);
+	appendStringInfo(&sql, " FROM %s.%s",
+					 quote_identifier(nspname), 
+					 quote_identifier(relname));
+	
 
 	/*
 	 * deparse WHERE cluase
@@ -381,10 +450,9 @@ deparseSql(ForeignScanState *scanstate)
 	 * statement to make following EXECUTE statements work properly.  The Plan
 	 * node is used repeatedly to create PlanState for each EXECUTE statement.
 	 */
-	if (scanstate->ss.ps.plan->qual)
+	if (baserel->baserestrictinfo)
 	{
 		List	   *local_qual = NIL;
-		List	   *foreign_qual = NIL;
 		List	   *foreign_expr = NIL;
 		ListCell   *lc;
 
@@ -392,23 +460,24 @@ deparseSql(ForeignScanState *scanstate)
 		 * Divide qual of PlanState into two lists, one for local evaluation
 		 * and one for foreign evaluation.
 		 */
-		foreach (lc, scanstate->ss.ps.qual)
+		foreach (lc, baserel->baserestrictinfo)
 		{
-			ExprState	   *state = lfirst(lc);
+			RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
 
-			if (is_foreign_qual(state->expr))
-			{ foreign_qual = lappend(foreign_qual, state);
-				foreign_expr = lappend(foreign_expr, state->expr);
+			if (is_foreign_qual(root, baserel, ri->clause))
+			{
+				/* XXX: deparse and add to sql here */
+				foreign_expr = lappend(foreign_expr, ri->clause);
 			}
 			else
-				local_qual = lappend(local_qual, state);
+				local_qual = lappend(local_qual, ri);
 		}
 		/*
 		 * XXX: If the remote side is not reliable enough, we can keep the qual
 		 * in PlanState as is and evaluate them on local side too.  If so, just
 		 * omit replacement below.
 		 */
-		scanstate->ss.ps.qual = local_qual;
+		baserel->baserestrictinfo = local_qual;
 
 		/*
 		 * Deparse quals to be evaluated in the foreign server if any.
@@ -420,7 +489,7 @@ deparseSql(ForeignScanState *scanstate)
 			Node   *node;
 			node = (Node *) make_ands_explicit(foreign_expr);
 			appendStringInfo(&sql, " WHERE %s",
-				deparse_expression(node, context, prefix, false));
+				deparse_expression(node, context, false, false));
 			/*
 			 * The contents of the list MUST NOT be free-ed because they are
 			 * referenced from Plan.qual list.
@@ -435,39 +504,29 @@ deparseSql(ForeignScanState *scanstate)
 }
 
 /*
- * Deparse the request into SQL statement and keep it for future execution.
- *
- * XXX: deparsing should be done in pgEstimateCosts to estimate the costs by
- * executing EXPLAIN on remote side?
- */
-static void
-pgOpen(ForeignScanState *scanstate)
-{
-	pgFdwReply	   *reply;
-
-	elog(DEBUG3, "%s() called ", __FUNCTION__);
-
-	/* FWD-specific portion */
-	reply = (pgFdwReply *) palloc0(sizeof(*reply));
-	reply->sql = deparseSql(scanstate);
-	scanstate->reply = (FdwReply *) reply;
-}
-
-/*
  * Initiate actual scan on a foreign table.
  * This function is called just after pgOpen() if the ForeignScan was executed
  * for a real query or EXPLAIN statement with ANALYZE option.
  */
-static void
-pgBeginScan(ForeignScanState *scanstate)
+static FdwExecutionState *
+pgBeginScan(FdwPlan *plan, ParamListInfo params)
 {
-	pgFdwReply	   *reply = (pgFdwReply *) scanstate->reply;
-	PGconn		   *conn = (PGconn *) scanstate->conn;
-	PGresult	   *res;
-	ParamListInfo	info = scanstate->ss.ps.state->es_param_list_info;
-	int				numParams = info ? info->numParams : 0;
-	Oid			   *types = NULL;
-	const char	  **values = NULL;
+	FdwExecutionState *result = palloc(sizeof(FdwExecutionState));
+	pgFdwExecutionState *state;
+	PGconn	   *conn;
+	PGresult   *res;
+	int			numParams = params ? params->numParams : 0;
+	Oid		   *types = NULL;
+	const char **values = NULL;
+	const char *sql = (const char *) plan->fdw_private;
+
+	state = palloc(sizeof(pgFdwExecutionState));
+
+	conn = GetConnection(GetForeignServer(plan->serverid),
+						 GetUserMapping(plan->userid, plan->serverid));
+	state->conn = conn;	
+
+	result->fdw_private = state;
 
 	elog(DEBUG3, "%s() called", __FUNCTION__);
 
@@ -481,8 +540,8 @@ pgBeginScan(ForeignScanState *scanstate)
 		values = palloc0(sizeof(char *) * numParams);
 		for (i = 0; i < numParams; i++)
 		{
-			types[i] = info->params[i].ptype;
-			if (info->params[i].isnull)
+			types[i] = params->params[i].ptype;
+			if (params->params[i].isnull)
 				values[i] = NULL;
 			else
 			{
@@ -495,7 +554,7 @@ pgBeginScan(ForeignScanState *scanstate)
 				getTypeOutputInfo(types[i], &out_func_oid, &isvarlena);
 				fmgr_info(out_func_oid, &func);
 				values[i] =
-					OutputFunctionCall(&func, info->params[i].value);
+					OutputFunctionCall(&func, params->params[i].value);
 			}
 		}
 	}
@@ -505,8 +564,7 @@ pgBeginScan(ForeignScanState *scanstate)
 	 * TODO: support internal parameters(PARAM_EXTERN)
 	 * TODO: support cursor mode for huge result sets.
 	 */
-	res = PQexecParams(conn, reply->sql,
-							numParams, types, values, NULL, NULL, 0);
+	res = PQexecParams(conn, sql, numParams, types, values, NULL, NULL, 0);
 	if (numParams > 0)
 	{
 		int i;
@@ -525,31 +583,18 @@ pgBeginScan(ForeignScanState *scanstate)
 	{
 		char *msg;
 
-		PQclear(res);
 		msg = pstrdup(PQerrorMessage(conn));
-		ereport(ERROR, (
-				errmsg("could not execute foreign query"),
-				errdetail("%s", msg), errhint("%s", reply->sql)));
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("could not execute foreign query"),
+				 errdetail("%s", msg),
+				 errhint("%s", sql)));
 	}
 
-	/* Note: use PG_TRY to ensure freeing PGresult. */
-	PG_TRY();
-	{
-		TupleDesc	tupdesc = ExecGetScanType((ScanState *) scanstate);
-
-		/* create tuplestore to store results */
-		reply->tupstore = tuplestore_begin_heap(true, false, work_mem);
+	state->res = res;
+	state->nextrow = 0;
 
-		storeResult(reply->tupstore, false, tupdesc, res);
-
-		PQclear(res);
-	}
-	PG_CATCH();
-	{
-		PQclear(res);
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
+	return result;
 }
 
 /*
@@ -562,87 +607,48 @@ pgBeginScan(ForeignScanState *scanstate)
  * ScanState.
  */
 static void
-pgIterate(ForeignScanState *scanstate)
+pgIterate(FdwExecutionState *state, TupleTableSlot *slot)
 {
-	pgFdwReply	   *reply = (pgFdwReply *) scanstate->reply;
-	TupleTableSlot *slot = scanstate->ss.ss_ScanTupleSlot;
+	pgFdwExecutionState *priv = state->fdw_private;
 
 	elog(DEBUG3, "%s() called", __FUNCTION__);
 
-	/* store the next tuple into the slot from the tuplestore */
-	if (tuplestore_gettupleslot(reply->tupstore, true, false, slot))
-	{
-		/*
-		 * Because the tuples stored in the tupstore are minimal tuples,
-		 * they have to be materialized to retrieve system attributes.
-		 */
-		ExecMaterializeSlot(slot);
-	}
+	if (priv->nextrow == PQntuples(priv->res))
+		ExecClearTuple(slot);
 	else
-	{
-		/* TODO: if cursor mode, reset tuple slot and fetch the next batch. */
-	}
+		storeResult(slot, priv->res, priv->nextrow++);
 }
 
-/*
- * Finish scanning foreign table and dispose objects used for this scan.
- */
 static void
-pgClose(ForeignScanState *scanstate)
+pgReScan(FdwExecutionState *state)
 {
-	pgFdwReply	   *reply = (pgFdwReply *) scanstate->reply;
-
-	elog(DEBUG3, "%s() called", __FUNCTION__);
-
-	if (reply == NULL)
-		return;
-
-	if (reply->tupstore != NULL)
-		tuplestore_end(reply->tupstore);
-
-	/*
-	 * reply->conn is not freed here because foreign connections are
-	 * released by executor via FreeFSConnection.
-	 */
-	pfree(reply);
-	scanstate->reply = NULL;
+	((pgFdwExecutionState *)state->fdw_private)->nextrow = 0;
 }
 
-/*
- * Execute query with new parameter.
- */
 static void
-pgReOpen(ForeignScanState *scanstate)
+pgEndScan(FdwExecutionState *state)
 {
-	pgFdwReply	   *reply = (pgFdwReply *) scanstate->reply;
+	pgFdwExecutionState *priv = (pgFdwExecutionState *) state->fdw_private;
 
-	elog(DEBUG3, "%s() called", __FUNCTION__);
-
-	/* Rewind tuplestore to retrieve all tuples again */
-	if (reply->tupstore)
-		tuplestore_rescan(reply->tupstore);
+	PQclear(priv->res);
+	pfree(priv);
 }
 
 /*
  * Store a PGresult into tuplestore.
  */
 static void
-storeResult(Tuplestorestate *tupstore,
-			bool is_sql_cmd,
-			TupleDesc tupdesc,
-			PGresult *res)
+storeResult(TupleTableSlot *slot, PGresult *res, int rowno)
 {
 	int					i;
-	int					row;
-	int					ntuples;
 	int					nfields;
 	int					attnum;		/* number of non-dropped columns */
 	char			  **values;
 	AttInMetadata	   *attinmeta;
 	Form_pg_attribute  *attrs;
+	TupleDesc	tupdesc = slot->tts_tupleDescriptor;
 
-	ntuples = PQntuples(res);
-	nfields = is_sql_cmd ? 1 : PQnfields(res);
+	nfields = PQnfields(res);
 	attrs = tupdesc->attrs;
 
 	/* count non-dropped columns */
@@ -662,39 +668,29 @@ storeResult(Tuplestorestate *tupstore,
 
 	/* put all tuples into the tuplestore */
 	attinmeta = TupleDescGetAttInMetadata(tupdesc);
-	for (row = 0; row < ntuples; row++)
 	{
 		int			j;
 		HeapTuple	tuple;
 
-		CHECK_FOR_INTERRUPTS();
-
-		if (!is_sql_cmd)
+		for (i = 0, j = 0; i < tupdesc->natts; i++)
 		{
-			for (i = 0, j = 0; i < tupdesc->natts; i++)
+			/* skip dropped columns. */
+			if (attrs[i]->attisdropped)
 			{
-				/* skip dropped columns. */
-				if (attrs[i]->attisdropped)
-				{
-					values[i] = NULL;
-					continue;
-				}
-
-				if (PQgetisnull(res, row, j))
-					values[i] = NULL;
-				else
-					values[i] = PQgetvalue(res, row, j);
-				j++;
+				values[i] = NULL;
+				continue;
 			}
-		}
-		else
-		{
-			values[0] = PQcmdStatus(res);
+
+			if (PQgetisnull(res, rowno, j))
+				values[i] = NULL;
+			else
+				values[i] = PQgetvalue(res, rowno, j);
+			j++;
 		}
 
 		/* build the tuple and put it into the tuplestore. */
 		tuple = BuildTupleFromCStrings(attinmeta, values);
-		tuplestore_puttuple(tupstore, tuple);
+		ExecStoreTuple(tuple, slot, InvalidBuffer, true);
 	}
 
 	/* clean up and return the tuplestore */
@@ -702,6 +698,7 @@ storeResult(Tuplestorestate *tupstore,
 	pfree(values);
 }
 
+#ifdef NOT_USED
 /*
  * Retrieve cost-factors of the foreign server from catalog.
  */
@@ -740,7 +737,7 @@ get_server_costs(Oid relid, double *connection_cost, double *transfer_cost)
 	pfree(keywords);
 	pfree(values);
 }
-
+#endif
 
 /*
  * Estimate costs of scanning on a foreign table.
@@ -763,7 +760,12 @@ pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel)
 
 	/* Get cost factor from catalog to correct costs. */
 	rte = planner_rt_fetch(baserel->relid, root);
+#ifdef NOT_USED
 	get_server_costs(rte->relid, &connection_cost, &transfer_cost);
+#endif
+	/* XXX arbitrary guesses */
+	connection_cost = 10.0;
+	transfer_cost = 1.0;
 	path->path.startup_cost += connection_cost;
 	path->path.total_cost += connection_cost;
 	path->path.total_cost += transfer_cost *
@@ -904,6 +906,23 @@ check_conn_params(const char **keywords, const char **values)
 		   errdetail("Non-superusers must provide a password in the connection string.")));
 }
 
+static int
+flatten_generic_options(List *defelems, const char **keywords, const char **values)
+{
+	ListCell *lc;
+	int i;
+
+	i = 0;
+	foreach(lc, defelems)
+	{
+		DefElem *d = (DefElem *) lfirst(lc);
+		keywords[i] = d->defname;
+		values[i] = strVal(d->arg);
+		i++;
+	}
+	return i;
+}
+
 static PGconn *
 connect_pg_server(ForeignServer *server, UserMapping *user)
 {
diff --git a/contrib/postgresql_fdw/uninstall_postgresql_fdw.sql b/contrib/postgresql_fdw/uninstall_postgresql_fdw.sql
index af39f54..a24bbd4 100644
--- a/contrib/postgresql_fdw/uninstall_postgresql_fdw.sql
+++ b/contrib/postgresql_fdw/uninstall_postgresql_fdw.sql
@@ -1,4 +1,4 @@
-/* contrib/postgresql_fdw/uninstall_postgresql_fdw.sql.in */
+/* contrib/postgresql_fdw/uninstall_postgresql_fdw.sql */
 
 -- Adjust this setting to control where the objects get dropped.
 set search_path = public;
commit c1db2d6546b780f7b3293ac97f176a5114b356d5
Author: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date:   Mon Feb 7 21:51:10 2011 +0200

    Make postgresql_fdw work again with the new API. A lot of things are surely
    still broken, but at least simple queries work now.

diff --git a/contrib/postgresql_fdw/postgresql_fdw.c b/contrib/postgresql_fdw/postgresql_fdw.c
index db34ed3..17e3856 100644
--- a/contrib/postgresql_fdw/postgresql_fdw.c
+++ b/contrib/postgresql_fdw/postgresql_fdw.c
@@ -12,8 +12,10 @@
  */
 #include "postgres.h"
 
+#include "catalog/pg_namespace.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
+#include "foreign/foreign.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
 #include "mb/pg_wchar.h"
@@ -40,25 +42,21 @@ extern Datum postgresql_fdw_handler(PG_FUNCTION_ARGS);
 /*
  * FDW routines
  */
-static FSConnection* pgConnectServer(ForeignServer *server, UserMapping *user);
-static void pgFreeFSConnection(FSConnection *conn);
-static void pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel);
-static void pgOpen(ForeignScanState *scanstate);
-static void pgBeginScan(ForeignScanState *scanstate);
-static void pgIterate(ForeignScanState *scanstate);
-static void pgClose(ForeignScanState *scanstate);
-static void pgReOpen(ForeignScanState *scanstate);
+static FdwPlan *pgPlanRelScan(Oid foreigntableid, PlannerInfo *root,
+			  RelOptInfo *baserel);
+static FdwExecutionState *pgBeginScan(FdwPlan *plan, ParamListInfo params);
+static void pgIterate(FdwExecutionState *state, TupleTableSlot *slot);
+static void pgReScan(FdwExecutionState *state);
+static void pgEndScan(FdwExecutionState *state);
 
 /* helper for deparsing a request into SQL statement */
-static bool is_foreign_qual(Expr *expr);
+
+static bool is_foreign_qual(PlannerInfo *root, RelOptInfo *baserel, Expr *expr);
 static bool foreign_qual_walker(Node *node, void *context);
 static void deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, const char *aliasname, bool prefix);
-static void deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix);
-static char *deparseSql(ForeignScanState *scanstate);
+static char *deparseSql(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel);
 
-/* helper for handling result tuples */
-static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd,
-						TupleDesc tupdesc, PGresult *res);
+static void storeResult(TupleTableSlot *slot, PGresult *res, int rowno);
 
 /*
  * Connection management
@@ -75,25 +73,23 @@ static void cleanup_connection(ResourceReleasePhase phase,
 /*
  * PostgreSQL specific portion of a foreign query request
  */
-typedef struct pgFdwReply
+typedef struct pgFdwExecutionState
 {
-	char			   *sql;		/* SQL text to be sent to foreign server */
-	Tuplestorestate	   *tupstore;	/* store all of result tuples */
-} pgFdwReply;
+	PGconn *conn;
+	PGresult *res;
+	int		nextrow;
+} pgFdwExecutionState;
 
 /*
  * FdwRoutine of PostgreSQL wrapper
  */
 static FdwRoutine postgresql_fdw_routine =
 {
-	pgConnectServer,
-	pgFreeFSConnection,
-	pgEstimateCosts,
-	pgOpen,
+	pgPlanRelScan,
 	pgBeginScan,
 	pgIterate,
-	pgClose,
-	pgReOpen,
+	pgReScan,
+	pgEndScan
 };
 
 /*
@@ -107,58 +103,18 @@ postgresql_fdw_handler(PG_FUNCTION_ARGS)
 	PG_RETURN_POINTER(&postgresql_fdw_routine);
 }
 
-/*
- * Connect to a foreign PostgreSQL server with libpq if necessary.
- */
-static FSConnection *
-pgConnectServer(ForeignServer *server, UserMapping *user)
+static FdwPlan *
+pgPlanRelScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel)
 {
-	elog(DEBUG3, "%s() called for \"%s\"", __FUNCTION__, server->servername);
-
-	return (FSConnection *) GetConnection(server, user);
-}
+	FdwPlan *plan = makeNode(FdwPlan);
+	char *sql;
 
+	sql = deparseSql(foreigntableid, root, baserel);
 
-/*
- * Disconnect from the foreign server if the connection is not referenced by
- * any other scan.
- */
-static void
-pgFreeFSConnection(FSConnection *conn)
-{
-	elog(DEBUG3, "%s() called", __FUNCTION__);
-
-	if (conn == NULL)
-		return;
-
-	ReleaseConnection((PGconn *)conn);
-}
+	plan->explainInfo = sql;
+	plan->fdw_private = (void *) sql;
 
-/*
- * Check whether the ExprState node can be evaluated in foreign server.
- *
- * An expression which consists of expressions below can be evaluated in
- * the foreign server.
- *  - constant value
- *  - variable (foreign table column)
- *  - external parameter (parameter of prepared statement)
- *  - array
- *  - bool expression (AND/OR/NOT)
- *  - NULL test (IS [NOT] NULL)
- *  - operator
- *    - IMMUTABLE only
- *    - It is required that the meaning of the operator be the same as the
- *      local server in the foreign server. 
- *  - function
- *    - IMMUTABLE only
- *    - It is required that the meaning of the operator be the same as the
- *      local server in the foreign server. 
- *  - scalar array operator (ANY/ALL)
- */
-static bool
-is_foreign_qual(Expr *expr)
-{
-	return !foreign_qual_walker((Node *) expr, NULL);
+	return plan;
 }
 
 /*
@@ -237,8 +193,10 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
 	first = true;
 	for (i = 0; i < tupdesc->natts; i++)
 	{
+#ifdef NOT_USED
 		List	   *options;
 		ListCell   *lc;
+#endif
 		char	   *colname = NULL;
 
 		/* skip dropped attributes */
@@ -246,6 +204,7 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
 			continue;
 
 		/* Determine column name to be used */
+#ifdef NOT_USED /* XXX: What was this all about? */
 		options = GetGenericOptionsPerColumn(table->relid, i + 1);
 		foreach (lc, options)
 		{
@@ -256,6 +215,7 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
 				break;
 			}
 		}
+#endif
 		if (!colname)
 			colname = tupdesc->attrs[i]->attname.data;
 
@@ -278,51 +238,141 @@ deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
 		pfree((char *) aliasname_q);
 }
 
-/*
- * Deparse the passed information into FROM clauses and append to the buffer
- * 'sql'.
- */
 static void
-deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix)
+deparse_var(PlannerInfo *root, StringInfo buf, Var *var)
 {
-	char		   *nspname = NULL;
-	char		   *relname = NULL;
-	const char	   *nspname_q;
-	const char	   *relname_q;
-	const char	   *aliasname_q;
-	ListCell	   *lc;
+	RangeTblEntry *rte;
+	char *attname;
 
-	/* The alias of relation is used in both SELECT clause and FROM clause. */
-	aliasname_q = quote_identifier(aliasname);
+	if (var->varlevelsup != 0)
+		elog(ERROR, "unexpected varlevelsup %d in remote query",
+			 var->varlevelsup);
 
-	/*
-	 * If the foreign table has generic option "nspname" and/or "relname", use
-	 * them in the foreign query.  Otherwise, use local catalog names.
-	 * Each identifier should be quoted because they might be case sensitive.
-	 */
-	foreach(lc, table->options)
+	if (var->varno < 1 || var->varno > list_length(root->parse->rtable))
+		elog(ERROR, "unexpected varno %d in remote query", var->varno);
+	rte = rt_fetch(var->varno, root->parse->rtable);
+
+	attname  = get_rte_attribute_name(rte, var->varattno);
+	appendStringInfoString(buf, quote_identifier(attname));
+}
+
+typedef struct
+{
+	PlannerInfo *root;
+	RelOptInfo *foreignrel;
+} remotely_executable_cxt;
+
+static bool
+is_proc_remotely_executable(Oid procid)
+{
+	/* assume that only built-in functions can be pushed down */
+	if (get_func_namespace(procid) != PG_CATALOG_NAMESPACE)
+		return false;
+	/* we don't check volatility here, that's done once at the top-level */
+
+	return true;
+}
+
+static bool
+is_not_remotely_executable_walker(Node *node, remotely_executable_cxt *context)
+{
+	if (node == NULL)
+		return false;
+
+	if (IsA(node, Query))
+		return true;
+
+	switch (nodeTag(node))
 	{
-		DefElem *opt = lfirst(lc);
-		if (strcmp(opt->defname, "nspname") == 0)
-			nspname = pstrdup(strVal(opt->arg));
-		else if (strcmp(opt->defname, "relname") == 0)
-			relname = pstrdup(strVal(opt->arg));
+		case T_Query:
+			/* give up on subqueries */
+			return true;
+		case T_Param:
+			/* TODO: pass internal parameters to the foreign server */
+			{
+				ParamKind	paramkind = ((Param *) node)->paramkind;
+				elog(DEBUG1, "%s() param=%s", __FUNCTION__,
+	  				paramkind == PARAM_EXTERN ? "PARAM_EXTERN" :
+	  				paramkind == PARAM_EXEC ? "PARAM_EXEC" :
+	  				paramkind == PARAM_SUBLINK ? "PARAM_SUBLINK" : "unkown");
+			}
+			if (((Param *) node)->paramkind != PARAM_EXTERN)
+				return true;
+			break;
+		case T_OpExpr:
+			{
+				OpExpr *op = (OpExpr *) node;
+				if (!is_proc_remotely_executable(op->opfuncid))
+					return true;
+				else
+					return false;
+			}
+		case T_FuncExpr:
+			{
+				FuncExpr *fe = (FuncExpr *) node;
+				if (!is_proc_remotely_executable(fe->funcid))
+					return true;
+				else
+					return false;
+			}
+
+		case T_TargetEntry:
+		case T_PlaceHolderVar:
+		case T_AppendRelInfo:
+		case T_PlaceHolderInfo:
+			/* TODO: research whether those complex nodes are evaluatable. */
+			return true;
+		case T_Var:
+			{
+				Var *var = (Var *) node;
+				if (var->varno != context->foreignrel->relid || var->varlevelsup != 0)
+					return true;
+				else
+					return false;
+			}
+
+		default:
+			break;
 	}
-	if (nspname == NULL)
-		nspname = get_namespace_name(get_rel_namespace(table->relid));
-	if (relname == NULL)
-		relname = get_rel_name(table->relid);
-	nspname_q = quote_identifier(nspname);
-	relname_q = quote_identifier(relname);
-	appendStringInfo(sql, " FROM %s.%s %s", nspname_q, relname_q, aliasname_q);
-	pfree(nspname);
-	pfree(relname);
-	if (nspname_q != nspname)
-		pfree((char *) nspname_q);
-	if (relname_q != relname)
-		pfree((char * ) relname_q);
-	if (aliasname_q != aliasname)
-		pfree((char *) aliasname_q);
+
+	return expression_tree_walker(node, foreign_qual_walker, context);
+}
+
+
+/*
+ * Check whether the ExprState node can be evaluated in foreign server.
+ *
+ * An expression which consists of expressions below can be evaluated in
+ * the foreign server.
+ *  - constant value
+ *  - variable (foreign table column)
+ *  - external parameter (parameter of prepared statement)
+ *  - array
+ *  - bool expression (AND/OR/NOT)
+ *  - NULL test (IS [NOT] NULL)
+ *  - operator
+ *    - IMMUTABLE only
+ *    - It is required that the meaning of the operator be the same as the
+ *      local server in the foreign server. 
+ *  - function
+ *    - IMMUTABLE only
+ *    - It is required that the meaning of the operator be the same as the
+ *      local server in the foreign server. 
+ *  - scalar array operator (ANY/ALL)
+ */
+static bool
+is_foreign_qual(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
+{
+	remotely_executable_cxt context;
+
+	context.root = root;
+	context.foreignrel = baserel;
+	if (is_not_remotely_executable_walker((Node *) expr, &context))
+		return false;
+	if (contain_volatile_functions((Node *) expr))
+		return false;
+
+	return true;
 }
 
 /*
@@ -336,39 +386,58 @@ deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bo
  *     remote side and local side
  */
 static char *
-deparseSql(ForeignScanState *scanstate)
+deparseSql(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel)
 {
-	EState		   *estate = scanstate->ss.ps.state;
-	bool			prefix;
 	List		   *context;
 	StringInfoData	sql;
-	ForeignScan	   *scan;
-	RangeTblEntry  *rte;
-	ForeignTable   *table = scanstate->table;
+	ForeignTable   *table = GetForeignTable(foreigntableid);
+	ListCell   *lc;
+	bool		first;
+	char	   *nspname = NULL;
+	char	   *relname = NULL;
 
 	/* extract ForeignScan and RangeTblEntry */
-	scan = (ForeignScan *)scanstate->ss.ps.plan;
-	rte = list_nth(estate->es_range_table, scan->scan.scanrelid - 1);
 
 	/* prepare to deparse plan */
 	initStringInfo(&sql);
-	context = deparse_context_for_planstate((Node *)&scanstate->ss.ps, NULL,
-									   estate->es_range_table);
+
+	context = deparse_context_for("foreigntable", foreigntableid);
+
+	/* deparse SELECT target list */
+	appendStringInfoString(&sql, "SELECT ");
+	first = true;
+	foreach (lc, baserel->reltargetlist)
+	{
+		Var *var = lfirst(lc);
+
+		if (!first)
+			appendStringInfoString(&sql, ", ");
+		first = false;
+		deparse_var(root, &sql, var);
+	}
 
 	/*
-	 * XXX: Prefix is set to false always because setting prefix to true makes
-	 * the SQL invalid when this is a child scan for an inherited table and qual
-	 * is not empty (need to generate WHERE clause).  It might be needed to fix
-	 * deparse_expression() to deparse column references in the qual into
-	 * name of the child table, instead of name of the parent table, or table
-	 * alias.
+	 * Deparse FROM
+	 *
+	 * If the foreign table has generic option "nspname" and/or "relname", use
+	 * them in the foreign query.  Otherwise, use local catalog names.
 	 */
-	prefix = false;
-
-	/* deparse SELECT and FROM clauses */
-	deparseSelectClause(&sql, table, scanstate->ss.ss_currentRelation->rd_att,
-						rte->eref->aliasname, prefix);
-	deparseFromClause(&sql, table, rte->eref->aliasname, prefix);
+	foreach(lc, table->options)
+	{
+		DefElem *opt = lfirst(lc);
+		if (strcmp(opt->defname, "nspname") == 0)
+			nspname = pstrdup(strVal(opt->arg));
+		else if (strcmp(opt->defname, "relname") == 0)
+			relname = pstrdup(strVal(opt->arg));
+	}
+	if (nspname == NULL)
+		nspname = get_namespace_name(get_rel_namespace(foreigntableid));
+	if (relname == NULL)
+		relname = get_rel_name(foreigntableid);
+	appendStringInfo(&sql, " FROM %s.%s",
+					 quote_identifier(nspname), 
+					 quote_identifier(relname));
+	
 
 	/*
 	 * deparse WHERE cluase
@@ -381,10 +450,9 @@ deparseSql(ForeignScanState *scanstate)
 	 * statement to make following EXECUTE statements work properly.  The Plan
 	 * node is used repeatedly to create PlanState for each EXECUTE statement.
 	 */
-	if (scanstate->ss.ps.plan->qual)
+	if (baserel->baserestrictinfo)
 	{
 		List	   *local_qual = NIL;
-		List	   *foreign_qual = NIL;
 		List	   *foreign_expr = NIL;
 		ListCell   *lc;
 
@@ -392,23 +460,24 @@ deparseSql(ForeignScanState *scanstate)
 		 * Divide qual of PlanState into two lists, one for local evaluation
 		 * and one for foreign evaluation.
 		 */
-		foreach (lc, scanstate->ss.ps.qual)
+		foreach (lc, baserel->baserestrictinfo)
 		{
-			ExprState	   *state = lfirst(lc);
+			RestrictInfo *ri = (RestrictInfo *) lfirst(lc);
 
-			if (is_foreign_qual(state->expr))
-			{ foreign_qual = lappend(foreign_qual, state);
-				foreign_expr = lappend(foreign_expr, state->expr);
+			if (is_foreign_qual(root, baserel, ri->clause))
+			{
+				/* XXX: deparse and add to sql here */
+				foreign_expr = lappend(foreign_expr, ri->clause);
 			}
 			else
-				local_qual = lappend(local_qual, state);
+				local_qual = lappend(local_qual, ri);
 		}
 		/*
 		 * XXX: If the remote side is not reliable enough, we can keep the qual
 		 * in PlanState as is and evaluate them on local side too.  If so, just
 		 * omit replacement below.
 		 */
-		scanstate->ss.ps.qual = local_qual;
+		baserel->baserestrictinfo = local_qual;
 
 		/*
 		 * Deparse quals to be evaluated in the foreign server if any.
@@ -420,7 +489,7 @@ deparseSql(ForeignScanState *scanstate)
 			Node   *node;
 			node = (Node *) make_ands_explicit(foreign_expr);
 			appendStringInfo(&sql, " WHERE %s",
-				deparse_expression(node, context, prefix, false));
+				deparse_expression(node, context, false, false));
 			/*
 			 * The contents of the list MUST NOT be free-ed because they are
 			 * referenced from Plan.qual list.
@@ -435,39 +504,29 @@ deparseSql(ForeignScanState *scanstate)
 }
 
 /*
- * Deparse the request into SQL statement and keep it for future execution.
- *
- * XXX: deparsing should be done in pgEstimateCosts to estimate the costs by
- * executing EXPLAIN on remote side?
- */
-static void
-pgOpen(ForeignScanState *scanstate)
-{
-	pgFdwReply	   *reply;
-
-	elog(DEBUG3, "%s() called ", __FUNCTION__);
-
-	/* FWD-specific portion */
-	reply = (pgFdwReply *) palloc0(sizeof(*reply));
-	reply->sql = deparseSql(scanstate);
-	scanstate->reply = (FdwReply *) reply;
-}
-
-/*
  * Initiate actual scan on a foreign table.
  * This function is called just after pgOpen() if the ForeignScan was executed
  * for a real query or EXPLAIN statement with ANALYZE option.
  */
-static void
-pgBeginScan(ForeignScanState *scanstate)
+static FdwExecutionState *
+pgBeginScan(FdwPlan *plan, ParamListInfo params)
 {
-	pgFdwReply	   *reply = (pgFdwReply *) scanstate->reply;
-	PGconn		   *conn = (PGconn *) scanstate->conn;
-	PGresult	   *res;
-	ParamListInfo	info = scanstate->ss.ps.state->es_param_list_info;
-	int				numParams = info ? info->numParams : 0;
-	Oid			   *types = NULL;
-	const char	  **values = NULL;
+	FdwExecutionState *result = palloc(sizeof(FdwExecutionState));
+	pgFdwExecutionState *state;
+	PGconn	   *conn;
+	PGresult   *res;
+	int			numParams = params ? params->numParams : 0;
+	Oid		   *types = NULL;
+	const char **values = NULL;
+	const char *sql = (const char *) plan->fdw_private;
+
+	state = palloc(sizeof(pgFdwExecutionState));
+
+	conn = GetConnection(GetForeignServer(plan->serverid),
+						 GetUserMapping(plan->userid, plan->serverid));
+	state->conn = conn;	
+
+	result->fdw_private = state;
 
 	elog(DEBUG3, "%s() called", __FUNCTION__);
 
@@ -481,8 +540,8 @@ pgBeginScan(ForeignScanState *scanstate)
 		values = palloc0(sizeof(char *) * numParams);
 		for (i = 0; i < numParams; i++)
 		{
-			types[i] = info->params[i].ptype;
-			if (info->params[i].isnull)
+			types[i] = params->params[i].ptype;
+			if (params->params[i].isnull)
 				values[i] = NULL;
 			else
 			{
@@ -495,7 +554,7 @@ pgBeginScan(ForeignScanState *scanstate)
 				getTypeOutputInfo(types[i], &out_func_oid, &isvarlena);
 				fmgr_info(out_func_oid, &func);
 				values[i] =
-					OutputFunctionCall(&func, info->params[i].value);
+					OutputFunctionCall(&func, params->params[i].value);
 			}
 		}
 	}
@@ -505,8 +564,7 @@ pgBeginScan(ForeignScanState *scanstate)
 	 * TODO: support internal parameters(PARAM_EXTERN)
 	 * TODO: support cursor mode for huge result sets.
 	 */
-	res = PQexecParams(conn, reply->sql,
-							numParams, types, values, NULL, NULL, 0);
+	res = PQexecParams(conn, sql, numParams, types, values, NULL, NULL, 0);
 	if (numParams > 0)
 	{
 		int i;
@@ -525,31 +583,18 @@ pgBeginScan(ForeignScanState *scanstate)
 	{
 		char *msg;
 
-		PQclear(res);
 		msg = pstrdup(PQerrorMessage(conn));
-		ereport(ERROR, (
-				errmsg("could not execute foreign query"),
-				errdetail("%s", msg), errhint("%s", reply->sql)));
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("could not execute foreign query"),
+				 errdetail("%s", msg),
+				 errhint("%s", sql)));
 	}
 
-	/* Note: use PG_TRY to ensure freeing PGresult. */
-	PG_TRY();
-	{
-		TupleDesc	tupdesc = ExecGetScanType((ScanState *) scanstate);
-
-		/* create tuplestore to store results */
-		reply->tupstore = tuplestore_begin_heap(true, false, work_mem);
+	state->res = res;
+	state->nextrow = 0;
 
-		storeResult(reply->tupstore, false, tupdesc, res);
-
-		PQclear(res);
-	}
-	PG_CATCH();
-	{
-		PQclear(res);
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
+	return result;
 }
 
 /*
@@ -562,87 +607,48 @@ pgBeginScan(ForeignScanState *scanstate)
  * ScanState.
  */
 static void
-pgIterate(ForeignScanState *scanstate)
+pgIterate(FdwExecutionState *state, TupleTableSlot *slot)
 {
-	pgFdwReply	   *reply = (pgFdwReply *) scanstate->reply;
-	TupleTableSlot *slot = scanstate->ss.ss_ScanTupleSlot;
+	pgFdwExecutionState *priv = state->fdw_private;
 
 	elog(DEBUG3, "%s() called", __FUNCTION__);
 
-	/* store the next tuple into the slot from the tuplestore */
-	if (tuplestore_gettupleslot(reply->tupstore, true, false, slot))
-	{
-		/*
-		 * Because the tuples stored in the tupstore are minimal tuples,
-		 * they have to be materialized to retrieve system attributes.
-		 */
-		ExecMaterializeSlot(slot);
-	}
+	if (priv->nextrow == PQntuples(priv->res))
+		ExecClearTuple(slot);
 	else
-	{
-		/* TODO: if cursor mode, reset tuple slot and fetch the next batch. */
-	}
+		storeResult(slot, priv->res, priv->nextrow++);
 }
 
-/*
- * Finish scanning foreign table and dispose objects used for this scan.
- */
 static void
-pgClose(ForeignScanState *scanstate)
+pgReScan(FdwExecutionState *state)
 {
-	pgFdwReply	   *reply = (pgFdwReply *) scanstate->reply;
-
-	elog(DEBUG3, "%s() called", __FUNCTION__);
-
-	if (reply == NULL)
-		return;
-
-	if (reply->tupstore != NULL)
-		tuplestore_end(reply->tupstore);
-
-	/*
-	 * reply->conn is not freed here because foreign connections are
-	 * released by executor via FreeFSConnection.
-	 */
-	pfree(reply);
-	scanstate->reply = NULL;
+	((pgFdwExecutionState *)state->fdw_private)->nextrow = 0;
 }
 
-/*
- * Execute query with new parameter.
- */
 static void
-pgReOpen(ForeignScanState *scanstate)
+pgEndScan(FdwExecutionState *state)
 {
-	pgFdwReply	   *reply = (pgFdwReply *) scanstate->reply;
+	pgFdwExecutionState *priv = (pgFdwExecutionState *) state->fdw_private;
 
-	elog(DEBUG3, "%s() called", __FUNCTION__);
-
-	/* Rewind tuplestore to retrieve all tuples again */
-	if (reply->tupstore)
-		tuplestore_rescan(reply->tupstore);
+	PQclear(priv->res);
+	pfree(priv);
 }
 
 /*
  * Store a PGresult into tuplestore.
  */
 static void
-storeResult(Tuplestorestate *tupstore,
-			bool is_sql_cmd,
-			TupleDesc tupdesc,
-			PGresult *res)
+storeResult(TupleTableSlot *slot, PGresult *res, int rowno)
 {
 	int					i;
-	int					row;
-	int					ntuples;
 	int					nfields;
 	int					attnum;		/* number of non-dropped columns */
 	char			  **values;
 	AttInMetadata	   *attinmeta;
 	Form_pg_attribute  *attrs;
+	TupleDesc	tupdesc = slot->tts_tupleDescriptor;
 
-	ntuples = PQntuples(res);
-	nfields = is_sql_cmd ? 1 : PQnfields(res);
+	nfields = PQnfields(res);
 	attrs = tupdesc->attrs;
 
 	/* count non-dropped columns */
@@ -662,39 +668,29 @@ storeResult(Tuplestorestate *tupstore,
 
 	/* put all tuples into the tuplestore */
 	attinmeta = TupleDescGetAttInMetadata(tupdesc);
-	for (row = 0; row < ntuples; row++)
 	{
 		int			j;
 		HeapTuple	tuple;
 
-		CHECK_FOR_INTERRUPTS();
-
-		if (!is_sql_cmd)
+		for (i = 0, j = 0; i < tupdesc->natts; i++)
 		{
-			for (i = 0, j = 0; i < tupdesc->natts; i++)
+			/* skip dropped columns. */
+			if (attrs[i]->attisdropped)
 			{
-				/* skip dropped columns. */
-				if (attrs[i]->attisdropped)
-				{
-					values[i] = NULL;
-					continue;
-				}
-
-				if (PQgetisnull(res, row, j))
-					values[i] = NULL;
-				else
-					values[i] = PQgetvalue(res, row, j);
-				j++;
+				values[i] = NULL;
+				continue;
 			}
-		}
-		else
-		{
-			values[0] = PQcmdStatus(res);
+
+			if (PQgetisnull(res, rowno, j))
+				values[i] = NULL;
+			else
+				values[i] = PQgetvalue(res, rowno, j);
+			j++;
 		}
 
 		/* build the tuple and put it into the tuplestore. */
 		tuple = BuildTupleFromCStrings(attinmeta, values);
-		tuplestore_puttuple(tupstore, tuple);
+		ExecStoreTuple(tuple, slot, InvalidBuffer, true);
 	}
 
 	/* clean up and return the tuplestore */
@@ -702,6 +698,7 @@ storeResult(Tuplestorestate *tupstore,
 	pfree(values);
 }
 
+#ifdef NOT_USED
 /*
  * Retrieve cost-factors of the foreign server from catalog.
  */
@@ -740,7 +737,7 @@ get_server_costs(Oid relid, double *connection_cost, double *transfer_cost)
 	pfree(keywords);
 	pfree(values);
 }
-
+#endif
 
 /*
  * Estimate costs of scanning on a foreign table.
@@ -763,7 +760,12 @@ pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel)
 
 	/* Get cost factor from catalog to correct costs. */
 	rte = planner_rt_fetch(baserel->relid, root);
+#ifdef NOT_USED
 	get_server_costs(rte->relid, &connection_cost, &transfer_cost);
+#endif
+	/* XXX arbitrary guesses */
+	connection_cost = 10.0;
+	transfer_cost = 1.0;
 	path->path.startup_cost += connection_cost;
 	path->path.total_cost += connection_cost;
 	path->path.total_cost += transfer_cost *
@@ -904,6 +906,23 @@ check_conn_params(const char **keywords, const char **values)
 		   errdetail("Non-superusers must provide a password in the connection string.")));
 }
 
+static int
+flatten_generic_options(List *defelems, const char **keywords, const char **values)
+{
+	ListCell *lc;
+	int i;
+
+	i = 0;
+	foreach(lc, defelems)
+	{
+		DefElem *d = (DefElem *) lfirst(lc);
+		keywords[i] = d->defname;
+		values[i] = strVal(d->arg);
+		i++;
+	}
+	return i;
+}
+
 static PGconn *
 connect_pg_server(ForeignServer *server, UserMapping *user)
 {
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to