Hi, This feature adds schema option while creating publication. Users will be able to specify one or more schemas while creating publication, when the user specifies schema option, then the data changes for the tables present in the schema specified by the user will be replicated to the subscriber. Few examples have been listed below:
Create a publication that publishes all changes for all the tables present in production schema: CREATE PUBLICATION production_publication FOR ALL TABLES SCHEMA production; Create a publication that publishes all changes for all the tables present in marketing and sales schemas: CREATE PUBLICATION sales_publication FOR ALL TABLES SCHEMA marketing, sales; Add some schemas to the publication: ALTER PUBLICATION sales_publication ADD SCHEMA marketing_june, sales_june; Drop some schema from the publication: ALTER PUBLICATION production_quarterly_publication DROP SCHEMA production_july; Attached is a POC patch for the same. I felt this feature would be quite useful. Thoughts? Regards, Vignesh EnterpriseDB: http://www.enterprisedb.com
From f9b5134182229f718bbc1a9162b6043f879a6410 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Thu, 7 Jan 2021 11:38:17 +0530 Subject: [PATCH v1] Added schema level support for publication. This patch adds schema level support for publication along with for all tables. User can specify multiple schemas with schema option. When user specifies schema option, then the tables present in the schema specified will be selected by publisher for sending the data to subscriber. --- doc/src/sgml/ref/alter_publication.sgml | 32 ++++++ doc/src/sgml/ref/create_publication.sgml | 33 +++++- src/backend/catalog/pg_publication.c | 55 +++++++++- src/backend/commands/publicationcmds.c | 172 +++++++++++++++++++++++++++++- src/backend/parser/gram.y | 38 ++++++- src/bin/pg_dump/pg_dump.c | 22 +++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 6 +- src/include/catalog/pg_publication.h | 9 +- src/include/nodes/parsenodes.h | 2 + src/test/regress/expected/publication.out | 36 +++---- 11 files changed, 376 insertions(+), 30 deletions(-) diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index c2946df..c00b8da 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -24,6 +24,9 @@ PostgreSQL documentation ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ...] +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable> @@ -97,6 +100,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r </varlistentry> <varlistentry> + <term><replaceable class="parameter">schema_name</replaceable></term> + <listitem> + <para> + Name of an existing schema. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term> <listitem> <para> @@ -141,6 +153,26 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete'); <programlisting> ALTER PUBLICATION mypublication ADD TABLE users, departments; </programlisting></para> + + <para> + Add some schemas to the publication: +<programlisting> +ALTER PUBLICATION sales_publication ADD SCHEMA marketing_june, sales_june; +</programlisting> + </para> + + <para> + Drop some schema from the publication: +<programlisting> +ALTER PUBLICATION production_quarterly_publication DROP SCHEMA production_july; +</programlisting> + </para> + + <para> + Set schema to the publication: +<programlisting> +ALTER PUBLICATION production_publication SET SCHEMA production_july; +</programlisting></para> </refsect1> <refsect1> diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index ff82fbc..c941643 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -22,8 +22,8 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> CREATE PUBLICATION <replaceable class="parameter">name</replaceable> - [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...] - | FOR ALL TABLES ] + [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ] + | FOR ALL TABLES [ SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ... ] ] ] [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] </synopsis> </refsynopsisdiv> @@ -100,6 +100,19 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> </varlistentry> <varlistentry> + <term><literal>SCHEMA</literal></term> + <listitem> + <para> + Specifies the list of schema that should be added to the publication. + If <literal>SCHEMA</literal> is specified, then the tables present in the + specified schema list is selected and added to the publication. The rest + of the tables will be skipped. This option should be specified + with <literal>FOR ALL TABLES</literal> option. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><literal>WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term> <listitem> <para> @@ -222,6 +235,22 @@ CREATE PUBLICATION alltables FOR ALL TABLES; <programlisting> CREATE PUBLICATION insert_only FOR TABLE mydata WITH (publish = 'insert'); +</programlisting> + </para> + + <para> + Create a publication that publishes all changes for all the tables present in +production schema: +<programlisting> +CREATE PUBLICATION production_publication FOR ALL TABLES schema production; +</programlisting> + </para> + + <para> + Create a publication that publishes all changes for all the tables present in +marketing and sales schemas: +<programlisting> +CREATE PUBLICATION sales_publication FOR ALL TABLES schema marketing, sales; </programlisting></para> </refsect1> diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 5f8e1c6..9264c9d 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -349,13 +349,14 @@ GetAllTablesPublications(void) * root partitioned tables. */ List * -GetAllTablesPublicationRelations(bool pubviaroot) +GetAllTablesPublicationRelations(Publication *publication) { Relation classRel; ScanKeyData key[1]; TableScanDesc scan; HeapTuple tuple; List *result = NIL; + bool pubviaroot = publication->pubviaroot; classRel = table_open(RelationRelationId, AccessShareLock); @@ -371,6 +372,16 @@ GetAllTablesPublicationRelations(bool pubviaroot) Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; + /* + * If schema is specified by the user, check if the relation is present + * in one of the schema specified. + */ + if (publication->pubschemalist) + { + if (!list_member_oid(publication->pubschemalist, relForm->relnamespace)) + continue; + } + if (is_publishable_class(relid, relForm) && !(relForm->relispartition && pubviaroot)) result = lappend_oid(result, relid); @@ -405,6 +416,35 @@ GetAllTablesPublicationRelations(bool pubviaroot) } /* + * TextarrayToSchemaOidList + * + * Create schema oid List using schema Text Array. + */ +static List * +TextarrayToSchemaOidList(ArrayType *arr) +{ + Datum *elems; + bool *nulls; + int nelems; + List *list = NIL; + int i; + + deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT, + &elems, &nulls, &nelems); + + for (i = 0; i < nelems; i++) + { + if (nulls[i]) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("name or argument lists may not contain nulls"))); + list = lappend_oid(list, get_namespace_oid(TextDatumGetCString(elems[i]), false)); + } + + return list; +} + +/* * Get publication using oid * * The Publication struct and its data are palloc'ed here. @@ -415,6 +455,8 @@ GetPublication(Oid pubid) HeapTuple tup; Publication *pub; Form_pg_publication pubform; + Datum datum; + bool isnull; tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); if (!HeapTupleIsValid(tup)) @@ -432,6 +474,15 @@ GetPublication(Oid pubid) pub->pubactions.pubtruncate = pubform->pubtruncate; pub->pubviaroot = pubform->pubviaroot; + datum = SysCacheGetAttr(PUBLICATIONOID, + tup, + Anum_pg_publication_pubschema, + &isnull); + if (!isnull) + pub->pubschemalist = TextarrayToSchemaOidList(DatumGetArrayTypeP(datum)); + else + pub->pubschemalist = NIL; + ReleaseSysCache(tup); return pub; @@ -531,7 +582,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * need those. */ if (publication->alltables) - tables = GetAllTablesPublicationRelations(publication->pubviaroot); + tables = GetAllTablesPublicationRelations(publication); else tables = GetPublicationRelations(publication->oid, publication->pubviaroot ? diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 95c253c..ceb7c4c 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -142,6 +142,29 @@ parse_publication_options(List *options, } /* + * Create a text array of schemas using schema list. + */ +static Datum +SchemaListToArray(List *schemalist) +{ + ArrayType *arr; + Datum *datums; + int j = 0; + ListCell *cell; + + datums = (Datum *) palloc(sizeof(Datum) * list_length(schemalist)); + foreach(cell, schemalist) + { + char *name = strVal(lfirst(cell)); + datums[j++] = CStringGetTextDatum(name); + } + + arr = construct_array(datums, list_length(schemalist), + TEXTOID, -1, false, TYPALIGN_INT); + return PointerGetDatum(arr); +} + +/* * Create new publication. */ ObjectAddress @@ -171,6 +194,10 @@ CreatePublication(CreatePublicationStmt *stmt) (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR ALL TABLES publication"))); + if (stmt->schemas && !stmt->for_all_tables) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("schema option cannot be used without specifying FOR ALL TABLES option"))); rel = table_open(PublicationRelationId, RowExclusiveLock); /* Check if name is used */ @@ -213,6 +240,23 @@ CreatePublication(CreatePublicationStmt *stmt) values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); + if (stmt->schemas && list_length(stmt->schemas)) + { + ListCell *cell; + foreach(cell, stmt->schemas) + { + char *schema = strVal(lfirst(cell)); + + /* Check if the schema specified by the user exists. */ + get_namespace_oid(schema, false); + } + + values[Anum_pg_publication_pubschema - 1] = + SchemaListToArray(stmt->schemas); + } + else + nulls[Anum_pg_publication_pubschema - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -428,10 +472,132 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, } /* + * Create a schema list using text array of schemas. + */ +static List * +TextArrayToSchemaList(ArrayType *arr) +{ + Datum *elems; + bool *nulls; + int nelems; + List *list = NIL; + int i; + + deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT, + &elems, &nulls, &nelems); + + for (i = 0; i < nelems; i++) + { + if (nulls[i]) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("name or argument lists may not contain nulls"))); + list = lappend(list, makeString(TextDatumGetCString(elems[i]))); + } + + return list; +} + +/* + * Alter the publication schemas. + * + * Add/Remove/Set the schemas to/from publication. + */ +static void +AlterPublicationSchemas(AlterPublicationStmt *stmt, Relation rel, + HeapTuple tup, Form_pg_publication pubform) +{ + List *pubschemalist = NIL; + ListCell *cell; + bool isnull; + Datum *values; + bool *nulls; + bool *replaces; + HeapTuple newtup; + + Datum datum = SysCacheGetAttr(PUBLICATIONNAME, tup, + Anum_pg_publication_pubschema, &isnull); + + if (!isnull) + pubschemalist = TextArrayToSchemaList(DatumGetArrayTypeP(datum)); + + values = palloc0(RelationGetNumberOfAttributes(rel) * sizeof(Datum)); + nulls = palloc0(RelationGetNumberOfAttributes(rel) * sizeof(bool)); + replaces = palloc0(RelationGetNumberOfAttributes(rel) * sizeof(bool)); + + /* Check if the schema specified by the user exists. */ + if (stmt->tableAction == DEFELEM_ADD || stmt->tableAction == DEFELEM_SET) + { + foreach(cell, stmt->schemas) + get_namespace_oid(strVal(lfirst(cell)), false); + } + + + if (stmt->tableAction == DEFELEM_ADD) + { + foreach(cell, stmt->schemas) + { + char *schema = strVal(lfirst(cell)); + + /* + * Check if the specified schema exists in the publisher schema + * list. + */ + if (!list_member(pubschemalist, makeString(schema))) + pubschemalist = lappend(pubschemalist, makeString(schema)); + else + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("schema \"%s\" already exist in the publisher schema list", schema))); + } + } + else if (stmt->tableAction == DEFELEM_DROP) + { + foreach(cell, stmt->schemas) + { + char *schema = strVal(lfirst(cell)); + + /* + * Check if the specified schema exists in the publisher schema + * list. + */ + if (list_member(pubschemalist, makeString(schema))) + pubschemalist = list_delete(pubschemalist, makeString(schema)); + else + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("schema \"%s\" does not exist in the publisher schema list", schema))); + } + } + else + pubschemalist = stmt->schemas; + + if (pubschemalist && list_length(pubschemalist)) + values[Anum_pg_publication_pubschema - 1] = + SchemaListToArray(pubschemalist); + else + nulls[Anum_pg_publication_pubschema - 1] = true; + + replaces[Anum_pg_publication_pubschema - 1] = true; + newtup = heap_modify_tuple(tup, RelationGetDescr(rel), + values, nulls, replaces); + CatalogTupleUpdate(rel, &tup->t_self, newtup); + + InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0); + + /* Release memory */ + pfree(values); + pfree(nulls); + pfree(replaces); + heap_freetuple(newtup); + return; +} + +/* * Alter the existing publication. * - * This is dispatcher function for AlterPublicationOptions and - * AlterPublicationTables. + * This is dispatcher function for AlterPublicationOptions, + * AlterPublicationSchemas and AlterPublicationTables. */ void AlterPublication(AlterPublicationStmt *stmt) @@ -460,6 +626,8 @@ AlterPublication(AlterPublicationStmt *stmt) if (stmt->options) AlterPublicationOptions(stmt, rel, tup); + else if (stmt->schemas) + AlterPublicationSchemas(stmt, rel, tup, pubform); else AlterPublicationTables(stmt, rel, tup); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 31c9544..90d554e 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -420,7 +420,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <list> group_by_list %type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> grouping_sets_clause -%type <node> opt_publication_for_tables publication_for_tables +%type <node> opt_publication_for_tables publication_for_tables opt_schema_for_tables %type <list> opt_fdw_options fdw_options %type <defelt> fdw_option @@ -9462,6 +9462,10 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec } ; +opt_schema_for_tables: + SCHEMA name_list { $$ = (Node *) $2; } + | /* EMPTY */ { $$ = NULL; } + ; /***************************************************************************** * @@ -9470,7 +9474,7 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec *****************************************************************************/ CreatePublicationStmt: - CREATE PUBLICATION name opt_publication_for_tables opt_definition + CREATE PUBLICATION name opt_publication_for_tables opt_definition opt_schema_for_tables { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; @@ -9484,6 +9488,7 @@ CreatePublicationStmt: else n->for_all_tables = true; } + n->schemas = (List *)$6; $$ = (Node *)n; } ; @@ -9515,6 +9520,11 @@ publication_for_tables: * * ALTER PUBLICATION name SET TABLE table [, table2] * + * ALTER PUBLICATION name ADD SCHEMA schema [, schema2] + * + * ALTER PUBLICATION name DROP SCHEMA schema [, schema2] + * + * ALTER PUBLICATION name SET SCHEMA schema [, schema2] *****************************************************************************/ AlterPublicationStmt: @@ -9549,6 +9559,30 @@ AlterPublicationStmt: n->tableAction = DEFELEM_DROP; $$ = (Node *)n; } + | ALTER PUBLICATION name ADD_P SCHEMA name_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->tableAction = DEFELEM_ADD; + $$ = (Node *)n; + } + | ALTER PUBLICATION name SET SCHEMA name_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->tableAction = DEFELEM_SET; + $$ = (Node *)n; + } + | ALTER PUBLICATION name DROP SCHEMA name_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->tableAction = DEFELEM_DROP; + $$ = (Node *)n; + } ; /***************************************************************************** diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 1f70653..bf8d534 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3881,6 +3881,7 @@ getPublications(Archive *fout) int i_pubdelete; int i_pubtruncate; int i_pubviaroot; + int i_pubschema; int i, ntups; @@ -3892,7 +3893,14 @@ getPublications(Archive *fout) resetPQExpBuffer(query); /* Get the publications. */ - if (fout->remoteVersion >= 130000) + if (fout->remoteVersion >= 140000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot, array_to_string(p.pubschema, ',') AS pubschema " + "FROM pg_publication p", + username_subquery); + else if (fout->remoteVersion >= 130000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " @@ -3928,6 +3936,7 @@ getPublications(Archive *fout) i_pubdelete = PQfnumber(res, "pubdelete"); i_pubtruncate = PQfnumber(res, "pubtruncate"); i_pubviaroot = PQfnumber(res, "pubviaroot"); + i_pubschema = PQfnumber(res, "pubschema"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -3952,6 +3961,10 @@ getPublications(Archive *fout) (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); pubinfo[i].pubviaroot = (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0); + if (!PQgetisnull(res, i, i_pubschema)) + pubinfo[i].pubschema = pg_strdup(PQgetvalue(res, i, i_pubschema)); + else + pubinfo[i].pubschema = NULL; if (strlen(pubinfo[i].rolname) == 0) pg_log_warning("owner of publication \"%s\" appears to be invalid", @@ -4033,7 +4046,12 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo) if (pubinfo->pubviaroot) appendPQExpBufferStr(query, ", publish_via_partition_root = true"); - appendPQExpBufferStr(query, ");\n"); + appendPQExpBufferStr(query, ")"); + + if (pubinfo->pubschema) + appendPQExpBuffer(query, " SCHEMA %s", pubinfo->pubschema); + + appendPQExpBufferStr(query, ";\n"); ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId, ARCHIVE_OPTS(.tag = pubinfo->dobj.name, diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index b3ce4ee..c02b912 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -606,6 +606,7 @@ typedef struct _PublicationInfo bool pubdelete; bool pubtruncate; bool pubviaroot; + char *pubschema; } PublicationInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index caf9756..0ab1a61 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5746,7 +5746,7 @@ listPublications(const char *pattern) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -5781,6 +5781,10 @@ listPublications(const char *pattern) appendPQExpBuffer(&buf, ",\n pubviaroot AS \"%s\"", gettext_noop("Via root")); + if (pset.sversion >= 140000) + appendPQExpBuffer(&buf, + ",\n pubschema AS \"%s\"", + gettext_noop("Schemas")); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 0dd50fe..7c93148 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -54,6 +54,10 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if partition changes are published using root schema */ bool pubviaroot; + +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + text pubschema[1] BKI_FORCE_NULL; /* schema names */ +#endif } FormData_pg_publication; /* ---------------- @@ -63,6 +67,8 @@ CATALOG(pg_publication,6104,PublicationRelationId) */ typedef FormData_pg_publication *Form_pg_publication; +DECLARE_TOAST(pg_publication, 13734, 13735); + DECLARE_UNIQUE_INDEX(pg_publication_oid_index, 6110, on pg_publication using btree(oid oid_ops)); #define PublicationObjectIndexId 6110 DECLARE_UNIQUE_INDEX(pg_publication_pubname_index, 6111, on pg_publication using btree(pubname name_ops)); @@ -83,6 +89,7 @@ typedef struct Publication bool alltables; bool pubviaroot; PublicationActions pubactions; + List *pubschemalist; } Publication; extern Publication *GetPublication(Oid pubid); @@ -107,7 +114,7 @@ typedef enum PublicationPartOpt extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); -extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern List *GetAllTablesPublicationRelations(Publication *publication); extern bool is_publishable_relation(Relation rel); extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index dc2bb40..0697533 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3523,6 +3523,7 @@ typedef struct CreatePublicationStmt List *options; /* List of DefElem nodes */ List *tables; /* Optional list of tables to add */ bool for_all_tables; /* Special publication for all tables in db */ + List *schemas; /* Options list of schemas */ } CreatePublicationStmt; typedef struct AlterPublicationStmt @@ -3537,6 +3538,7 @@ typedef struct AlterPublicationStmt List *tables; /* List of tables to add/drop */ bool for_all_tables; /* Special publication for all tables in db */ DefElemAction tableAction; /* What action to perform with the tables */ + List *schemas; /* Options list of schemas */ } AlterPublicationStmt; typedef struct CreateSubscriptionStmt diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 63d6ab7..420c8a2 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -28,20 +28,20 @@ ERROR: unrecognized "publish" value: "cluster" CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0'); ERROR: conflicting or redundant options \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | f | t | f | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Schemas +--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f | + testpub_default | regress_publication_user | f | f | t | f | f | f | (2 rows) ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Schemas +--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f | + testpub_default | regress_publication_user | f | t | t | t | f | f | (2 rows) --- adding tables @@ -271,20 +271,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root --------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpub_foo | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Schemas +-------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpub_foo | regress_publication_user | f | t | t | t | f | f | (1 row) -- rename back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ------------------+---------------------------+------------+---------+---------+---------+-----------+---------- - testpub_default | regress_publication_user2 | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Schemas +-----------------+---------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpub_default | regress_publication_user2 | f | t | t | t | f | f | (1 row) DROP PUBLICATION testpub_default; -- 1.8.3.1