On Monday 30 June 2014 16:43:38, Michael Paquier wrote:
> On Thu, Jun 19, 2014 at 11:00 PM, Michael Paquier <michael.paqu...@gmail.com>
> wrote:
> > >> This seems to be related to re-using the table-name between
> > >> invocations. The attached patch should fix point 2. As for point 1,
> > >> I don't know the cause for it. Do you have a reproducible test case ?
> > >
> > > Sure. I'll try harder when looking in more details at the patch for
> > > postgres_fdw :)
> >
> > With v2, I have tried randomly some of the scenarios of v1 plus some
> > extras, but was not able to reproduce it.
> >
> > > I'll look into the patch for postgres_fdw later.
> >
> > And here are some comments about it, when applied on top of the other 2
> > patches.
> > 1) Code compiles without warning, regression tests pass.
> > 2) In fetch_remote_tables, the palloc for the parameters should be
> > done after the number of parameters is determined. In the case of
> > IMPORT_ALL, some memory is wasted for nothing.
> > 3) Current code is not able to import default expressions for a table.
> > A little bit of processing with pg_get_expr would be necessary:
> > select pg_catalog.pg_get_expr(adbin, adrelid) from pg_attrdef;
> > There are of course bonus cases like SERIAL columns coming immediately
> > to my mind but it would be possible to determine if a given column is
> > serial with pg_get_serial_sequence.
> > This would be a good addition for the FDW IMO.
> > 4) The same applies of course to the constraint name: CREATE FOREIGN
> > TABLE foobar (a int CONSTRAINT toto NOT NULL) for example.
> > 5) A bonus idea: enable default expression obtention and null/not null
> > switch by default but add options to disable their respective
> > obtention.
> > 6) Defining once PGFDW_IMPORTRESULT_NUMCOLS at the top of
> > postgres_fdw.c without undefining would be perfectly fine.
> > 7) In postgresImportForeignSchema, the palloc calls and the
> > definitions of the variables used to save the results should be done
> > within the for loop.
> > 8) At quick glance, the logic of postgresImportForeignSchema looks
> > awkward... I'll have a second look with a fresher mind later on this
> > one.
>
> While having a second look at the core patch, I have found myself
> re-hacking it, fixing a couple of bugs and adding things that have been
> missing in the former implementation:
> - Deletions of unnecessary structures to simplify code and make it cleaner
> - Fixed a bug related to the management of local schema name. A FDW was
> free to set the schema name where local tables are created, this should not
> be the case.
> - Improved documentation, examples and other things, fixed doc padding for
> example
> - Added some missing stuff in equalfuncs.c and copyfuncs.c
> - Some other things.
> With that, core patch looks pretty nice actually, and I think that we
> should let a committer have a look at this part at least for this CF.
>
> Also, the postgres_fdw portion has been updated based on the previous core
> patch modified, using a version that Ronan sent me, which has addressed the
> remarks I sent before. This patch is still lacking documentation, some
> cleanup, and regression tests are broken, but it can be used to test the
> core feature. I unfortunately don't have much time today but I am sending
> this patch either way to let people play with IMPORT SCHEMA if I don't come
> back to it before.
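
On point 2 of the comments above (allocating the parameter array only once the
number of parameters is known), a minimal standalone sketch could look like
this. It is not the patch code: build_import_params and table_filter are
hypothetical names, and plain malloc stands in for palloc.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper, not part of the patch: build the parameter array for
 * the remote query.  The number of parameters is decided first, so exactly
 * that many slots are allocated.  malloc stands in for palloc here.
 */
const char **
build_import_params(const char *remote_schema,
					const char *table_filter,	/* NULL when importing all */
					int *numparams)
{
	const char **params;

	assert(remote_schema != NULL && numparams != NULL);

	/* One parameter for the schema, a second one only for LIMIT TO / EXCEPT */
	*numparams = (table_filter != NULL) ? 2 : 1;

	params = malloc(sizeof(char *) * (size_t) *numparams);
	if (params == NULL)
		return NULL;

	params[0] = remote_schema;
	if (table_filter != NULL)
		params[1] = table_filter;
	return params;
}
```

The caller owns the returned array and frees it once the query has been sent.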
The regression tests fail because of a typo in pg_type.h: BOOLARRAYOID should
be defined as 1000, not 1003 (which clashes with NAMEARRAYOID).

What do you think should be documented, and where?

> Regards,

-- 
Ronan Dunklau
http://dalibo.com - http://dalibo.org
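
To make the OID clash concrete, here is a small standalone sketch; it is not
part of the patch. With both macros set to 1003, the colnotnull column would
presumably be parsed with the name[] input function instead of the bool[] one.
has_duplicate_oid is a hypothetical helper, and the two macro values are the
ones from the pg_type.h hunk of the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Values as they appear in the pg_type.h hunk of the patch */
#define BOOLARRAYOID 1000
#define NAMEARRAYOID 1003

/*
 * Hypothetical check, not part of the patch: return 1 if any OID appears
 * more than once in the list, 0 otherwise.
 */
int
has_duplicate_oid(const unsigned int *oids, size_t n)
{
	size_t		i,
				j;

	assert(oids != NULL || n == 0);

	for (i = 0; i < n; i++)
		for (j = i + 1; j < n; j++)
			if (oids[i] == oids[j])
				return 1;
	return 0;
}
```

Calling it on {1003, 1003} (the typo) reports a duplicate, while
{NAMEARRAYOID, BOOLARRAYOID} with the corrected value does not.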
From 1cc2922e9087f2d852d2b1196314d7e35dce42a3 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Mon, 30 Jun 2014 16:36:47 +0900
Subject: [PATCH 2/2] Add support of IMPORT SCHEMA for postgres_fdw

---
 contrib/postgres_fdw/expected/postgres_fdw.out |  62 ++++++
 contrib/postgres_fdw/postgres_fdw.c            | 259 +++++++++++++++++++++++++
 contrib/postgres_fdw/sql/postgres_fdw.sql      |  30 +++
 src/include/catalog/pg_type.h                  |   2 +
 4 files changed, 353 insertions(+)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2e49ee3..07e6c11 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2834,3 +2834,65 @@ NOTICE:  NEW: (13,"test triggered !")
  (0,27)
 (1 row)
 
+-- Test IMPORT FOREIGN SCHEMA statement
+CREATE schema import_destination;
+CREATE schema import_source;
+CREATE TABLE import_source.t1 (c1 int, c2 varchar NOT NULL);
+CREATE TABLE import_source.t2 (c1 int, c2 varchar);
+CREATE TABLE import_source.t3 (c1 int, c2 varchar);
+IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+                                        List of foreign tables
+       Schema       | Table |  Server  |                  FDW Options                   | Description 
+--------------------+-------+----------+------------------------------------------------+-------------
+ import_destination | t1    | loopback | (schema_name 'import_source', table_name 't1') | 
+ import_destination | t2    | loopback | (schema_name 'import_source', table_name 't2') | 
+ import_destination | t3    | loopback | (schema_name 'import_source', table_name 't3') | 
+(3 rows)
+
+\d import_destination.t1;
+        Foreign table "import_destination.t1"
+ Column |       Type        | Modifiers | FDW Options 
+--------+-------------------+-----------+-------------
+ c1     | integer           |           | 
+ c2     | character varying | not null  | 
+Server: loopback
+FDW Options: (schema_name 'import_source', table_name 't1')
+
+DROP SCHEMA import_destination cascade;
+NOTICE:  drop cascades to 3 other objects
+DETAIL:  drop cascades to foreign table import_destination.t1
+drop cascades to foreign table import_destination.t2
+drop cascades to foreign table import_destination.t3
+CREATE schema import_destination;
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (inexistent) FROM SERVER loopback INTO import_destination; -- ERROR
+ERROR:  Table inexistent could not be found on the remote server.
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (t1) FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+                                        List of foreign tables
+       Schema       | Table |  Server  |                  FDW Options                   | Description 
+--------------------+-------+----------+------------------------------------------------+-------------
+ import_destination | t1    | loopback | (schema_name 'import_source', table_name 't1') | 
+(1 row)
+
+IMPORT FOREIGN SCHEMA import_source EXCEPT (t1, t2) FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+                                        List of foreign tables
+       Schema       | Table |  Server  |                  FDW Options                   | Description 
+--------------------+-------+----------+------------------------------------------------+-------------
+ import_destination | t1    | loopback | (schema_name 'import_source', table_name 't1') | 
+ import_destination | t3    | loopback | (schema_name 'import_source', table_name 't3') | 
+(2 rows)
+
+-- Test IMPORT FOREIGN SCHEMA with missing types
+CREATE DATABASE remote1;
+CREATE SERVER remote1 FOREIGN DATA WRAPPER postgres_fdw
+  OPTIONS (dbname 'remote1');
+\c remote1;
+CREATE TYPE typ1 AS (m1 int, m2 varchar);
+CREATE SCHEMA import_source;
+CREATE TABLE import_source.t1 (c1 int, c2 typ1);
+\c contrib_regression;
+CREATE USER MAPPING FOR CURRENT_USER SERVER remote1;
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (t1) FROM SERVER remote1 INTO import_destination;
+ERROR:  IMPORT of table t1 failed because of missing type defined on remote but not locally: type "public.typ1" does not exist
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 7dd43a9..df4a040 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -46,6 +46,9 @@ PG_MODULE_MAGIC;
 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
 #define DEFAULT_FDW_TUPLE_COST		0.01
 
+/* Number of columns in the query fetching remote tables during IMPORT */
+#define PGFDW_IMPORTRESULT_NUMCOLS 4
+
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
@@ -286,6 +289,11 @@ static void postgresExplainForeignModify(ModifyTableState *mtstate,
 static bool postgresAnalyzeForeignTable(Relation relation,
 							AcquireSampleRowsFunc *func,
 							BlockNumber *totalpages);
+static List *postgresImportForeignSchema(ForeignServer *server,
+							const char *remote_schema,
+							ImportForeignSchemaType import_type,
+							List *table_names,
+							List *options);
 
 /*
  * Helper functions
@@ -326,6 +334,10 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   List *retrieved_attrs,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
+static PGresult *fetch_remote_tables(PGconn *conn,
+						   const char *remote_schema,
+						   ImportForeignSchemaType import_type,
+						   List *table_names);
 
 
 /*
@@ -363,6 +375,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	/* Support functions for ANALYZE */
 	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
 
+	/* Support functions for IMPORT FOREIGN SCHEMA */
+	routine->ImportForeignSchema = postgresImportForeignSchema;
+
 	PG_RETURN_POINTER(routine);
 }
 
@@ -2347,6 +2362,250 @@ postgresAnalyzeForeignTable(Relation relation,
 	return true;
 }
 
+static PGresult *
+fetch_remote_tables(PGconn *conn,
+					const char *remote_schema,
+					ImportForeignSchemaType import_type,
+					List *table_names)
+{
+	StringInfoData buf;
+	int			numparams;
+	const char **import_params;
+	const char *check_params[1] = {remote_schema};
+	PGresult   *res = NULL;
+
+	initStringInfo(&buf);
+
+
+	/* Check that the schema really exists */
+	appendStringInfo(&buf, "SELECT 1 FROM pg_namespace WHERE nspname = $1");
+	res = PQexecParams(conn, buf.data, 1, NULL, check_params, NULL, NULL, 0);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pgfdw_report_error(ERROR, res, conn, true, buf.data);
+	if (PQntuples(res) != 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
+				 errmsg("The schema %s does not exist on the server", check_params[0])));
+
+	/* Fetch all tables from this schema */
+	resetStringInfo(&buf);
+	appendStringInfo(&buf,
+					 "SELECT relname, "
+					 "array_agg(attname::name) as colnames, "
+					 "array_agg(atttypid::regtype) as coltypes, "
+					 "array_agg(atttypmod::int4) as coltypmod, "
+					 "array_agg(attnotnull::bool) as colnotnull "
+					 "FROM pg_class "
+					 "INNER JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid "
+					 "LEFT JOIN pg_attribute ON pg_class.oid = pg_attribute.attrelid "
+					 " AND pg_attribute.attnum >= 0 AND NOT pg_attribute.attisdropped "
+					 "WHERE relkind IN ('r', 'v', 'f', 'm') "
+					 "AND pg_namespace.nspname = $1 ");
+	if (import_type != FDW_IMPORT_SCHEMA_ALL)
+	{
+		/* Add conditions */
+		Oid			outfuncoid;
+		bool		isvarlena;
+		Datum	   *elems = palloc0(list_length(table_names) * sizeof(Datum));
+		ArrayType  *array;
+		int			i = 0;
+		ListCell   *lc;
+		FmgrInfo   *fmout = palloc0(sizeof(FmgrInfo));
+
+		/* Since the import is not IMPORT_ALL, we have two params */
+		import_params = palloc0(sizeof(char *) * 2);
+		getTypeOutputInfo(CSTRINGARRAYOID, &outfuncoid, &isvarlena);
+		fmgr_info(outfuncoid, fmout);
+		foreach(lc, table_names)
+		{
+			elems[i] = CStringGetDatum(((RangeVar *) lfirst(lc))->relname);
+			i++;
+		}
+		array = construct_array(elems, i, CSTRINGOID, -2, false, 'c');
+		import_params[1] = OutputFunctionCall(fmout, PointerGetDatum(array));
+		appendStringInfo(&buf, "AND ");
+		if (import_type == FDW_IMPORT_SCHEMA_EXCEPT)
+		{
+			appendStringInfo(&buf, "NOT ");
+		}
+		appendStringInfo(&buf, "pg_class.relname = ANY($2)");
+		numparams = 2;
+		pfree(fmout);
+		pfree(elems);
+	}
+	else
+	{
+		numparams = 1;
+		import_params = palloc0(sizeof(char *) * 2);
+	}
+	import_params[0] = strdup(remote_schema);
+	appendStringInfo(&buf, "GROUP BY pg_class.oid, pg_class.relname");
+	res = PQexecParams(conn, buf.data, numparams, NULL, import_params, NULL, NULL, 0);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pgfdw_report_error(ERROR, res, conn, true, buf.data);
+	pfree(import_params);
+	return res;
+}
+
+/*
+ * Map a remote schema to a local one.
+ */
+static List *
+postgresImportForeignSchema(ForeignServer *server,
+							const char *remote_schema,
+							ImportForeignSchemaType import_type,
+							List *table_names,
+							List *options)
+{
+	List	   *tables = NULL;
+	Oid			userid = GetUserId();
+	UserMapping *mapping = GetUserMapping(userid, server->serverid);
+	PGconn	   *conn = GetConnection(server, mapping, false);
+	PGresult   *res;
+	int			numrows,
+				i;
+
+	/* Initialize FmgrInfo for parsing arrays */
+	FmgrInfo   *fmgr_infos = palloc0(sizeof(FmgrInfo) * PGFDW_IMPORTRESULT_NUMCOLS);
+	Oid			typoid,
+			   *typioparam = palloc0(sizeof(Oid) * PGFDW_IMPORTRESULT_NUMCOLS);
+	ArrayIterator *arrays = palloc0(sizeof(ArrayIterator) * PGFDW_IMPORTRESULT_NUMCOLS);
+	Oid			column_types[PGFDW_IMPORTRESULT_NUMCOLS] =
+		{NAMEARRAYOID, REGTYPEARRAYOID, INT4ARRAYOID, BOOLARRAYOID};
+	Datum	   *array_item = palloc0(sizeof(Datum) * PGFDW_IMPORTRESULT_NUMCOLS);
+	bool	   *isnull = palloc0(sizeof(bool) * PGFDW_IMPORTRESULT_NUMCOLS);
+
+	/* Save the memory context to escape error context later */
+	MemoryContext oldcontext = CurrentMemoryContext;
+
+	for (i = 0; i < PGFDW_IMPORTRESULT_NUMCOLS; i++)
+	{
+		getTypeInputInfo(column_types[i], &typoid, &typioparam[i]);
+		fmgr_info(typoid, &fmgr_infos[i]);
+	}
+	res = fetch_remote_tables(conn, remote_schema, import_type, table_names);
+	numrows = PQntuples(res);
+
+	for (i = 0; i < numrows; i++)
+	{
+		CreateForeignTableStmt *stmt = makeNode(CreateForeignTableStmt);
+		char	   *tablename;
+		int			colindex;
+
+		tablename = strdup(PQgetvalue(res, i, 0));
+
+		/* setup the base relation information */
+		stmt->base.relation = makeRangeVar(NULL, tablename, 0);
+		stmt->servername = server->servername;
+
+		/*
+		 * It is the role of server to set the schema where tables
+		 * are imported.
+		 */
+		stmt->base.relation->schemaname = NULL;
+
+		/* Parse arrays of columns from the result */
+		for (colindex = 0; colindex < PGFDW_IMPORTRESULT_NUMCOLS; colindex++)
+		{
+			Datum		array_datum;
+
+			PG_TRY();
+			{
+				array_datum = InputFunctionCall(&fmgr_infos[colindex],
+												PQgetvalue(res, i, colindex + 1),
+												typioparam[colindex], -1);
+				arrays[colindex] = array_create_iterator(DatumGetArrayTypeP(array_datum), 0);
+			}
+			PG_CATCH();
+			{
+				StringInfoData buf;
+				MemoryContext currentcontext = MemoryContextSwitchTo(oldcontext);
+				ErrorData  *edata = CopyErrorData();
+
+				initStringInfo(&buf);
+				if (res)
+					PQclear(res);
+				/* Special case the REGYPEARRAY conversion */
+				if (column_types[colindex] == REGTYPEARRAYOID)
+					appendStringInfo(&buf, "IMPORT of table %s failed because of missing type "
+									 "defined on remote but not locally: ", tablename);
+
+				appendStringInfo(&buf, "%s", edata->message);
+				MemoryContextSwitchTo(currentcontext);
+				ereport(ERROR,
+						(errcode(ERRCODE_UNDEFINED_OBJECT),
+						 errmsg("%s", buf.data)));
+			}
+			PG_END_TRY();
+		}
+
+		/* Add the individual columns */
+		while (array_iterate(arrays[0], &array_item[0], &isnull[0]))
+		{
+			ColumnDef  *new_column = makeNode(ColumnDef);
+
+			new_column->colname = strdup(DatumGetCString(array_item[0]));
+			for (colindex = 1; colindex < PGFDW_IMPORTRESULT_NUMCOLS; colindex++)
+				array_iterate(arrays[colindex], &array_item[colindex], &isnull[colindex]);
+
+			new_column->typeName = makeTypeNameFromOid(DatumGetObjectId(array_item[1]),
+													   DatumGetInt32(array_item[2]));
+			new_column->is_not_null = DatumGetBool(array_item[3]);
+			stmt->base.tableElts = lappend(stmt->base.tableElts, new_column);
+		}
+
+		/*
+		 * Add schema_name and table_name options table_name is added to
+		 * survive a foreign table rename.
+		 */
+		stmt->options = lappend(stmt->options,
+								makeDefElem("schema_name", (Node *) makeString((char *) remote_schema)));
+		stmt->options = lappend(stmt->options,
+								makeDefElem("table_name", (Node *) makeString(tablename)));
+		tables = lappend(tables, stmt);
+	}
+	/* If we have a LIMIT TO clause, check that every table is present */
+	if (import_type == FDW_IMPORT_SCHEMA_LIMIT_TO)
+	{
+		ListCell   *lc1,
+				   *lc2;
+		char	   *looked_up,
+				   *compared_to;
+		bool		found;
+
+		foreach(lc1, table_names)
+		{
+			found = false;
+			looked_up = ((RangeVar *) lfirst(lc1))->relname;
+			foreach(lc2, tables)
+			{
+				compared_to = ((CreateForeignTableStmt *) lfirst(lc2))->base.relation->relname;
+				if (strcmp(compared_to, looked_up) == 0)
+				{
+					found = true;
+					break;
+				}
+			}
+			if (!found)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_FDW_TABLE_NOT_FOUND),
+						 errmsg("Table %s could not be found on the remote server.", looked_up)));
+			}
+		}
+	}
+	/* Cleanup */
+	PQclear(res);
+	ReleaseConnection(conn);
+	pfree(array_item);
+	pfree(fmgr_infos);
+	pfree(arrays);
+	pfree(typioparam);
+	pfree(isnull);
+
+	return tables;
+}
+
 /*
  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
  *
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 6187839..0062ff5 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -609,3 +609,33 @@ UPDATE rem1 SET f2 = 'testo';
 
 -- Test returning a system attribute
 INSERT INTO rem1(f2) VALUES ('test') RETURNING ctid;
+
+
+-- Test IMPORT FOREIGN SCHEMA statement
+CREATE schema import_destination;
+CREATE schema import_source;
+CREATE TABLE import_source.t1 (c1 int, c2 varchar NOT NULL);
+CREATE TABLE import_source.t2 (c1 int, c2 varchar);
+CREATE TABLE import_source.t3 (c1 int, c2 varchar);
+IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+\d import_destination.t1;
+DROP SCHEMA import_destination cascade;
+CREATE schema import_destination;
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (inexistent) FROM SERVER loopback INTO import_destination; -- ERROR
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (t1) FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+IMPORT FOREIGN SCHEMA import_source EXCEPT (t1, t2) FROM SERVER loopback INTO import_destination;
+\det+ import_destination;
+
+-- Test IMPORT FOREIGN SCHEMA with missing types
+CREATE DATABASE remote1;
+CREATE SERVER remote1 FOREIGN DATA WRAPPER postgres_fdw
+  OPTIONS (dbname 'remote1');
+\c remote1;
+CREATE TYPE typ1 AS (m1 int, m2 varchar);
+CREATE SCHEMA import_source;
+CREATE TABLE import_source.t1 (c1 int, c2 typ1);
+\c contrib_regression;
+CREATE USER MAPPING FOR CURRENT_USER SERVER remote1;
+IMPORT FOREIGN SCHEMA import_source LIMIT TO (t1) FROM SERVER remote1 INTO import_destination;
diff --git a/src/include/catalog/pg_type.h b/src/include/catalog/pg_type.h
index b7d9256..bd887b2 100644
--- a/src/include/catalog/pg_type.h
+++ b/src/include/catalog/pg_type.h
@@ -442,9 +442,11 @@ DESCR("network IP address/netmask, network address");
 
 /* OIDS 1000 - 1099 */
 DATA(insert OID = 1000 ( _bool PGNSP PGUID -1 f b A f t \054 0 16 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
+#define BOOLARRAYOID 1000
 DATA(insert OID = 1001 ( _bytea PGNSP PGUID -1 f b A f t \054 0 17 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
 DATA(insert OID = 1002 ( _char PGNSP PGUID -1 f b A f t \054 0 18 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
 DATA(insert OID = 1003 ( _name PGNSP PGUID -1 f b A f t \054 0 19 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
+#define NAMEARRAYOID 1003
 DATA(insert OID = 1005 ( _int2 PGNSP PGUID -1 f b A f t \054 0 21 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
 #define INT2ARRAYOID 1005
 DATA(insert OID = 1006 ( _int2vector PGNSP PGUID -1 f b A f t \054 0 22 0 array_in array_out array_recv array_send - - array_typanalyze i x f 0 -1 0 0 _null_ _null_ _null_ ));
-- 
2.0.0
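
For anyone who wants to try the LIMIT TO check from postgresImportForeignSchema
outside the backend, here is a minimal standalone sketch of the same nested
lookup. It is an illustration only: first_missing_table is a hypothetical name,
and plain string arrays stand in for the List of RangeVar and
CreateForeignTableStmt nodes used in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical standalone version of the LIMIT TO check: return the first
 * requested table name that is absent from the imported ones, or NULL when
 * every requested table was found.
 */
const char *
first_missing_table(const char *const *requested, size_t nrequested,
					const char *const *imported, size_t nimported)
{
	size_t		i,
				j;

	for (i = 0; i < nrequested; i++)
	{
		int			found = 0;

		for (j = 0; j < nimported; j++)
		{
			if (strcmp(imported[j], requested[i]) == 0)
			{
				found = 1;
				break;
			}
		}
		if (!found)
			return requested[i];	/* the patch raises an ERROR here */
	}
	return NULL;
}
```

With t1, t2 and t3 imported, asking for (t1) returns NULL, while asking for
(t1, inexistent) returns "inexistent", which corresponds to the error case in
the regression test.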