Hi!
1. done
2. rename to pg_user_subscriptions
3. By pg_dump: I checked the upgrade from 10 to 12devel, and it works fine.
4. done
5. done
6. I took it from AlterSubscription_refresh; there is no free() in that function either.
7. done
--------
Ефимкин Евгений
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3f2f674a1a..ff8a65a3e4 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -522,12 +522,8 @@
</para>
<para>
- To create a subscription, the user must be a superuser.
- </para>
-
- <para>
- The subscription apply process will run in the local database with the
- privileges of a superuser.
+ To add tables to a subscription, the user must have ownership rights on the
+ table.
</para>
<para>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 6dfb2e4d3e..f0a368f90c 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -24,6 +24,9 @@ PostgreSQL documentation
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUBLICATION [ WITH ( <replaceable class="parameter">refresh_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD TABLE <replaceable class="parameter">table_name</replaceable> [, ...]
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET TABLE <replaceable class="parameter">table_name</replaceable> [, ...]
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DROP TABLE <replaceable class="parameter">table_name</replaceable> [, ...]
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
@@ -44,9 +47,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<para>
You must own the subscription to use <command>ALTER SUBSCRIPTION</command>.
To alter the owner, you must also be a direct or indirect member of the
- new owning role. The new owner has to be a superuser.
- (Currently, all subscription owners must be superusers, so the owner checks
- will be bypassed in practice. But this might change in the future.)
+ new owning role.
</para>
</refsect1>
@@ -137,6 +138,35 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>ADD TABLE <replaceable class="parameter">table_name</replaceable></literal></term>
+ <listitem>
+ <para>
+ The <literal>ADD TABLE</literal> clause adds the listed tables to the subscription.
+ Each table must be present in one of the subscribed publications.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>SET TABLE <replaceable class="parameter">table_name</replaceable></literal></term>
+ <listitem>
+ <para>
+ The <literal>SET TABLE</literal> clause will replace the list of tables in
+ the subscription with the specified one.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>DROP TABLE <replaceable class="parameter">table_name</replaceable></literal></term>
+ <listitem>
+ <para>
+ The <literal>DROP TABLE</literal> clause removes the listed tables from the subscription.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>ENABLE</literal></term>
<listitem>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 1a90c244fb..04af4e27c7 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -24,6 +24,7 @@ PostgreSQL documentation
CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
+ [ FOR TABLE <replaceable class="parameter">table_name</replaceable> [, ...] ]
[ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
</synopsis>
</refsynopsisdiv>
@@ -88,6 +89,16 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>FOR TABLE</literal></term>
+ <listitem>
+ <para>
+ Specifies a list of tables to add to the subscription. All tables listed in this clause
+ must be present in the subscribed publications.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
<listitem>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f4d9e9daf7..6ec6b24eb1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -904,6 +904,27 @@ CREATE VIEW pg_stat_progress_vacuum AS
FROM pg_stat_get_progress_info('VACUUM') AS S
LEFT JOIN pg_database D ON S.datid = D.oid;
+-- Subscriptions together with the role name of their owner.  subconninfo is
+-- returned only when the current user owns the subscription or is a
+-- superuser; for everybody else the column is NULL.
+CREATE VIEW pg_user_subscriptions AS
+    SELECT
+        S.oid,
+        S.subdbid,
+        S.subname,
+        CASE
+            WHEN S.subowner = 0 THEN 'public'
+            ELSE A.rolname
+        END AS usename,
+        S.subenabled,
+        -- a CASE without ELSE yields NULL when no branch matches
+        CASE
+            WHEN (S.subowner <> 0 AND A.rolname = current_user)
+                 OR (SELECT rolsuper FROM pg_authid WHERE rolname = current_user)
+            THEN S.subconninfo
+        END AS subconninfo,
+        S.subslotname,
+        S.subsynccommit,
+        S.subpublications
+    FROM pg_subscription AS S
+    LEFT JOIN pg_authid AS A ON A.oid = S.subowner;
+
CREATE VIEW pg_user_mappings AS
SELECT
U.oid AS umid,
@@ -936,7 +957,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
-- All columns of pg_subscription except subconninfo are readable.
REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications)
+GRANT SELECT (tableoid, oid, subdbid, subname,
+ subowner, subenabled, subslotname, subpublications, subsynccommit)
ON pg_subscription TO public;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index a60a15193a..79e967f037 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -30,6 +30,7 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
@@ -322,6 +323,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
char originname[NAMEDATALEN];
bool create_slot;
List *publications;
+ AclResult aclresult;
+
+ /* must have CREATE privilege on database */
+ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_DATABASE,
+ get_database_name(MyDatabaseId));
/*
* Parse and check options.
@@ -342,11 +350,6 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
if (create_slot)
PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
- if (!superuser())
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- (errmsg("must be superuser to create subscriptions"))));
-
rel = table_open(SubscriptionRelationId, RowExclusiveLock);
/* Check if name is used */
@@ -375,6 +378,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
/* Check the connection info string. */
walrcv_check_conninfo(conninfo);
+ walrcv_connstr_check(conninfo);
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
@@ -411,6 +415,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
snprintf(originname, sizeof(originname), "pg_%u", subid);
replorigin_create(originname);
+
+ if (stmt->tables && !connect)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot create subscription with connect = false and FOR TABLE")));
+ }
/*
* Connect to remote side to execute requested commands and fetch table
* info.
@@ -423,6 +434,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
List *tables;
ListCell *lc;
char table_state;
+ List *tablesiods = NIL;
/* Try to connect to the publisher. */
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
@@ -438,25 +450,59 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
*/
table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ walrcv_security_check(wrconn);
/*
* Get the table list from publisher and build local table status
* info.
*/
tables = fetch_table_list(wrconn, publications);
- foreach(lc, tables)
- {
- RangeVar *rv = (RangeVar *) lfirst(lc);
- Oid relid;
-
- relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
- /* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
- rv->schemaname, rv->relname);
-
- AddSubscriptionRelState(subid, relid, table_state,
+ if (stmt->tables)
+ {
+ foreach(lc, tables)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, NoLock, true);
+ tablesiods = lappend_oid(tablesiods, relid);
+ }
+ foreach(lc, stmt->tables)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER,
+ get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+ if (!list_member_oid(tablesiods, relid))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("table \"%s.%s\" not preset in publication",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid))));
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
+ }
+ }
+ else
+ foreach(lc, tables)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER,
+ get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+ table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ AddSubscriptionRelState(subid, relid, table_state,
InvalidXLogRecPtr);
- }
+ }
/*
* If requested, create permanent slot for the subscription. We
@@ -503,6 +549,242 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
return myself;
}
+/*
+ * Make the subscription track exactly the given tables (used by
+ * ALTER SUBSCRIPTION ... SET TABLE).
+ *
+ * sub			subscription being altered
+ * tables		list of RangeVar naming the local tables to keep
+ * copy_data	if true, newly added tables start in SUBREL_STATE_INIT,
+ *				otherwise in SUBREL_STATE_READY
+ *
+ * A listed table gets a new relation state only if it is not tracked yet and
+ * is found among the tables published by the subscription's publications.
+ * Tracked tables that are not listed are removed from the subscription.
+ *
+ * Raises an error if the publisher cannot be reached, if a listed table
+ * cannot be resolved locally, if the current user does not own a listed
+ * table, or if CheckSubscriptionRelkind() rejects it.
+ */
+static void
+AlterSubscription_set_table(Subscription *sub, List *tables, bool copy_data)
+{
+	char	   *err;
+	WalReceiverConn *wrconn;	/* local handle; do not touch any wider-scoped one */
+	List	   *pubrel_names = NIL;
+	List	   *subrel_states;
+	Oid		   *subrel_local_oids;
+	Oid		   *pubrel_local_oids;
+	Oid		   *stmt_local_oids;
+	int			nsubrels;
+	int			npubrels = 0;
+	int			nstmtrels = 0;
+	ListCell   *lc;
+	int			off;
+
+	/* Load the library providing us libpq calls. */
+	load_file("libpqwalreceiver", false);
+
+	/* Try to connect to the publisher. */
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the publisher: %s", err)));
+
+	/* Get the table list from publisher, closing the connection on failure. */
+	PG_TRY();
+	{
+		pubrel_names = fetch_table_list(wrconn, sub->publications);
+	}
+	PG_CATCH();
+	{
+		walrcv_disconnect(wrconn);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	/* We are done with the remote side, close connection. */
+	walrcv_disconnect(wrconn);
+
+	/* Get local table list. */
+	subrel_states = GetSubscriptionRelations(sub->oid);
+	nsubrels = list_length(subrel_states);
+
+	/*
+	 * Build qsorted array of local table oids for faster lookup. This can
+	 * potentially contain all tables in the database so speed of lookup is
+	 * important.
+	 */
+	subrel_local_oids = palloc(nsubrels * sizeof(Oid));
+	off = 0;
+	foreach(lc, subrel_states)
+	{
+		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+		subrel_local_oids[off++] = relstate->relid;
+	}
+	qsort(subrel_local_oids, nsubrels, sizeof(Oid), oid_cmp);
+
+	/*
+	 * Build the sorted array of local oids of the *published* tables.  This
+	 * must walk pubrel_names (not the statement's list): the array is sized
+	 * for pubrel_names, so filling it from another list could overrun it or
+	 * leave uninitialized entries.  Published tables that cannot be resolved
+	 * locally are skipped.
+	 */
+	pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+	foreach(lc, pubrel_names)
+	{
+		RangeVar   *rv = (RangeVar *) lfirst(lc);
+		Oid			relid = RangeVarGetRelid(rv, NoLock, true);
+
+		if (OidIsValid(relid))
+			pubrel_local_oids[npubrels++] = relid;
+	}
+	qsort(pubrel_local_oids, npubrels, sizeof(Oid), oid_cmp);
+
+	/*
+	 * Walk over the tables named in the statement, validate each of them and
+	 * create a state for those that are published but not yet known to the
+	 * subscription.
+	 *
+	 * Also builds array of local oids of the listed tables for the next step.
+	 */
+	stmt_local_oids = palloc(list_length(tables) * sizeof(Oid));
+	foreach(lc, tables)
+	{
+		RangeVar   *rv = (RangeVar *) lfirst(lc);
+		Oid			relid;
+		char	   *nspname;
+
+		relid = RangeVarGetRelid(rv, AccessShareLock, false);
+		stmt_local_oids[nstmtrels++] = relid;
+
+		/* must be owner, same as for ADD TABLE and REFRESH PUBLICATION */
+		if (!pg_class_ownercheck(relid, GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER,
+						   get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+
+		/* rv->schemaname may be unset for an unqualified name; look it up */
+		nspname = get_namespace_name(get_rel_namespace(relid));
+
+		/* Check for supported relkind. */
+		CheckSubscriptionRelkind(get_rel_relkind(relid),
+								 nspname, rv->relname);
+
+		if (!bsearch(&relid, subrel_local_oids,
+					 nsubrels, sizeof(Oid), oid_cmp) &&
+			bsearch(&relid, pubrel_local_oids,
+					npubrels, sizeof(Oid), oid_cmp))
+		{
+			AddSubscriptionRelState(sub->oid, relid,
+									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+									InvalidXLogRecPtr);
+			ereport(DEBUG1,
+					(errmsg("table \"%s.%s\" added to subscription \"%s\"",
+							nspname, rv->relname, sub->name)));
+		}
+	}
+	qsort(stmt_local_oids, nstmtrels, sizeof(Oid), oid_cmp);
+
+	/*
+	 * Next remove state for tables we should not care about anymore using the
+	 * data we collected above
+	 */
+	for (off = 0; off < nsubrels; off++)
+	{
+		Oid			relid = subrel_local_oids[off];
+
+		if (!bsearch(&relid, stmt_local_oids,
+					 nstmtrels, sizeof(Oid), oid_cmp))
+		{
+			RemoveSubscriptionRel(sub->oid, relid);
+
+			logicalrep_worker_stop_at_commit(sub->oid, relid);
+
+			ereport(DEBUG1,
+					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
+							get_namespace_name(get_rel_namespace(relid)),
+							get_rel_name(relid),
+							sub->name)));
+		}
+	}
+}
+
+/*
+ * Remove the given tables from a subscription (used by
+ * ALTER SUBSCRIPTION ... DROP TABLE).
+ *
+ * sub		subscription being altered
+ * tables	non-empty list of RangeVar naming the local tables to remove
+ *
+ * Raises an error if a listed table cannot be resolved locally, if
+ * CheckSubscriptionRelkind() rejects it, or if it is not currently part of
+ * the subscription.
+ */
+static void
+AlterSubscription_drop_table(Subscription *sub, List *tables)
+{
+	List	   *subrel_states;
+	Oid		   *subrel_local_oids;
+	int			nsubrels;
+	ListCell   *lc;
+	int			off;
+
+	Assert(list_length(tables) > 0);
+
+	/* Build a sorted array of the oids currently tracked, for bsearch(). */
+	subrel_states = GetSubscriptionRelations(sub->oid);
+	nsubrels = list_length(subrel_states);
+	subrel_local_oids = palloc(nsubrels * sizeof(Oid));
+	off = 0;
+	foreach(lc, subrel_states)
+	{
+		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+		subrel_local_oids[off++] = relstate->relid;
+	}
+	qsort(subrel_local_oids, nsubrels, sizeof(Oid), oid_cmp);
+
+	foreach(lc, tables)
+	{
+		RangeVar   *rv = (RangeVar *) lfirst(lc);
+		Oid			relid;
+		char	   *nspname;
+
+		relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+		/* rv->schemaname may be unset for an unqualified name; look it up */
+		nspname = get_namespace_name(get_rel_namespace(relid));
+
+		CheckSubscriptionRelkind(get_rel_relkind(relid),
+								 nspname, rv->relname);
+
+		if (!bsearch(&relid, subrel_local_oids,
+					 nsubrels, sizeof(Oid), oid_cmp))
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("table \"%s.%s\" is not present in subscription",
+							nspname,
+							get_rel_name(relid))));
+
+		RemoveSubscriptionRel(sub->oid, relid);
+		logicalrep_worker_stop_at_commit(sub->oid, relid);
+	}
+}
+
+/*
+ * Add the given tables to a subscription (used by
+ * ALTER SUBSCRIPTION ... ADD TABLE).
+ *
+ * sub			subscription being altered
+ * tables		non-empty list of RangeVar naming the local tables to add
+ * copy_data	if true, the tables start in SUBREL_STATE_INIT, otherwise in
+ *				SUBREL_STATE_READY
+ *
+ * Raises an error if the publisher cannot be reached, if a listed table
+ * cannot be resolved locally, if the current user does not own it, if
+ * CheckSubscriptionRelkind() rejects it, or if it is not among the tables
+ * published by the subscription's publications.
+ */
+static void
+AlterSubscription_add_table(Subscription *sub, List *tables, bool copy_data)
+{
+	char	   *err;
+	WalReceiverConn *wrconn;	/* local handle; do not touch any wider-scoped one */
+	List	   *pubrel_names = NIL;
+	List	   *pubrels = NIL;
+	ListCell   *lc;
+	char		table_state;
+
+	Assert(list_length(tables) > 0);
+
+	/* Load the library providing us libpq calls. */
+	load_file("libpqwalreceiver", false);
+
+	/* Try to connect to the publisher. */
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the publisher: %s", err)));
+
+	/* Get the table list from publisher, closing the connection on failure. */
+	PG_TRY();
+	{
+		pubrel_names = fetch_table_list(wrconn, sub->publications);
+	}
+	PG_CATCH();
+	{
+		walrcv_disconnect(wrconn);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	/* We are done with the remote side, close connection. */
+	walrcv_disconnect(wrconn);
+
+	/*
+	 * Translate the published table names into local oids.  Published tables
+	 * that cannot be resolved locally are skipped.
+	 */
+	foreach(lc, pubrel_names)
+	{
+		RangeVar   *rv = (RangeVar *) lfirst(lc);
+		Oid			relid = RangeVarGetRelid(rv, NoLock, true);
+
+		if (OidIsValid(relid))
+			pubrels = lappend_oid(pubrels, relid);
+	}
+
+	/* The initial state is the same for every table added by this command. */
+	table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+
+	foreach(lc, tables)
+	{
+		RangeVar   *rv = (RangeVar *) lfirst(lc);
+		Oid			relid;
+		char	   *nspname;
+
+		relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+		/* must be owner */
+		if (!pg_class_ownercheck(relid, GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER,
+						   get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+
+		/* rv->schemaname may be unset for an unqualified name; look it up */
+		nspname = get_namespace_name(get_rel_namespace(relid));
+
+		CheckSubscriptionRelkind(get_rel_relkind(relid),
+								 nspname, rv->relname);
+
+		if (!list_member_oid(pubrels, relid))
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("table \"%s.%s\" not present in publication",
+							nspname,
+							get_rel_name(relid))));
+
+		AddSubscriptionRelState(sub->oid, relid,
+								table_state,
+								InvalidXLogRecPtr);
+	}
+}
+
static void
AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
@@ -568,6 +850,12 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname);
+ /* must be owner */
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER,
+ get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+
+
pubrel_local_oids[off++] = relid;
if (!bsearch(&relid, subrel_local_oids,
@@ -625,6 +913,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
bool update_tuple = false;
Subscription *sub;
Form_pg_subscription form;
+ char *err = NULL;
rel = table_open(SubscriptionRelationId, RowExclusiveLock);
@@ -721,10 +1010,31 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
}
case ALTER_SUBSCRIPTION_CONNECTION:
- /* Load the library providing us libpq calls. */
- load_file("libpqwalreceiver", false);
- /* Check the connection info string. */
- walrcv_check_conninfo(stmt->conninfo);
+ {
+ /* Load the library providing us libpq calls. */
+ /* Check the connection info string. */
+ load_file("libpqwalreceiver", false);
+ walrcv_check_conninfo(stmt->conninfo);
+ if (sub->enabled)
+ {
+
+ wrconn = walrcv_connect(stmt->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+ PG_TRY();
+ {
+ walrcv_security_check(wrconn);
+ }
+ PG_CATCH();
+ {
+ /* Close the connection in case of failure. */
+ walrcv_disconnect(wrconn);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+ }
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(stmt->conninfo);
@@ -774,6 +1084,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
+
parse_subscription_options(stmt->options, NULL, NULL, NULL,
NULL, NULL, NULL, &copy_data,
NULL, NULL);
@@ -782,7 +1093,56 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
break;
}
+		case ALTER_SUBSCRIPTION_ADD_TABLE:
+			{
+				/* filled in by parse_subscription_options() below */
+				bool		copy_data;
+
+				/* Adding tables is rejected while the subscription is disabled. */
+				if (!sub->enabled)
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not allowed for disabled subscriptions")));
+
+				/* Only the copy_data output is requested here. */
+				parse_subscription_options(stmt->options, NULL, NULL, NULL,
+										   NULL, NULL, NULL, &copy_data,
+										   NULL, NULL);
+
+				AlterSubscription_add_table(sub, stmt->tables, copy_data);
+
+				break;
+			}
+ case ALTER_SUBSCRIPTION_DROP_TABLE:
+ {
+
+ /* Dropping tables is rejected while the subscription is disabled. */
+ if (!sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION ... DROP TABLE is not allowed for disabled subscriptions")));
+
+
+ /*
+ * No option output is requested (every output pointer is NULL).
+ * NOTE(review): presumably this call only validates stmt->options
+ * - confirm against parse_subscription_options().
+ */
+ parse_subscription_options(stmt->options, NULL, NULL, NULL,
+ NULL, NULL, NULL, NULL,
+ NULL, NULL);
+
+ AlterSubscription_drop_table(sub, stmt->tables);
+
+ break;
+ }
+		case ALTER_SUBSCRIPTION_SET_TABLE:
+			{
+				/* filled in by parse_subscription_options() below */
+				bool		copy_data;
+
+				/* Replacing the table list is rejected while the subscription is disabled. */
+				if (!sub->enabled)
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("ALTER SUBSCRIPTION ... SET TABLE is not allowed for disabled subscriptions")));
+
+				/* Only the copy_data output is requested here. */
+				parse_subscription_options(stmt->options, NULL, NULL, NULL,
+										   NULL, NULL, NULL, &copy_data,
+										   NULL, NULL);
+
+				AlterSubscription_set_table(sub, stmt->tables, copy_data);
+
+				break;
+			}
default:
elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
stmt->kind);
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 3eb7e95d64..dad2528350 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4612,7 +4612,7 @@ _copyCreateSubscriptionStmt(const CreateSubscriptionStmt *from)
COPY_STRING_FIELD(conninfo);
COPY_NODE_FIELD(publication);
COPY_NODE_FIELD(options);
-
+ COPY_NODE_FIELD(tables);
return newnode;
}
@@ -4625,6 +4625,7 @@ _copyAlterSubscriptionStmt(const AlterSubscriptionStmt *from)
COPY_STRING_FIELD(subname);
COPY_STRING_FIELD(conninfo);
COPY_NODE_FIELD(publication);
+ COPY_NODE_FIELD(tables);
COPY_NODE_FIELD(options);
return newnode;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 5c4fa7d077..8724feaa67 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2239,6 +2239,7 @@ _equalCreateSubscriptionStmt(const CreateSubscriptionStmt *a,
COMPARE_STRING_FIELD(conninfo);
COMPARE_NODE_FIELD(publication);
COMPARE_NODE_FIELD(options);
+ COMPARE_NODE_FIELD(tables);
return true;
}
@@ -2251,6 +2252,7 @@ _equalAlterSubscriptionStmt(const AlterSubscriptionStmt *a,
COMPARE_STRING_FIELD(subname);
COMPARE_STRING_FIELD(conninfo);
COMPARE_NODE_FIELD(publication);
+ COMPARE_NODE_FIELD(tables);
COMPARE_NODE_FIELD(options);
return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c1faf4152c..2e484bb44a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -395,7 +395,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
execute_param_clause using_clause returning_clause
opt_enum_val_list enum_val_list table_func_column_list
create_generic_options alter_generic_options
- relation_expr_list dostmt_opt_list
+ relation_expr_list remote_relation_expr_list dostmt_opt_list
transform_element_list transform_type_list
TriggerTransitions TriggerReferencing
publication_name_list
@@ -405,6 +405,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <node> group_by_item empty_grouping_set rollup_clause cube_clause
%type <node> grouping_sets_clause
%type <node> opt_publication_for_tables publication_for_tables
+%type <node> opt_subscription_for_tables subscription_for_tables
%type <value> publication_name_item
%type <list> opt_fdw_options fdw_options
@@ -489,6 +490,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <node> table_ref
%type <jexpr> joined_table
%type <range> relation_expr
+%type <range> remote_relation_expr
%type <range> relation_expr_opt_alias
%type <node> tablesample_clause opt_repeatable_clause
%type <target> target_el set_target insert_column_item
@@ -9517,17 +9519,33 @@ AlterPublicationStmt:
*****************************************************************************/
CreateSubscriptionStmt:
- CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition
+ CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_subscription_for_tables opt_definition
{
CreateSubscriptionStmt *n =
makeNode(CreateSubscriptionStmt);
n->subname = $3;
n->conninfo = $5;
n->publication = $7;
- n->options = $8;
+ if ($8 != NULL)
+ {
+ /* FOR TABLE */
+ n->tables = (List *)$8;
+ }
+ n->options = $9;
$$ = (Node *)n;
}
;
+/* Optional FOR TABLE clause of CREATE SUBSCRIPTION; NULL when it is omitted. */
+opt_subscription_for_tables:
+ subscription_for_tables { $$ = $1; }
+ | /* EMPTY */ { $$ = NULL; }
+ ;
+
+/* FOR TABLE clause: yields the table list, cast to Node * for the rule's declared type. */
+subscription_for_tables:
+ FOR TABLE remote_relation_expr_list
+ {
+ $$ = (Node *) $3;
+ }
+ ;
publication_name_list:
publication_name_item
@@ -9607,6 +9625,37 @@ AlterSubscriptionStmt:
(Node *)makeInteger(false), @1));
$$ = (Node *)n;
}
+ | ALTER SUBSCRIPTION name ADD_P TABLE remote_relation_expr_list opt_definition
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_ADD_TABLE;
+ n->subname = $3;
+ n->tables = $6;
+ n->options = $7;
+ n->tableAction = DEFELEM_ADD;
+ $$ = (Node *)n;
+ }
+ | ALTER SUBSCRIPTION name DROP TABLE remote_relation_expr_list
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_DROP_TABLE;
+ n->subname = $3;
+ n->tables = $6;
+ n->tableAction = DEFELEM_DROP;
+ $$ = (Node *)n;
+ }
+ | ALTER SUBSCRIPTION name SET TABLE remote_relation_expr_list
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_SET_TABLE;
+ n->subname = $3;
+ n->tables = $6;
+ n->tableAction = DEFELEM_SET;
+ $$ = (Node *)n;
+ }
;
/*****************************************************************************
@@ -12046,6 +12095,23 @@ relation_expr_list:
| relation_expr_list ',' relation_expr { $$ = lappend($1, $3); }
;
+/*
+ * A table reference for the subscription table lists: a plain qualified
+ * name with inheritance turned off and no alias.
+ */
+remote_relation_expr:
+ qualified_name
+ {
+ /* no inheritance */
+ $$ = $1;
+ $$->inh = false;
+ $$->alias = NULL;
+ }
+ ;
+
+
+/* Comma-separated list of remote_relation_expr, built up left to right. */
+remote_relation_expr_list:
+ remote_relation_expr { $$ = list_make1($1); }
+ | remote_relation_expr_list ',' remote_relation_expr { $$ = lappend($1, $3); }
+ ;
+
+
/*
* Given "UPDATE foo set set ...", we have to decide without looking any
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 7027737e67..71e1f0838e 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -52,6 +52,9 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo,
bool logical, const char *appname,
char **err);
static void libpqrcv_check_conninfo(const char *conninfo);
+static void libpqrcv_connstr_check(const char *connstr);
+static void libpqrcv_security_check(WalReceiverConn *conn);
+
static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
char **sender_host, int *sender_port);
@@ -83,6 +86,8 @@ static void libpqrcv_disconnect(WalReceiverConn *conn);
static WalReceiverFunctionsType PQWalReceiverFunctions = {
libpqrcv_connect,
libpqrcv_check_conninfo,
+ libpqrcv_connstr_check,
+ libpqrcv_security_check,
libpqrcv_get_conninfo,
libpqrcv_get_senderinfo,
libpqrcv_identify_system,
@@ -232,6 +237,54 @@ libpqrcv_check_conninfo(const char *conninfo)
PQconninfoFree(opts);
}
+/*
+ * Raise an error when a non-superuser is using a connection that was
+ * established without a password.  Superusers always pass.
+ */
+static void
+libpqrcv_security_check(WalReceiverConn *conn)
+{
+	/* Superusers are exempt from the password requirement. */
+	if (superuser())
+		return;
+
+	/* The connection is acceptable if a password was actually used. */
+	if (PQconnectionUsedPassword(conn->streamConn))
+		return;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+			 errmsg("password is required"),
+			 errdetail("Non-superuser cannot connect if the server does not request a password."),
+			 errhint("Target server's authentication method must be changed.")));
+}
+
+/*
+ * Raise an error when a non-superuser supplies a connection string that
+ * does not contain a non-empty "password" option.  Superusers always pass.
+ * A string that PQconninfoParse() cannot parse counts as having no password.
+ */
+static void
+libpqrcv_connstr_check(const char *connstr)
+{
+	PQconninfoOption *opts;
+	bool		has_password = false;
+
+	/* Superusers are exempt from the password requirement. */
+	if (superuser())
+		return;
+
+	opts = PQconninfoParse(connstr, NULL);
+	if (opts != NULL)
+	{
+		PQconninfoOption *opt;
+
+		/* Stop scanning as soon as a non-empty password has been seen. */
+		for (opt = opts; opt->keyword != NULL && !has_password; opt++)
+		{
+			if (strcmp(opt->keyword, "password") != 0)
+				continue;
+
+			has_password = (opt->val != NULL && opt->val[0] != '\0');
+		}
+		PQconninfoFree(opts);
+	}
+
+	if (has_password)
+		return;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+			 errmsg("password is required"),
+			 errdetail("Non-superusers must provide a password in the connection string.")));
+}
+
/*
* Return a user-displayable conninfo string. Any security-sensitive fields
* are obfuscated.
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 1d918d2c42..cabb4f4730 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -77,6 +77,28 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
}
}
+/*
+ * Syscache invalidation callback that resets the whole relation map cache.
+ *
+ * None of the arguments is consulted: whenever the callback fires, every
+ * entry loses its local relation oid and has its state set to
+ * SUBREL_STATE_UNKNOWN.
+ */
+void
+logicalrep_relmap_invalidate_cb2(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	LogicalRepRelMapEntry *entry;
+
+	/* Nothing to invalidate if the map has not been created. */
+	if (LogicalRepRelMap == NULL)
+		return;
+
+	/* invalidate all cache entries */
+	hash_seq_init(&status, LogicalRepRelMap);
+
+	while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+	{
+		entry->localreloid = InvalidOid;
+		entry->state = SUBREL_STATE_UNKNOWN;
+	}
+}
+
+
+
/*
* Initialize the relation map cache.
*/
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2c49c711e3..71e2607030 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1741,6 +1741,9 @@ ApplyWorkerMain(Datum main_arg)
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
invalidate_syncing_table_states,
(Datum) 0);
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+ logicalrep_relmap_invalidate_cb2,
+ (Datum) 0);
/* Build logical replication streaming options. */
options.logical = true;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 2b1a94733b..e9a3e43246 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4005,7 +4005,7 @@ getSubscriptions(Archive *fout)
if (dopt->no_subscriptions || fout->remoteVersion < 100000)
return;
- if (!is_superuser(fout))
+ if (!is_superuser(fout) && fout->remoteVersion < 120000)
{
int n;
@@ -4024,17 +4024,32 @@ getSubscriptions(Archive *fout)
query = createPQExpBuffer();
resetPQExpBuffer(query);
-
+ if (is_superuser(fout) && fout->remoteVersion < 120000)
+ {
+ appendPQExpBuffer(query,
+ "SELECT s.tableoid, s.oid, s.subname,"
+ "(%s s.subowner) AS rolname, "
+ " s.subconninfo, s.subslotname, s.subsynccommit, "
+ " s.subpublications "
+ "FROM pg_subscription s "
+ "WHERE s.subdbid = (SELECT oid FROM pg_database"
+ " WHERE datname = current_database())",
+ username_subquery);
+ }
+ else
+ {
+ appendPQExpBuffer(query,
+ "SELECT s.tableoid, s.oid, s.subname,"
+ "(%s s.subowner) AS rolname, "
+ " us.subconninfo, s.subslotname, s.subsynccommit, "
+ " s.subpublications "
+ "FROM pg_subscription s join pg_user_subscriptions us ON (s.oid=us.oid) "
+ "WHERE s.subdbid = (SELECT oid FROM pg_database"
+ " WHERE datname = current_database())",
+ username_subquery);
+ }
/* Get the subscriptions in current database. */
- appendPQExpBuffer(query,
- "SELECT s.tableoid, s.oid, s.subname,"
- "(%s s.subowner) AS rolname, "
- " s.subconninfo, s.subslotname, s.subsynccommit, "
- " s.subpublications "
- "FROM pg_subscription s "
- "WHERE s.subdbid = (SELECT oid FROM pg_database"
- " WHERE datname = current_database())",
- username_subquery);
+
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
ntups = PQntuples(res);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 4ec8a83541..66f2401e85 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3478,6 +3478,7 @@ typedef struct CreateSubscriptionStmt
char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
+ List *tables; /* Optional list of tables to add */
} CreateSubscriptionStmt;
typedef enum AlterSubscriptionType
@@ -3486,7 +3487,10 @@ typedef enum AlterSubscriptionType
ALTER_SUBSCRIPTION_CONNECTION,
ALTER_SUBSCRIPTION_PUBLICATION,
ALTER_SUBSCRIPTION_REFRESH,
- ALTER_SUBSCRIPTION_ENABLED
+ ALTER_SUBSCRIPTION_ENABLED,
+ ALTER_SUBSCRIPTION_DROP_TABLE,
+ ALTER_SUBSCRIPTION_ADD_TABLE,
+ ALTER_SUBSCRIPTION_SET_TABLE
} AlterSubscriptionType;
typedef struct AlterSubscriptionStmt
@@ -3497,6 +3501,9 @@ typedef struct AlterSubscriptionStmt
char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
+ /* parameters used for ALTER SUBSCRIPTION ... ADD/SET/DROP TABLE */
+ List *tables; /* List of tables to add/drop */
+ DefElemAction tableAction; /* What action to perform with the tables */
} AlterSubscriptionStmt;
typedef struct DropSubscriptionStmt
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 85e0b6ea62..99bf9e8817 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -38,5 +38,7 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
extern char *logicalrep_typmap_gettypname(Oid remoteid);
+extern void logicalrep_relmap_invalidate_cb2(Datum arg, int cacheid,
+ uint32 hashvalue); /* NOTE(review): syscache-callback-style signature; confirm where it is registered */
#endif /* LOGICALRELATION_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e04d725ff5..33658edecb 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -204,6 +204,8 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logica
const char *appname,
char **err);
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
+typedef void (*walrcv_connstr_check_fn) (const char *connstr);
+typedef void (*walrcv_security_check_fn) (WalReceiverConn *conn);
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
char **sender_host,
@@ -237,6 +239,8 @@ typedef struct WalReceiverFunctionsType
{
walrcv_connect_fn walrcv_connect;
walrcv_check_conninfo_fn walrcv_check_conninfo;
+ walrcv_connstr_check_fn walrcv_connstr_check;
+ walrcv_security_check_fn walrcv_security_check;
walrcv_get_conninfo_fn walrcv_get_conninfo;
walrcv_get_senderinfo_fn walrcv_get_senderinfo;
walrcv_identify_system_fn walrcv_identify_system;
@@ -256,6 +260,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
#define walrcv_check_conninfo(conninfo) \
WalReceiverFunctions->walrcv_check_conninfo(conninfo)
+#define walrcv_connstr_check(connstr) \
+ WalReceiverFunctions->walrcv_connstr_check(connstr)
+#define walrcv_security_check(conn) \
+ WalReceiverFunctions->walrcv_security_check(conn)
#define walrcv_get_conninfo(conn) \
WalReceiverFunctions->walrcv_get_conninfo(conn)
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e384cd2279..5caa8f9cfa 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2241,6 +2241,25 @@ pg_user_mappings| SELECT u.oid AS umid,
FROM ((pg_user_mapping u
JOIN pg_foreign_server s ON ((u.umserver = s.oid)))
LEFT JOIN pg_authid a ON ((a.oid = u.umuser)));
+pg_user_subscriptions| SELECT s.oid,
+ s.subdbid,
+ s.subname,
+ CASE
+ WHEN (s.subowner = (0)::oid) THEN 'public'::name
+ ELSE a.rolname
+ END AS usename,
+ s.subenabled,
+ CASE
+ WHEN (((s.subowner <> (0)::oid) AND (a.rolname = CURRENT_USER)) OR ( SELECT pg_authid.rolsuper
+ FROM pg_authid
+ WHERE (pg_authid.rolname = CURRENT_USER))) THEN s.subconninfo
+ ELSE NULL::text
+ END AS subconninfo,
+ s.subslotname,
+ s.subsynccommit,
+ s.subpublications
+ FROM (pg_subscription s
+ LEFT JOIN pg_authid a ON ((a.oid = s.subowner)));
pg_views| SELECT n.nspname AS schemaname,
c.relname AS viewname,
pg_get_userbyid(c.relowner) AS viewowner,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 4fcbf7efe9..afc5177f10 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -40,11 +40,6 @@ SELECT obj_description(s.oid, 'pg_subscription') FROM pg_subscription s;
-- fail - name already exists
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false);
ERROR: subscription "testsub" already exists
--- fail - must be superuser
-SET SESSION AUTHORIZATION 'regress_subscription_user2';
-CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WITH (connect = false);
-ERROR: must be superuser to create subscriptions
-SET SESSION AUTHORIZATION 'regress_subscription_user';
-- fail - invalid option combinations
CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true);
ERROR: connect = false and copy_data = true are mutually exclusive options
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 36fa1bbac8..63eef1381e 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -33,11 +33,6 @@ SELECT obj_description(s.oid, 'pg_subscription') FROM pg_subscription s;
-- fail - name already exists
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false);
--- fail - must be superuser
-SET SESSION AUTHORIZATION 'regress_subscription_user2';
-CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WITH (connect = false);
-SET SESSION AUTHORIZATION 'regress_subscription_user';
-
-- fail - invalid option combinations
CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true);
CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true);
diff --git a/src/test/subscription/t/011_rep_changes_nonsuperuser.pl b/src/test/subscription/t/011_rep_changes_nonsuperuser.pl
new file mode 100644
index 0000000000..3acbb5663c
--- /dev/null
+++ b/src/test/subscription/t/011_rep_changes_nonsuperuser.pl
@@ -0,0 +1,316 @@
+# Basic logical replication test with a non-superuser subscription owner
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More;
+
+if ($windows_os)
+{
+ plan skip_all => "authentication tests cannot run on Windows";
+}
+else
+{
+ plan tests => 18;
+}
+
+sub reset_pg_hba # rewrite pg_hba.conf: role "normal" uses $hba_method, all other local users trust
+{
+ my $node = shift; # node whose pg_hba.conf is replaced
+ my $hba_method = shift; # auth method written into the rule for role "normal"
+
+ unlink($node->data_dir . '/pg_hba.conf'); # remove the old file so only the two rules below remain
+ $node->append_conf('pg_hba.conf', "local all normal $hba_method");
+ $node->append_conf('pg_hba.conf', "local all all trust"); # catch-all rule for every other local user
+ $node->reload; # reload the node after rewriting the file
+ return;
+}
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_subscriber->safe_psql('postgres',
+ "SET password_encryption='md5'; CREATE ROLE normal LOGIN PASSWORD 'pass';");
+$node_subscriber->safe_psql('postgres',
+ "GRANT CREATE ON DATABASE postgres TO normal;");
+$node_subscriber->safe_psql('postgres',
+ "ALTER ROLE normal WITH LOGIN;");
+reset_pg_hba($node_subscriber, 'trust');
+
+
+$node_publisher->safe_psql('postgres',
+ "SET password_encryption='md5'; CREATE ROLE normal LOGIN PASSWORD 'pass';");
+$node_publisher->safe_psql('postgres',
+ "ALTER ROLE normal WITH LOGIN; ALTER ROLE normal WITH SUPERUSER");
+reset_pg_hba($node_publisher, 'md5');
+
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_rep (a int primary key)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_mixed (a int primary key, b text)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_mixed (a, b) VALUES (1, 'foo')");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"
+);
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)", extra_params => [ '-U', 'normal' ]);
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)", extra_params => [ '-U', 'normal' ]);
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)", extra_params => [ '-U', 'normal' ]);
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)", extra_params => [ '-U', 'normal' ]);
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_rep (a int primary key)", extra_params => [ '-U', 'normal' ]);
+
+# different column count and order than on publisher
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_mixed (c text, b text, a int primary key)", extra_params => [ '-U', 'normal' ]);
+
+# replication of the table with included index
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"
+, extra_params => [ '-U', 'normal' ]);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)");
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_notrep"
+);
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr password=pass user=normal application_name=$appname'
+ PUBLICATION tap_pub, tap_pub_ins_only
+ FOR TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_ins",
+ extra_params => [ '-U', 'normal' ]);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
+is($result, qq(0), 'check non-replicated table is empty on subscriber');
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_ins SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
+$node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_rep SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20");
+$node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_mixed VALUES (2, 'bar')");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_include SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres',
+ "DELETE FROM tab_include WHERE a > 20");
+$node_publisher->safe_psql('postgres', "UPDATE tab_include SET a = -a");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_rep");
+is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT c, b, a FROM tab_mixed");
+is( $result, qq(|foo|1
+|bar|2), 'check replicated changes with different column order');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_include");
+is($result, qq(20|-20|-1),
+ 'check replicated changes with primary key index with included columns');
+
+# insert some duplicate rows
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_full SELECT generate_series(1,10)");
+
+# add REPLICA IDENTITY FULL so we can update
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_full REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_full REPLICA IDENTITY FULL");
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_full2 REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_full2 REPLICA IDENTITY FULL");
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_ins REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_ins REPLICA IDENTITY FULL");
+
+# and do the updates
+$node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(20|1|100),
+ 'update works with REPLICA IDENTITY FULL and duplicate tuples');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT x FROM tab_full2 ORDER BY 1");
+is( $result, qq(a
+bb
+bb),
+ 'update works with REPLICA IDENTITY FULL and text datums');
+
+# check that change of connection string and/or publication list causes
+# restart of subscription workers. Not all of these are registered as tests
+# as we need to poll for a change but the test suite will fail nonetheless
+# when something goes wrong.
+my $oldpid = $node_publisher->safe_psql('postgres',
+ "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"
+);
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub CONNECTION 'application_name=$appname $publisher_connstr'"
+);
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';"
+) or die "Timed out while waiting for apply to restart";
+
+$oldpid = $node_publisher->safe_psql('postgres',
+ "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"
+);
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only WITH (copy_data = false)"
+);
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';"
+) or die "Timed out while waiting for apply to restart";
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_ins SELECT generate_series(1001,1100)");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
+
+# Restart the publisher and check the state of the subscriber which
+# should be in a streaming state after catching up.
+$node_publisher->stop('fast');
+$node_publisher->start;
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1152|1|1100),
+ 'check replicated inserts after subscription publication change');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_rep");
+is($result, qq(20|-20|-1),
+ 'check changes skipped after subscription publication change');
+
+# check alter publication (relcache invalidation etc)
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only SET (publish = 'insert, delete')");
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 0");
+
+$result = $node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub ADD TABLE tab_full WITH (copy_data = false)");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)");
+
+$node_publisher->wait_for_catchup($appname);
+
+# note that data are different on provider and subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1052|1|1002),
+ 'check replicated deletes after alter publication');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(21|0|100), 'check replicated insert after alter publication');
+
+# check drop table from subscription: a later insert must not reach the subscriber
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub DROP TABLE tab_full");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(-1)");
+$node_publisher->wait_for_catchup($appname); # without this the check below can pass before the change could have been applied
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(21|0|100), 'check insert is not replicated after drop table from subscription');
+
+# check restart on rename
+$oldpid = $node_publisher->safe_psql('postgres',
+ "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"
+);
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed");
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';"
+) or die "Timed out while waiting for apply to restart";
+
+# check all the cleanup
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+ 'check subscription relation status was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');