Hello!
New draft attached. It adds table filtering for subscriptions (ADD/DROP TABLE)
and allows non-superusers to use `CREATE SUBSCRIPTION` for their own tables.
14.11.2018, 18:10, "Evgeniy Efimkin" <[email protected]>:
> Hello!
> I started work on patch (draft attached). Draft has changes related only to
> `CREATE SUBSCRIPTION`.
> I also introduced a new status (DEFERRED) for tables that are listed in the
> `FOR TABLE` clause but are not in the publication.
> A new column in pg_subscription (suballtables) will be used in the `REFRESH` clause.
>
> 09.11.2018, 15:24, "Evgeniy Efimkin" <[email protected]>:
>> Hi!
>> In order to support create subscription from non-superuser, we need to make
>> it possible to choose tables on the subscriber side:
>> 1. add `FOR TABLE` clause in `CREATE SUBSCRIPTION`:
>> ```
>> CREATE SUBSCRIPTION subscription_name
>> CONNECTION 'conninfo'
>> PUBLICATION publication_name [, ...]
>> [ FOR TABLE [ ONLY ] table_name [ * ] [, ...]| FOR ALL TABLES ]
>> [ WITH ( subscription_parameter [= value] [, ... ] ) ]
>> ```
>> ... where `FOR ALL TABLES` is only allowed for superusers,
>> and the table list from the `FOR TABLE` clause will be stored in the
>> pg_subscription_rel table (or maybe another place?)
>>
>> 2. Each subscription should have "all tables" attribute.
>> For example via a new column in pg_subscription "suballtables".
>>
>> 3. Add `ALTER SUBSCRIPTION (ADD TABLE | DROP TABLE)`:
>> ```
>> ALTER SUBSCRIPTION subscription_name ADD TABLE [ ONLY ] table_name
>> [WITH copy_data];
>> ALTER SUBSCRIPTION subscription_name DROP TABLE [ ONLY ] table_name;
>> ```
>> 4. On `ALTER SUBSCRIPTION <name> REFRESH PUBLICATION` we should check that
>> the table owner equals the subscription owner. The check is omitted if the
>> subscription owner is a superuser.
>> 5. If a superuser calls `ALTER SUBSCRIPTION REFRESH PUBLICATION` on a
>> subscription with a table list and a non-superuser owner, we should filter
>> out tables whose owner is not the subscription's owner, or maybe we need to
>> raise an error?
>>
>> What do you think about it? Any objections?
>>
>> 07.11.2018, 00:52, "Stephen Frost" <[email protected]>:
>>> Greetings,
>>>
>>> * Evgeniy Efimkin ([email protected]) wrote:
>>>> As a first step I suggest we allow CREATE SUBSCRIPTION for table owner
>>>> only.
>>>
>>> That's a nice idea but seems like we would want to have a way to filter
>>> what tables a subscription follows then..? Just failing if the
>>> publication publishes tables that we don't have access to or are not the
>>> owner of seems like a poor solution..
>>>
>>> Thanks!
>>>
>>> Stephen
>>
>> --------
>> Ефимкин Евгений
>
> --------
> Ефимкин Евгений
--------
Ефимкин Евгений
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index e136aa6a0b..5d7841f296 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->name = pstrdup(NameStr(subform->subname));
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
+ sub->alltables = subform->suballtables;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9021463a4c..4fe643c3bc 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -30,6 +30,7 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
@@ -322,6 +323,14 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
char originname[NAMEDATALEN];
bool create_slot;
List *publications;
+ AclResult aclresult;
+ bool alltables;
+
+ /* must have CREATE privilege on database */
+ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_DATABASE,
+ get_database_name(MyDatabaseId));
/*
* Parse and check options.
@@ -341,11 +350,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
*/
if (create_slot)
PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
-
- if (!superuser())
+ alltables = !stmt->tables;
+ /* FOR ALL TABLES requires superuser */
+ if (alltables && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- (errmsg("must be superuser to create subscriptions"))));
+ (errmsg("must be superuser to create FOR ALL TABLES publication"))));
+
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
@@ -375,6 +386,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
/* Check the connection info string. */
walrcv_check_conninfo(conninfo);
+ walrcv_connstr_check(conninfo);
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
@@ -388,6 +400,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+ values[Anum_pg_subscription_suballtables - 1] = BoolGetDatum(alltables);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (slotname)
@@ -411,6 +424,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
snprintf(originname, sizeof(originname), "pg_%u", subid);
replorigin_create(originname);
+
+ if (stmt->tables&&!connect)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot create subscription with connect = false and FOR TABLE")));
+ }
/*
* Connect to remote side to execute requested commands and fetch table
* info.
@@ -423,6 +443,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
List *tables;
ListCell *lc;
char table_state;
+ List *tablesiods = NIL;
/* Try to connect to the publisher. */
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
@@ -438,6 +459,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
*/
table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ walrcv_security_check(wrconn);
/*
* Get the table list from publisher and build local table status
* info.
@@ -446,17 +468,48 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
foreach(lc, tables)
{
RangeVar *rv = (RangeVar *) lfirst(lc);
- Oid relid;
+ Oid relid;
- relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
- /* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
- rv->schemaname, rv->relname);
-
- AddSubscriptionRelState(subid, relid, table_state,
- InvalidXLogRecPtr);
+ relid = RangeVarGetRelid(rv, NoLock, true);
+ tablesiods = lappend_oid(tablesiods, relid);
}
+ if (stmt->tables)
+ foreach(lc, stmt->tables)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER,
+ get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+ if (!list_member_oid(tablesiods, relid))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("table \"%s.%s\" not preset in publication",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid))));
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
+ }
+ else
+ foreach(lc, tables)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ if (!pg_class_ownercheck(relid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER,
+ get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+ table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
+ }
/*
* If requested, create permanent slot for the subscription. We
@@ -503,6 +556,92 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
return myself;
}
+/*
+ * Detach the listed tables from a subscription.
+ *
+ * Every entry in "tables" is looked up locally (taking AccessShareLock and
+ * erroring out if the relation does not exist).  The current user must own
+ * the relation and its relkind must pass CheckSubscriptionRelkind().  The
+ * relation's pg_subscription_rel entry is then removed and
+ * logicalrep_worker_stop_at_commit() is called for it.
+ */
+static void
+AlterSubscription_drop_table(Subscription *sub, List *tables)
+{
+	ListCell   *cell;
+
+	Assert(list_length(tables) > 0);
+
+	foreach(cell, tables)
+	{
+		RangeVar   *relvar = (RangeVar *) lfirst(cell);
+		Oid			reloid = RangeVarGetRelid(relvar, AccessShareLock, false);
+		char		relkind;
+
+		/* Only the owner of the table may take it out of the subscription. */
+		if (!pg_class_ownercheck(reloid, GetUserId()))
+		{
+			aclcheck_error(ACLCHECK_NOT_OWNER,
+						   get_relkind_objtype(get_rel_relkind(reloid)),
+						   relvar->relname);
+		}
+
+		/* Check for supported relkind. */
+		relkind = get_rel_relkind(reloid);
+		CheckSubscriptionRelkind(relkind, relvar->schemaname, relvar->relname);
+
+		RemoveSubscriptionRel(sub->oid, reloid);
+		logicalrep_worker_stop_at_commit(sub->oid, reloid);
+	}
+}
+
+/*
+ * Add the listed tables to a subscription.
+ *
+ * Connects to the publisher with the subscription's stored conninfo and
+ * fetches the tables contained in the subscription's publications.  Each
+ * requested table must then exist locally, be owned by the current user,
+ * have a supported relkind and be part of that published set; otherwise an
+ * error is raised.  Accepted tables are recorded in pg_subscription_rel with
+ * state INIT when copy_data is true, READY otherwise.
+ *
+ * NOTE(review): CreateSubscription also runs walrcv_security_check() on its
+ * connection; confirm whether the same check is wanted here.
+ */
+static void
+AlterSubscription_add_table(Subscription *sub, List *tables, bool copy_data)
+{
+	char	   *err;
+	List	   *pubrel_names;
+	ListCell   *lc;
+	List	   *pubrels = NIL;
+
+	Assert(list_length(tables) > 0);
+
+	/* Load the library providing us libpq calls. */
+	load_file("libpqwalreceiver", false);
+
+	/* Try to connect to the publisher. */
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the publisher: %s", err)));
+
+	/* Get the table list from publisher. */
+	pubrel_names = fetch_table_list(wrconn, sub->publications);
+
+	/*
+	 * We are done with the remote side, close connection.  Everything below
+	 * only consults local catalogs, so there is no reason to keep it open.
+	 */
+	walrcv_disconnect(wrconn);
+
+	/*
+	 * Resolve the published tables to local OIDs.  Tables that do not exist
+	 * locally come back as InvalidOid (missing_ok) and are left out, since
+	 * they can never match a requested table.
+	 */
+	foreach(lc, pubrel_names)
+	{
+		RangeVar   *rv = (RangeVar *) lfirst(lc);
+		Oid			relid;
+
+		relid = RangeVarGetRelid(rv, NoLock, true);
+		if (OidIsValid(relid))
+			pubrels = lappend_oid(pubrels, relid);
+	}
+
+	/* Validate and register each table named in the command. */
+	foreach(lc, tables)
+	{
+		RangeVar   *rv = (RangeVar *) lfirst(lc);
+		Oid			relid;
+		char		table_state;
+
+		relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+		/* Only the owner of the table may subscribe it. */
+		if (!pg_class_ownercheck(relid, GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER,
+						   get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+
+		/* Check for supported relkind. */
+		CheckSubscriptionRelkind(get_rel_relkind(relid),
+								 rv->schemaname, rv->relname);
+
+		/* The table must be published by one of our publications. */
+		if (!list_member_oid(pubrels, relid))
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("table \"%s.%s\" not present in publication",
+							get_namespace_name(get_rel_namespace(relid)),
+							get_rel_name(relid))));
+
+		table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+		AddSubscriptionRelState(sub->oid, relid,
+								table_state,
+								InvalidXLogRecPtr);
+	}
+}
+
static void
AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
@@ -724,6 +863,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
/* Check the connection info string. */
+
walrcv_check_conninfo(stmt->conninfo);
values[Anum_pg_subscription_subconninfo - 1] =
@@ -773,6 +913,12 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
+ if (!sub->alltables)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for FOR TABLE ... subscriptions"),
+ errhint("Use ALTER SUBSCRIPTION ADD/DROP TABLE ...")));
+
parse_subscription_options(stmt->options, NULL, NULL, NULL,
NULL, NULL, NULL, &copy_data,
@@ -782,7 +928,39 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
break;
}
+ case ALTER_SUBSCRIPTION_ADD_TABLE:
+ {
+ bool copy_data;
+
+ if (!sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not allowed for disabled subscriptions")));
+
+ parse_subscription_options(stmt->options, NULL, NULL, NULL,
+ NULL, NULL, NULL, &copy_data,
+ NULL, NULL);
+
+ AlterSubscription_add_table(sub, stmt->tables, copy_data);
+
+ break;
+ }
+ case ALTER_SUBSCRIPTION_DROP_TABLE:
+ {
+
+ if (!sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION ... DROP TABLE is not allowed for disabled subscriptions")));
+ parse_subscription_options(stmt->options, NULL, NULL, NULL,
+ NULL, NULL, NULL, NULL,
+ NULL, NULL);
+
+ AlterSubscription_drop_table(sub, stmt->tables);
+
+ break;
+ }
default:
elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
stmt->kind);
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index db49968409..b929c26adc 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4612,6 +4612,8 @@ _copyCreateSubscriptionStmt(const CreateSubscriptionStmt *from)
COPY_STRING_FIELD(conninfo);
COPY_NODE_FIELD(publication);
COPY_NODE_FIELD(options);
+ COPY_NODE_FIELD(tables);
+ COPY_SCALAR_FIELD(for_all_tables);
return newnode;
}
@@ -4625,6 +4627,7 @@ _copyAlterSubscriptionStmt(const AlterSubscriptionStmt *from)
COPY_STRING_FIELD(subname);
COPY_STRING_FIELD(conninfo);
COPY_NODE_FIELD(publication);
+ COPY_NODE_FIELD(tables);
COPY_NODE_FIELD(options);
return newnode;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3a084b4d1f..1082918ff1 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2238,6 +2238,8 @@ _equalCreateSubscriptionStmt(const CreateSubscriptionStmt *a,
COMPARE_STRING_FIELD(conninfo);
COMPARE_NODE_FIELD(publication);
COMPARE_NODE_FIELD(options);
+ COMPARE_NODE_FIELD(tables);
+ COMPARE_SCALAR_FIELD(for_all_tables);
return true;
}
@@ -2250,6 +2252,7 @@ _equalAlterSubscriptionStmt(const AlterSubscriptionStmt *a,
COMPARE_STRING_FIELD(subname);
COMPARE_STRING_FIELD(conninfo);
COMPARE_NODE_FIELD(publication);
+ COMPARE_NODE_FIELD(tables);
COMPARE_NODE_FIELD(options);
return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 2c2208ffb7..e0198425a0 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -405,6 +405,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <node> group_by_item empty_grouping_set rollup_clause cube_clause
%type <node> grouping_sets_clause
%type <node> opt_publication_for_tables publication_for_tables
+%type <node> opt_subscription_for_tables subscription_for_tables
%type <value> publication_name_item
%type <list> opt_fdw_options fdw_options
@@ -9565,7 +9566,7 @@ AlterPublicationStmt:
*****************************************************************************/
CreateSubscriptionStmt:
- CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition
+ CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition opt_subscription_for_tables
{
CreateSubscriptionStmt *n =
makeNode(CreateSubscriptionStmt);
@@ -9573,9 +9574,33 @@ CreateSubscriptionStmt:
n->conninfo = $5;
n->publication = $7;
n->options = $8;
+ if ($9 != NULL)
+ {
+ /* FOR TABLE */
+ if (IsA($9, List))
+ n->tables = (List *)$9;
+ /* FOR ALL TABLES */
+ else
+ n->for_all_tables = true;
+ }
$$ = (Node *)n;
}
;
+opt_subscription_for_tables: /* optional FOR TABLE / FOR ALL TABLES clause; yields NULL when the clause is omitted */
+ subscription_for_tables { $$ = $1; }
+ | /* EMPTY */ { $$ = NULL; }
+ ;
+
+subscription_for_tables: /* yields the relation list for FOR TABLE, or an Integer node for FOR ALL TABLES */
+ FOR TABLE relation_expr_list
+ {
+ $$ = (Node *) $3; /* the relation list, passed along as a Node */
+ }
+ | FOR ALL TABLES
+ {
+ $$ = (Node *) makeInteger(true); /* non-List marker; CreateSubscriptionStmt tells the two cases apart with IsA($9, List) */
+ }
+ ;
publication_name_list:
publication_name_item
@@ -9655,6 +9680,26 @@ AlterSubscriptionStmt:
(Node *)makeInteger(false), @1));
$$ = (Node *)n;
}
+ | ALTER SUBSCRIPTION name ADD_P TABLE relation_expr_list
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_ADD_TABLE;
+ n->subname = $3;
+ n->tables = $6;
+ n->tableAction = DEFELEM_ADD;
+ $$ = (Node *)n;
+ }
+ | ALTER SUBSCRIPTION name DROP TABLE relation_expr_list
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_DROP_TABLE;
+ n->subname = $3;
+ n->tables = $6;
+ n->tableAction = DEFELEM_DROP;
+ $$ = (Node *)n;
+ }
;
/*****************************************************************************
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e2b54265d7..51dd541c0c 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -52,6 +52,9 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo,
bool logical, const char *appname,
char **err);
static void libpqrcv_check_conninfo(const char *conninfo);
+static void libpqrcv_connstr_check(const char *connstr);
+static void libpqrcv_security_check(WalReceiverConn *conn);
+
static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
char **sender_host, int *sender_port);
@@ -83,6 +86,8 @@ static void libpqrcv_disconnect(WalReceiverConn *conn);
static WalReceiverFunctionsType PQWalReceiverFunctions = {
libpqrcv_connect,
libpqrcv_check_conninfo,
+ libpqrcv_connstr_check,
+ libpqrcv_security_check,
libpqrcv_get_conninfo,
libpqrcv_get_senderinfo,
libpqrcv_identify_system,
@@ -237,6 +242,54 @@ libpqrcv_check_conninfo(const char *conninfo)
PQconninfoFree(opts);
}
+/*
+ * Reject connections made by non-superusers that were established without
+ * password authentication.
+ *
+ * Superusers pass unconditionally.  For everyone else an ERROR is raised
+ * unless PQconnectionUsedPassword() reports that the connection used a
+ * password.
+ */
+static void
+libpqrcv_security_check(WalReceiverConn *conn)
+{
+	/* Superusers are exempt from the password requirement. */
+	if (superuser())
+		return;
+
+	/* A password was used for this connection: nothing to complain about. */
+	if (PQconnectionUsedPassword(conn->streamConn))
+		return;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+			 errmsg("password is required"),
+			 errdetail("Non-superuser cannot connect if the server does not request a password."),
+			 errhint("Target server's authentication method must be changed.")));
+}
+
+/*
+ * Require non-superusers to put a password into the connection string.
+ *
+ * Superusers pass unconditionally.  For everyone else the string is parsed
+ * with PQconninfoParse() and an ERROR is raised unless it contains a
+ * "password" option with a non-empty value.  A string that fails to parse
+ * is treated the same as one without a password.
+ */
+static void
+libpqrcv_connstr_check(const char *connstr)
+{
+	PQconninfoOption *parsed;
+	bool		has_password = false;
+
+	/* Superusers are exempt from the password requirement. */
+	if (superuser())
+		return;
+
+	parsed = PQconninfoParse(connstr, NULL);
+	if (parsed != NULL)
+	{
+		PQconninfoOption *cur = parsed;
+
+		/* Scan the options until a non-empty password turns up. */
+		while (cur->keyword != NULL && !has_password)
+		{
+			if (strcmp(cur->keyword, "password") == 0 &&
+				cur->val != NULL &&
+				cur->val[0] != '\0')
+				has_password = true;
+			cur++;
+		}
+		PQconninfoFree(parsed);
+	}
+
+	if (!has_password)
+		ereport(ERROR,
+				(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+				 errmsg("password is required"),
+				 errdetail("Non-superusers must provide a password in the connection string.")));
+}
+
/*
* Return a user-displayable conninfo string. Any security-sensitive fields
* are obfuscated.
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 4298c3cbf2..3534459bd6 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -47,6 +47,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subenabled; /* True if the subscription is enabled (the
* worker should be running) */
+ bool suballtables;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
@@ -77,6 +78,7 @@ typedef struct Subscription
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
List *publications; /* List of publication names to subscribe to */
+ bool alltables;
} Subscription;
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index e5bdc1cec5..982d51f48e 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3475,6 +3475,8 @@ typedef struct CreateSubscriptionStmt
char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
+ List *tables; /* Optional list of tables to add */
+ bool for_all_tables; /* Special subscription for all tables in publication */
} CreateSubscriptionStmt;
typedef enum AlterSubscriptionType
@@ -3483,7 +3485,9 @@ typedef enum AlterSubscriptionType
ALTER_SUBSCRIPTION_CONNECTION,
ALTER_SUBSCRIPTION_PUBLICATION,
ALTER_SUBSCRIPTION_REFRESH,
- ALTER_SUBSCRIPTION_ENABLED
+ ALTER_SUBSCRIPTION_ENABLED,
+ ALTER_SUBSCRIPTION_DROP_TABLE,
+ ALTER_SUBSCRIPTION_ADD_TABLE
} AlterSubscriptionType;
typedef struct AlterSubscriptionStmt
@@ -3494,6 +3498,10 @@ typedef struct AlterSubscriptionStmt
char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
+ /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */
+ List *tables; /* List of tables to add/drop */
+ bool for_all_tables; /* Special publication for all tables in db */
+ DefElemAction tableAction; /* What action to perform with the tables */
} AlterSubscriptionStmt;
typedef struct DropSubscriptionStmt
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 5913b580c2..fd7c710547 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -204,6 +204,8 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logica
const char *appname,
char **err);
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
+typedef void (*walrcv_connstr_check_fn) (const char *connstr);
+typedef void (*walrcv_security_check_fn) (WalReceiverConn *conn);
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
char **sender_host,
@@ -237,6 +239,8 @@ typedef struct WalReceiverFunctionsType
{
walrcv_connect_fn walrcv_connect;
walrcv_check_conninfo_fn walrcv_check_conninfo;
+ walrcv_connstr_check_fn walrcv_connstr_check;
+ walrcv_security_check_fn walrcv_security_check;
walrcv_get_conninfo_fn walrcv_get_conninfo;
walrcv_get_senderinfo_fn walrcv_get_senderinfo;
walrcv_identify_system_fn walrcv_identify_system;
@@ -256,6 +260,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
#define walrcv_check_conninfo(conninfo) \
WalReceiverFunctions->walrcv_check_conninfo(conninfo)
+#define walrcv_connstr_check(connstr) \
+ WalReceiverFunctions->walrcv_connstr_check(connstr)
+#define walrcv_security_check(conn) \
+ WalReceiverFunctions->walrcv_security_check(conn)
#define walrcv_get_conninfo(conn) \
WalReceiverFunctions->walrcv_get_conninfo(conn)
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \