Le lundi 30 juin 2014 16:43:38 Michael Paquier a écrit :
> On Thu, Jun 19, 2014 at 11:00 PM, Michael Paquier <michael.paqu...@gmail.com
> > wrote:
> > >> This seems to be related to re-using the table-name between
> >
> > invocations. The
> >
> > >> attached patch should fix point 2. As for point 1, I don't know the
> >
> > cause for
> >
> > >> it. Do you have a reproducible test case ?
> > >
> > > Sure. I'll try harder when looking in more details at the patch for
> > > postgres_fdw :)
> >
> > With v2, I have tried randomly some of the scenarios of v1 plus some
> > extras, but was not able to reproduce it.
> >
> > > I'll look into the patch for postgres_fdw later.
> >
> > And here are some comments about it, when applied on top of the other 2
> > patches.
> > 1) Code compiles without warning, regression tests pass.
> > 2) In fetch_remote_tables, the palloc for the parameters should be
> > done after the number of parameters is determined. In the case of
> > IMPORT_ALL, some memory is wasted for nothing.
> > 3) Current code is not able to import default expressions for a table.
> > A little bit of processing with pg_get_expr would be necessary:
> > select pg_catalog.pg_get_expr(adbin, adrelid) from pg_attrdef;
> > There are of course bonus cases like SERIAL columns coming immediately
> > to my mind but it would be possible to determine if a given column is
> > serial with pg_get_serial_sequence.
> > This would be a good addition for the FDW IMO.
> > 4) The same applies of course to the constraint name: CREATE FOREIGN
> > TABLE foobar (a int CONSTRAINT toto NOT NULL) for example.
> > 5) A bonus idea: enable default expression obtention and null/not null
> > switch by default but add options to disable their respective
> > obtention.
> > 6) Defining once PGFDW_IMPORTRESULT_NUMCOLS at the top of
> > postgres_fdw.c without undefining would be perfectly fine.
> > 7) In postgresImportForeignSchema, the palloc calls and the
> > definitions of the variables used to save the results should be done
> > within the for loop.
> > 8) At quick glance, the logic of postgresImportForeignSchema looks
> > awkward... I'll have a second look with a fresher mind later on this
> > one.
>
> While having a second look at the core patch, I have found myself
> re-hacking it, fixing a couple of bugs and adding things that have been
> missing in the former implementation:
> - Deletions of unnecessary structures to simplify code and make it cleaner
> - Fixed a bug related to the management of local schema name. A FDW was
> free to set the schema name where local tables are created, this should not
> be the case.
> - Improved documentation, examples and other things, fixed doc padding for
> example
> - Added some missing stuff in equalfuncs.c and copyfuncs.c
> - Some other things.
> With that, core patch looks pretty nice actually, and I think that we
> should let a committer have a look at this part at least for this CF.
>
> Also, the postgres_fdw portion has been updated based on the previous core
> patch modified, using a version that Ronan sent me, which has addressed the
> remarks I sent before. This patch is still lacking documentation, some
> cleanup, and regression tests are broken, but it can be used to test the
> core feature. I unfortunately don't have much time today but I am sending
> this patch either way to let people play with IMPORT SCHEMA if I don't come
> back to it before.

The regression tests fail because of a typo in pg_type.h: BOOLARRAYOID should
be defined to 1000, not 1003 (which clashes against NAMEARRAYOID).

What do you think should be documented, and where ?


> Regards,

--
Ronan Dunklau
http://dalibo.com - http://dalibo.org
From 1cc2922e9087f2d852d2b1196314d7e35dce42a3 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Mon, 30 Jun 2014 16:36:47 +0900
Subject: [PATCH 2/2] Add support of IMPORT SCHEMA for postgres_fdw

---
 contrib/postgres_fdw/expected/postgres_fdw.out |  62 ++++++
 contrib/postgres_fdw/postgres_fdw.c            | 259 +++++++++++++++++++++++++
 contrib/postgres_fdw/sql/postgres_fdw.sql      |  30 +++
 src/include/catalog/pg_type.h                  |   2 +
 4 files changed, 353 insertions(+)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2e49ee3..07e6c11 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2834,3 +2834,65 @@ NOTICE:  NEW: (13,"test triggered !")
  (0,27)
 (1 row)

+-- Test IMPORT FOREIGN SCHEMA statement
+CREATE schema import_destination;
+CREATE schema import_source;
+CREATE TABLE import_source.t1 (c1 int, c2 varchar NOT NULL);
+CREATE TABLE import_source.t2 (c1 int, c2 varchar);
+CREATE TABLE import_source.t3 (c1 int, c2 varchar);
+IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+                                        List of foreign tables
+       Schema       | Table |  Server  |                  FDW Options                   | Description
+--------------------+-------+----------+------------------------------------------------+-------------
+ import_destination | t1    | loopback | (schema_name 'import_source', table_name 't1') |
+ import_destination | t2    | loopback | (schema_name 'import_source', table_name 't2') |
+ import_destination | t3    | loopback | (schema_name 'import_source', table_name 't3') |
+(3 rows)
+
+\d import_destination.t1;
+        Foreign table "import_destination.t1"
+ Column |       Type        | Modifiers | FDW Options
+--------+-------------------+-----------+-------------
+ c1     | integer           |           |
+ c2     | character varying | not null  |
+Server: loopback
+FDW Options: (schema_name 'import_source', table_name 't1')
+
+DROP SCHEMA import_destination cascade;
+NOTICE:  drop cascades to 3 other objects
+DETAIL:  drop cascades to foreign table import_destination.t1
+drop cascades to foreign table import_destination.t2
+drop cascades to foreign table import_destination.t3
+CREATE schema import_destination;
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (inexistent) FROM SERVER loopback INTO import_destination; -- ERROR
+ERROR:  Table inexistent could not be found on the remote server.
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (t1) FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+                                        List of foreign tables
+       Schema       | Table |  Server  |                  FDW Options                   | Description
+--------------------+-------+----------+------------------------------------------------+-------------
+ import_destination | t1    | loopback | (schema_name 'import_source', table_name 't1') |
+(1 row)
+
+IMPORT FOREIGN SCHEMA import_source EXCEPT (t1, t2) FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+                                        List of foreign tables
+       Schema       | Table |  Server  |                  FDW Options                   | Description
+--------------------+-------+----------+------------------------------------------------+-------------
+ import_destination | t1    | loopback | (schema_name 'import_source', table_name 't1') |
+ import_destination | t3    | loopback | (schema_name 'import_source', table_name 't3') |
+(2 rows)
+
+-- Test IMPORT FOREIGN SCHEMA with missing types
+CREATE DATABASE remote1;
+CREATE SERVER remote1 FOREIGN DATA WRAPPER postgres_fdw
+  OPTIONS (dbname 'remote1');
+\c remote1;
+CREATE TYPE typ1 AS (m1 int, m2 varchar);
+CREATE SCHEMA import_source;
+CREATE TABLE import_source.t1 (c1 int, c2 typ1);
+\c contrib_regression;
+CREATE USER MAPPING FOR CURRENT_USER SERVER remote1;
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (t1) FROM SERVER remote1 INTO import_destination;
+ERROR:  IMPORT of table t1 failed because of missing type defined on remote but not locally: type "public.typ1" does not exist
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 7dd43a9..df4a040 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -46,6 +46,9 @@ PG_MODULE_MAGIC;
 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
 #define DEFAULT_FDW_TUPLE_COST		0.01

+/* Number of columns in the query fetching remote tables during IMPORT */
+#define PGFDW_IMPORTRESULT_NUMCOLS 4
+
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
@@ -286,6 +289,11 @@ static void postgresExplainForeignModify(ModifyTableState *mtstate,
 static bool postgresAnalyzeForeignTable(Relation relation,
 							AcquireSampleRowsFunc *func,
 							BlockNumber *totalpages);
+static List *postgresImportForeignSchema(ForeignServer *server,
+							const char *remote_schema,
+							ImportForeignSchemaType import_type,
+							List *table_names,
+							List *options);

 /*
  * Helper functions
@@ -326,6 +334,10 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   List *retrieved_attrs,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
+static PGresult *fetch_remote_tables(PGconn *conn,
+					const char *remote_schema,
+					ImportForeignSchemaType import_type,
+					List *table_names);


 /*
@@ -363,6 +375,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	/* Support functions for ANALYZE */
 	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;

+	/* Support functions for IMPORT FOREIGN SCHEMA */
+	routine->ImportForeignSchema = postgresImportForeignSchema;
+
 	PG_RETURN_POINTER(routine);
 }

@@ -2347,6 +2362,250 @@ postgresAnalyzeForeignTable(Relation relation,
 	return true;
 }

+static PGresult *
+fetch_remote_tables(PGconn *conn,
+					const char *remote_schema,
+					ImportForeignSchemaType import_type,
+					List *table_names)
+{
+	StringInfoData buf;
+	int			numparams;
+	const char **import_params;
+	const char *check_params[1] = {remote_schema};
+	PGresult   *res = NULL;
+
+	initStringInfo(&buf);
+
+
+	/* Check that the schema really exists */
+	appendStringInfo(&buf, "SELECT 1 FROM pg_namespace WHERE nspname = $1");
+	res = PQexecParams(conn, buf.data, 1, NULL, check_params, NULL, NULL, 0);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pgfdw_report_error(ERROR, res, conn, true, buf.data);
+	if (PQntuples(res) != 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
+				 errmsg("The schema %s does not exist on the server", check_params[0])));
+
+	/* Fetch all tables from this schema */
+	resetStringInfo(&buf);
+	appendStringInfo(&buf,
+					 "SELECT relname, "
+					 "array_agg(attname::name) as colnames, "
+					 "array_agg(atttypid::regtype) as coltypes, "
+					 "array_agg(atttypmod::int4) as coltypmod, "
+					 "array_agg(attnotnull::bool) as colnotnull "
+					 "FROM pg_class "
+	   "INNER JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid "
+		   "LEFT  JOIN pg_attribute ON pg_class.oid = pg_attribute.attrelid "
+		  "  AND pg_attribute.attnum >= 0 AND NOT pg_attribute.attisdropped "
+					 "WHERE relkind IN ('r', 'v', 'f', 'm') "
+					 "AND pg_namespace.nspname = $1 ");
+	if (import_type != FDW_IMPORT_SCHEMA_ALL)
+	{
+		/* Add conditions */
+		Oid			outfuncoid;
+		bool		isvarlena;
+		Datum	   *elems = palloc0(list_length(table_names) * sizeof(Datum));
+		ArrayType  *array;
+		int			i = 0;
+		ListCell   *lc;
+		FmgrInfo   *fmout = palloc0(sizeof(FmgrInfo));
+
+		/* Since the import is not IMPORT_ALL, we have two params */
+		import_params = palloc0(sizeof(char *) * 2);
+		getTypeOutputInfo(CSTRINGARRAYOID, &outfuncoid, &isvarlena);
+		fmgr_info(outfuncoid, fmout);
+		foreach(lc, table_names)
+		{
+			elems[i] = CStringGetDatum(((RangeVar *) lfirst(lc))->relname);
+			i++;
+		}
+		array = construct_array(elems, i, CSTRINGOID, -2, false, 'c');
+		import_params[1] = OutputFunctionCall(fmout, PointerGetDatum(array));
+		appendStringInfo(&buf, "AND ");
+		if (import_type == FDW_IMPORT_SCHEMA_EXCEPT)
+		{
+			appendStringInfo(&buf, "NOT ");
+		}
+		appendStringInfo(&buf, "pg_class.relname = ANY($2)");
+		numparams = 2;
+		pfree(fmout);
+		pfree(elems);
+	}
+	else
+	{
+		numparams = 1;
+		import_params = palloc0(sizeof(char *) * 2);
+	}
+	import_params[0] = strdup(remote_schema);
+	appendStringInfo(&buf, "GROUP BY pg_class.oid, pg_class.relname");
+	res = PQexecParams(conn, buf.data, numparams, NULL, import_params, NULL, NULL, 0);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pgfdw_report_error(ERROR, res, conn, true, buf.data);
+	pfree(import_params);
+	return res;
+}
+
+/*
+ * Map a remote schema to a local one.
+ */
+static List *
+postgresImportForeignSchema(ForeignServer *server,
+							const char *remote_schema,
+							ImportForeignSchemaType import_type,
+							List *table_names,
+							List *options)
+{
+	List	   *tables = NULL;
+	Oid			userid = GetUserId();
+	UserMapping *mapping = GetUserMapping(userid, server->serverid);
+	PGconn	   *conn = GetConnection(server, mapping, false);
+	PGresult   *res;
+	int			numrows,
+				i;
+
+	/* Initialize FmgrInfo for parsing arrays */
+	FmgrInfo   *fmgr_infos = palloc0(sizeof(FmgrInfo) * PGFDW_IMPORTRESULT_NUMCOLS);
+	Oid			typoid,
+			   *typioparam = palloc0(sizeof(Oid) * PGFDW_IMPORTRESULT_NUMCOLS);
+	ArrayIterator *arrays = palloc0(sizeof(ArrayIterator) * PGFDW_IMPORTRESULT_NUMCOLS);
+	Oid			column_types[PGFDW_IMPORTRESULT_NUMCOLS] +		{NAMEARRAYOID, REGTYPEARRAYOID, INT4ARRAYOID, BOOLARRAYOID};
+	Datum	   *array_item = palloc0(sizeof(Datum) * PGFDW_IMPORTRESULT_NUMCOLS);
+	bool	   *isnull = palloc0(sizeof(bool) * PGFDW_IMPORTRESULT_NUMCOLS);
+
+	/* Save the memory context to escape error context later */
+	MemoryContext oldcontext = CurrentMemoryContext;
+
+	for (i = 0; i < PGFDW_IMPORTRESULT_NUMCOLS; i++)
+	{
+		getTypeInputInfo(column_types[i], &typoid, &typioparam[i]);
+		fmgr_info(typoid, &fmgr_infos[i]);
+	}
+	res = fetch_remote_tables(conn, remote_schema, import_type, table_names);
+	numrows = PQntuples(res);
+
+	for (i = 0; i < numrows; i++)
+	{
+		CreateForeignTableStmt *stmt = makeNode(CreateForeignTableStmt);
+		char	   *tablename;
+		int			colindex;
+
+		tablename = strdup(PQgetvalue(res, i, 0));
+
+		/* setup the base relation information */
+		stmt->base.relation = makeRangeVar(NULL, tablename, 0);
+		stmt->servername = server->servername;
+
+		/*
+		 * It is the role of server to set the schema where tables
+		 * are imported.
+		 */
+		stmt->base.relation->schemaname = NULL;
+
+		/* Parse arrays of columns from the result */
+		for (colindex = 0; colindex < PGFDW_IMPORTRESULT_NUMCOLS; colindex++)
+		{
+			Datum		array_datum;
+
+			PG_TRY();
+			{
+				array_datum = InputFunctionCall(&fmgr_infos[colindex],
+											PQgetvalue(res, i, colindex + 1),
+												typioparam[colindex], -1);
+				arrays[colindex] = array_create_iterator(DatumGetArrayTypeP(array_datum), 0);
+			}
+			PG_CATCH();
+			{
+				StringInfoData buf;
+				MemoryContext currentcontext = MemoryContextSwitchTo(oldcontext);
+				ErrorData  *edata = CopyErrorData();
+
+				initStringInfo(&buf);
+				if (res)
+					PQclear(res);
+				/* Special case the REGYPEARRAY conversion */
+				if (column_types[colindex] == REGTYPEARRAYOID)
+					appendStringInfo(&buf, "IMPORT of table %s failed because of missing type "
+						   "defined on remote but not locally: ", tablename);
+
+				appendStringInfo(&buf, "%s", edata->message);
+				MemoryContextSwitchTo(currentcontext);
+				ereport(ERROR,
+						(errcode(ERRCODE_UNDEFINED_OBJECT),
+						 errmsg("%s", buf.data)));
+			}
+			PG_END_TRY();
+		}
+
+		/* Add the individual columns */
+		while (array_iterate(arrays[0], &array_item[0], &isnull[0]))
+		{
+			ColumnDef  *new_column = makeNode(ColumnDef);
+
+			new_column->colname = strdup(DatumGetCString(array_item[0]));
+			for (colindex = 1; colindex < PGFDW_IMPORTRESULT_NUMCOLS; colindex++)
+				array_iterate(arrays[colindex], &array_item[colindex], &isnull[colindex]);
+
+			new_column->typeName = makeTypeNameFromOid(DatumGetObjectId(array_item[1]),
+											   DatumGetInt32(array_item[2]));
+			new_column->is_not_null = DatumGetBool(array_item[3]);
+			stmt->base.tableElts = lappend(stmt->base.tableElts, new_column);
+		}
+
+		/*
+		 * Add schema_name and table_name options table_name is added to
+		 * survive a foreign table rename.
+		 */
+		stmt->options = lappend(stmt->options,
+			makeDefElem("schema_name", (Node *) makeString((char *) remote_schema)));
+		stmt->options = lappend(stmt->options,
+			makeDefElem("table_name", (Node *) makeString(tablename)));
+		tables = lappend(tables, stmt);
+	}
+	/* If we have a LIMIT TO clause, check that every table is present */
+	if (import_type == FDW_IMPORT_SCHEMA_LIMIT_TO)
+	{
+		ListCell   *lc1,
+				   *lc2;
+		char	   *looked_up,
+				   *compared_to;
+		bool		found;
+
+		foreach(lc1, table_names)
+		{
+			found = false;
+			looked_up = ((RangeVar *) lfirst(lc1))->relname;
+			foreach(lc2, tables)
+			{
+				compared_to = ((CreateForeignTableStmt *) lfirst(lc2))->base.relation->relname;
+				if (strcmp(compared_to, looked_up) == 0)
+				{
+					found = true;
+					break;
+				}
+			}
+			if (!found)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_FDW_TABLE_NOT_FOUND),
+						 errmsg("Table %s could not be found on the remote server.", looked_up)));
+			}
+		}
+	}
+	/* Cleanup */
+	PQclear(res);
+	ReleaseConnection(conn);
+	pfree(array_item);
+	pfree(fmgr_infos);
+	pfree(arrays);
+	pfree(typioparam);
+	pfree(isnull);
+
+	return tables;
+}
+
 /*
  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
  *
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 6187839..0062ff5 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -609,3 +609,33 @@ UPDATE rem1 SET f2 = 'testo';

 -- Test returning a system attribute
 INSERT INTO rem1(f2) VALUES ('test') RETURNING ctid;
+
+
+-- Test IMPORT FOREIGN SCHEMA statement
+CREATE schema import_destination;
+CREATE schema import_source;
+CREATE TABLE import_source.t1 (c1 int, c2 varchar NOT NULL);
+CREATE TABLE import_source.t2 (c1 int, c2 varchar);
+CREATE TABLE import_source.t3 (c1 int, c2 varchar);
+IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+\d import_destination.t1;
+DROP SCHEMA import_destination cascade;
+CREATE schema import_destination;
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (inexistent) FROM SERVER loopback INTO import_destination; -- ERROR
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (t1) FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+IMPORT FOREIGN SCHEMA import_source EXCEPT (t1, t2) FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+
+-- Test IMPORT FOREIGN SCHEMA with missing types
+CREATE DATABASE remote1;
+CREATE SERVER remote1 FOREIGN DATA WRAPPER postgres_fdw
+  OPTIONS (dbname 'remote1');
+\c remote1;
+CREATE TYPE typ1 AS (m1 int, m2 varchar);
+CREATE SCHEMA import_source;
+CREATE TABLE import_source.t1 (c1 int, c2 typ1);
+\c contrib_regression;
+CREATE USER MAPPING FOR CURRENT_USER SERVER remote1;
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (t1) FROM SERVER remote1 INTO import_destination;
diff --git a/src/include/catalog/pg_type.h b/src/include/catalog/pg_type.h
index b7d9256..bd887b2 100644
--- a/src/include/catalog/pg_type.h
+++ b/src/include/catalog/pg_type.h
@@ -442,9 +442,11 @@ DESCR("network IP address/netmask, network address");

 /* OIDS 1000 - 1099 */
 DATA(insert OID = 1000 (  _bool		 PGNSP PGUID -1 f b A f t \054 0	16 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
+#define BOOLARRAYOID 1000
 DATA(insert OID = 1001 (  _bytea	 PGNSP PGUID -1 f b A f t \054 0	17 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
 DATA(insert OID = 1002 (  _char		 PGNSP PGUID -1 f b A f t \054 0	18 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
 DATA(insert OID = 1003 (  _name		 PGNSP PGUID -1 f b A f t \054 0	19 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
+#define NAMEARRAYOID 1003
 DATA(insert OID = 1005 (  _int2		 PGNSP PGUID -1 f b A f t \054 0	21 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
 #define INT2ARRAYOID		1005
 DATA(insert OID = 1006 (  _int2vector PGNSP PGUID -1 f b A f t \054 0	22 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
--
2.0.0

Attachment: signature.asc
Description: This is a digitally signed message part.

Reply via email to