On Tue, Mar 15, 2022 at 7:09 AM kuroda.hay...@fujitsu.com <kuroda.hay...@fujitsu.com> wrote: > > Dear Vignesh, > > Thank you for updating your patch! > > > Let's consider an existing Multi master logical replication setup > > between Node1 and Node2 that is created using the following steps: > > a) Node1 - Publication publishing employee table - pub1 > > b) Node2 - Subscription subscribing from publication pub1 with > > publish_local_only - sub1_pub1_node1 > > c) Node2 - Publication publishing employee table - pub2 > > d) Node1 - Subscription subscribing from publication pub2 with > > publish_local_only - sub2_pub2_node2 > > > > To create a subscription in node3, we will be using the following steps: > > a) Node2 - Publication publishing employee table. - pub3 > > b) Node3 - Subscription subscribing from publication in Node2 with > > publish_local_only - sub3_pub3_node2 > > > > When we create a subscription in Node3, Node3 will connect to > > Node2(this will not be done in Node3) and check if the employee table > > is present in pg_subscription_rel, in our case Node2 will have > > employee table present in pg_subscription_rel (sub1_pub1_node1 > > subscribing to employee table from pub1 in Node1). As employee table > > is being subscribed in node2 from node1, we will throw an error like > > below: > > postgres=# create subscription sub2 CONNECTION 'dbname =postgres port > > = 9999' publication pub2 with (publish_local_only=on); > > ERROR: CREATE/ALTER SUBSCRIPTION with publish_local_only and > > copy_data as true is not allowed when the publisher might have > > replicated data, table:public.t1 might have replicated data in the > > publisher > > HINT: Use CREATE/ALTER SUBSCRIPTION with copy_data = off or force > > Thanks for kind explanation. > I read above and your doc in 0002, and I put some comments. > > 1. alter_subscription.sgml > > ``` > - <term><literal>copy_data</literal> (<type>boolean</type>)</term> > + <term><literal>copy_data</literal> (<type>boolean</type> | > <literal>force</literal>)</term> > ``` > > I thought that it should be written as enum. For example, huge_pages GUC > parameter > can accept {on, off, try}, and it has been written as enum.
Modified > 2. create_subscription.sgml > > ``` > - <term><literal>copy_data</literal> (<type>boolean</type>)</term> > + <term><literal>copy_data</literal> (<type>boolean</type> | > <literal>force</literal>)</term> > ``` > > Same as above. Modified > 3. create_subscription.sgml > > ``` > + > + <para> > + If the publication tables were also subscribing data in the > publisher > + from other publishers, it will affect the > + <command>CREATE SUBSCRIPTION</command> based on the value specified > + for <literal>publish_local_only</literal> option. Refer to the > + <xref linkend="sql-createsubscription-notes" /> for details. > + </para> > ``` > > I seeked docs, but the words " publication tables " have not seen. > How about "tables in the publication"? Modified > 4. create_subscription.sgml - about your example > > In the first section, we should describe about 2-nodes case more detail > like Amit mentioned in [1]. I thought that Option-3 can be resolved by > defining > subscriptions in both nodes with publish_local_only = true and copy_data = > force. I thought existing information is enough because we have mentioned that node1 and node2 have bidirectional replication setup done and both the table data will be replicated and synchronized as and when the DML operations are happening. In option-3 we need to create a subscription with copy_data as force to one node and copy_data as false to another node because both nodes will be having the same data, copying the data just from one of the nodes should be enough. Thanks for the comments, the attached v5 patch has the changes for the same. Regards, Vignesh
From 2e9b9192a1f485d7b1f3c53170626cb49a31ee1e Mon Sep 17 00:00:00 2001 From: Vigneshwaran C <vignes...@gmail.com> Date: Tue, 15 Mar 2022 16:47:30 +0530 Subject: [PATCH v5 1/2] Skip replication of non local data. Add an option publish_local_only which will subscribe only to the locally generated data in the publisher node. If subscriber is created with this option, publisher will skip publishing the data that was subscribed from other nodes. It can be created using following syntax: ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=9999' PUBLICATION pub1 with (publish_local_only = on); --- contrib/test_decoding/test_decoding.c | 20 +++ doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 12 ++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 4 +- src/backend/commands/subscriptioncmds.c | 26 +++- .../libpqwalreceiver/libpqwalreceiver.c | 4 + src/backend/replication/logical/decode.c | 15 +- src/backend/replication/logical/logical.c | 33 +++++ src/backend/replication/logical/worker.c | 2 + src/backend/replication/pgoutput/pgoutput.c | 45 ++++++ src/bin/pg_dump/pg_dump.c | 13 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/bin/psql/tab-complete.c | 4 +- src/include/catalog/pg_subscription.h | 3 + src/include/replication/logical.h | 4 + src/include/replication/output_plugin.h | 7 + src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 1 + src/test/regress/expected/subscription.out | 131 ++++++++++-------- src/test/regress/sql/subscription.sql | 7 + src/test/subscription/t/030_circular.pl | 108 +++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 24 files changed, 384 insertions(+), 72 deletions(-) create mode 100644 src/test/subscription/t/030_circular.pl diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index ea22649e41..13c40ca167 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -73,6 +73,8 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool pg_decode_filter_remote_origin(LogicalDecodingContext *ctx, + RepOriginId origin_id); static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, @@ -148,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->truncate_cb = pg_decode_truncate; cb->commit_cb = pg_decode_commit_txn; cb->filter_by_origin_cb = pg_decode_filter; + cb->filter_remote_origin_cb = pg_decode_filter_remote_origin; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; cb->sequence_cb = pg_decode_sequence; @@ -484,6 +487,23 @@ pg_decode_filter(LogicalDecodingContext *ctx, return false; } +/* + * Filter out the transactions that had originated remotely. + * + * Return true if only_local option was specified and if the transaction has a + * valid originid. + */ +static bool +pg_decode_filter_remote_origin(LogicalDecodingContext *ctx, + RepOriginId origin_id) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->only_local && origin_id != InvalidRepOriginId) + return true; + return false; +} + /* * Print literal `outputstr' already represented as string of type `typid' * into stringbuf `s'. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 58b78a94ea..97f09ce2b5 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -204,8 +204,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < information. The parameters that can be altered are <literal>slot_name</literal>, <literal>synchronous_commit</literal>, - <literal>binary</literal>, <literal>streaming</literal>, and - <literal>disable_on_error</literal>. + <literal>binary</literal>, <literal>streaming</literal>, + <literal>disable_on_error</literal> and + <literal>publish_local_only</literal>. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index b701752fc9..918d3a7e82 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -152,6 +152,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </listitem> </varlistentry> + <varlistentry> + <term><literal>publish_local_only</literal> (<type>boolean</type>)</term> + <listitem> + <para> + Specifies whether the subscription should subscribe only to the + locally generated changes or subscribe to both the locally generated + changes and the replicated changes that was generated from other + nodes in the publisher. The default is <literal>false</literal>. + </para> + </listitem> + </varlistentry> + <varlistentry> <term><literal>slot_name</literal> (<type>string</type>)</term> <listitem> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a6304f5f81..5afb14e3a6 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->only_local = subform->sublocal; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index bb1ac30cd1..c042b03abb 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1261,8 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, - substream, subtwophasestate, subdisableonerr, subslotname, - subsynccommit, subpublications) + substream, subtwophasestate, subdisableonerr, sublocal, + subslotname, subsynccommit, subpublications) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3922658bbc..05f0b11bd3 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -62,6 +62,7 @@ #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 +#define SUBOPT_PUBLISH_LOCAL_ONLY 0x00000800 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -84,6 +85,7 @@ typedef struct SubOpts bool streaming; bool twophase; bool disableonerr; + bool only_local; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -134,6 +136,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->twophase = false; if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR)) opts->disableonerr = false; + if (IsSet(supported_opts, SUBOPT_PUBLISH_LOCAL_ONLY)) + opts->only_local = false; /* Parse options */ foreach(lc, stmt_options) @@ -232,6 +236,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_STREAMING; opts->streaming = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_PUBLISH_LOCAL_ONLY) && + strcmp(defel->defname, "publish_local_only") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_PUBLISH_LOCAL_ONLY)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_PUBLISH_LOCAL_ONLY; + opts->only_local = defGetBoolean(defel); + } else if (strcmp(defel->defname, "two_phase") == 0) { /* @@ -404,7 +417,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | - SUBOPT_DISABLE_ON_ERR); + SUBOPT_DISABLE_ON_ERR | SUBOPT_PUBLISH_LOCAL_ONLY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -474,6 +487,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming); + values[Anum_pg_subscription_sublocal - 1] = BoolGetDatum(opts.only_local); values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : @@ -879,7 +893,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR); + SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_PUBLISH_LOCAL_ONLY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -936,6 +951,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, = true; } + if (IsSet(opts.specified_opts, SUBOPT_PUBLISH_LOCAL_ONLY)) + { + values[Anum_pg_subscription_sublocal - 1] = + BoolGetDatum(opts.streaming); + replaces[Anum_pg_subscription_sublocal - 1] = true; + } + update_tuple = true; break; } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0d89db4e6a..3e057ef47b 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -453,6 +453,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 150000) appendStringInfoString(&cmd, ", two_phase 'on'"); + if (options->proto.logical.only_local && + PQserverVersion(conn->streamConn) >= 150000) + appendStringInfoString(&cmd, ", publish_local_only 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8c00a73cb9..0ae57f2161 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -546,13 +546,22 @@ FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid, return filter_prepare_cb_wrapper(ctx, xid, gid); } +/* + * Ask output plugin whether we want to skip the transaction having this + * origin_id or if the transaction has originated from a different node. + */ static inline bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) { - if (ctx->callbacks.filter_by_origin_cb == NULL) - return false; + bool result = false; + + if (ctx->callbacks.filter_by_origin_cb != NULL) + result = filter_by_origin_cb_wrapper(ctx, origin_id); + + if (ctx->callbacks.filter_remote_origin_cb != NULL) + result |= filter_remote_origin_cb_wrapper(ctx, origin_id); - return filter_by_origin_cb_wrapper(ctx, origin_id); + return result; } /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 934aa13f2d..8a3c276be9 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -246,6 +246,8 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_sequence_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); + ctx->only_local = ctx->callbacks.filter_remote_origin_cb != NULL; + /* * streaming callbacks * @@ -1178,6 +1180,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) return ret; } +bool +filter_remote_origin_cb_wrapper(LogicalDecodingContext *ctx, + RepOriginId origin_id) +{ + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + Assert(!ctx->fast_forward); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_remote_origin"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_remote_origin_cb(ctx, origin_id); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + return ret; +} + static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 03e069c7cd..3f8a171f44 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2964,6 +2964,7 @@ maybe_reread_subscription(void) newsub->binary != MySubscription->binary || newsub->stream != MySubscription->stream || newsub->owner != MySubscription->owner || + newsub->only_local != MySubscription->only_local || !equal(newsub->publications, MySubscription->publications)) { ereport(LOG, @@ -3644,6 +3645,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; options.proto.logical.twophase = false; + options.proto.logical.only_local = MySubscription->only_local; if (!am_tablesync_worker()) { diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index ea57a0477f..b31e0c3ebb 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -55,6 +55,8 @@ static void pgoutput_message(LogicalDecodingContext *ctx, Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool pgoutput_remote_origin_filter(LogicalDecodingContext *ctx, + RepOriginId origin_id); static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, @@ -215,6 +217,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->commit_prepared_cb = pgoutput_commit_prepared_txn; cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; + cb->filter_remote_origin_cb = pgoutput_remote_origin_filter; cb->shutdown_cb = pgoutput_shutdown; /* transaction streaming */ @@ -239,11 +242,13 @@ parse_output_parameters(List *options, PGOutputData *data) bool messages_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; + bool publish_local_only_option_given = false; data->binary = false; data->streaming = false; data->messages = false; data->two_phase = false; + data->only_local = false; foreach(lc, options) { @@ -332,6 +337,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "publish_local_only") == 0) + { + if (publish_local_only_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + publish_local_only_option_given = true; + + data->only_local = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -430,6 +445,18 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, else ctx->twophase_opt_given = true; + if (!data->only_local) + ctx->only_local = false; + else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support publish_local_only, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM))); + else if (!ctx->only_local) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("publish_local_only requested, but not supported by output plugin"))); + /* Init publication state. */ data->publications = NIL; publications_valid = false; @@ -448,6 +475,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, */ ctx->streaming = false; ctx->twophase = false; + ctx->only_local = false; } } @@ -1450,6 +1478,23 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx, return false; } +/* + * Filter out the transactions that had originated remotely. + * + * Return true if only_local option was specified and if the transaction has a + * valid originid. + */ +static bool +pgoutput_remote_origin_filter(LogicalDecodingContext *ctx, + RepOriginId origin_id) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + + if (data->only_local && origin_id != InvalidRepOriginId) + return true; + return false; +} + /* * Shutdown the output plugin. * diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 4dd24b8c89..ae50620dbc 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4299,6 +4299,7 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_subbinary; + int i_sublocal; int i, ntups; @@ -4343,11 +4344,13 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 150000) appendPQExpBufferStr(query, " s.subtwophasestate,\n" - " s.subdisableonerr\n"); + " s.subdisableonerr,\n" + " s.sublocal\n"); else appendPQExpBuffer(query, " '%c' AS subtwophasestate,\n" - " false AS subdisableonerr\n", + " false AS subdisableonerr,\n" + " false AS s.sublocal\n", LOGICALREP_TWOPHASE_STATE_DISABLED); appendPQExpBufferStr(query, @@ -4371,6 +4374,7 @@ getSubscriptions(Archive *fout) i_substream = PQfnumber(res, "substream"); i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); + i_sublocal = PQfnumber(res, "sublocal"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4400,6 +4404,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subtwophasestate)); subinfo[i].subdisableonerr = pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); + subinfo[i].sublocal = + pg_strdup(PQgetvalue(res, i, i_sublocal)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4473,6 +4479,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subdisableonerr, "t") == 0) appendPQExpBufferStr(query, ", disable_on_error = true"); + if (strcmp(subinfo->sublocal, "f") != 0) + appendPQExpBufferStr(query, ", publish_local_only = on"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 772dc0cf7a..1198ed5aee 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -660,6 +660,7 @@ typedef struct _SubscriptionInfo char *subdisableonerr; char *subsynccommit; char *subpublications; + char *sublocal; } SubscriptionInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 9229eacb6d..f98a5d729d 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6084,7 +6084,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false}; + false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6122,9 +6122,11 @@ describeSubscriptions(const char *pattern, bool verbose) if (pset.sversion >= 150000) appendPQExpBuffer(&buf, ", subtwophasestate AS \"%s\"\n" - ", subdisableonerr AS \"%s\"\n", + ", subdisableonerr AS \"%s\"\n" + ", sublocal AS \"%s\"\n", gettext_noop("Two phase commit"), - gettext_noop("Disable on error")); + gettext_noop("Disable on error"), + gettext_noop("Only local")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 17172827a9..e748630da6 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1834,7 +1834,7 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION <name> SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error"); + COMPLETE_WITH("binary", "publish_local_only", "slot_name", "streaming", "synchronous_commit", "disable_on_error"); /* ALTER SUBSCRIPTION <name> SET PUBLICATION */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION")) { @@ -3103,7 +3103,7 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "enabled", "slot_name", "streaming", + "enabled", "publish_local_only", "slot_name", "streaming", "synchronous_commit", "two_phase", "disable_on_error"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index e2befaf351..80b3f84123 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -65,6 +65,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool substream; /* Stream in-progress transactions. */ + bool sublocal; /* skip copying of remote origin data */ + char subtwophasestate; /* Stream two-phase transactions */ bool subdisableonerr; /* True if a worker error should cause the @@ -105,6 +107,7 @@ typedef struct Subscription bool binary; /* Indicates if the subscription wants data in * binary format */ bool stream; /* Allow streaming in-progress transactions. */ + bool only_local; /* Skip copying of remote orgin data */ char twophasestate; /* Allow streaming two-phase transactions */ bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 1097cc9799..e3858d3b24 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -99,6 +99,8 @@ typedef struct LogicalDecodingContext */ bool twophase_opt_given; + bool only_local; /* publish only locally generated data */ + /* * State for writing output. */ @@ -138,6 +140,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); +extern bool filter_remote_origin_cb_wrapper(LogicalDecodingContext *ctx, + RepOriginId origin_id); extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index a16bebf76c..0883031176 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -106,6 +106,12 @@ typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx, typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id); +/* + * Filter remote origin changes. + */ +typedef bool (*LogicalDecodeFilterRemoteOriginCB) (struct LogicalDecodingContext *ctx, + RepOriginId origin_id); + /* * Called to shutdown an output plugin. */ @@ -246,6 +252,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; + LogicalDecodeFilterRemoteOriginCB filter_remote_origin_cb; LogicalDecodeShutdownCB shutdown_cb; /* streaming of changes at prepare time */ diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index eafedd610a..0461f4e634 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -29,6 +29,7 @@ typedef struct PGOutputData bool streaming; bool messages; bool two_phase; + bool only_local; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 92f73a55b8..65c83977a3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -183,6 +183,7 @@ typedef struct bool streaming; /* Streaming of large transactions */ bool twophase; /* Streaming of two-phase transactions at * prepare time */ + bool only_local; /* publish only locally generated data */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index ad8003fae1..bf8a3dd6f6 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -70,16 +70,35 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE; ERROR: cannot enable subscription that does not have a slot name ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions +-- ok - with publish_local_only = true +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, publish_local_only = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ regress_testsub4 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | t | off | dbname=regress_doesnotexist +(1 row) + +ALTER SUBSCRIPTION regress_testsub4 SET (publish_local_only = false); +\dRs+ regress_testsub4 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist +(1 row) + DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -94,10 +113,10 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | f | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -129,10 +148,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | f | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -165,19 +184,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -188,19 +207,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist (1 row) -- fail - publication already exists @@ -215,10 +234,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | f | off | dbname=regress_doesnotexist (1 row) -- fail - publication used more then once @@ -233,10 +252,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -270,10 +289,10 @@ ERROR: two_phase requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | f | off | dbname=regress_doesnotexist (1 row) --fail - alter of two_phase option not supported. @@ -282,10 +301,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -294,10 +313,10 @@ DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -309,18 +328,18 @@ ERROR: disable_on_error requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index a7c15b1daf..a8470ea822 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -54,7 +54,14 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU ALTER SUBSCRIPTION regress_testsub3 ENABLE; ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; +-- ok - with publish_local_only = true +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, publish_local_only = true); +\dRs+ regress_testsub4 +ALTER SUBSCRIPTION regress_testsub4 SET (publish_local_only = false); +\dRs+ regress_testsub4 + DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; diff --git a/src/test/subscription/t/030_circular.pl b/src/test/subscription/t/030_circular.pl new file mode 100644 index 0000000000..e4b9e58c39 --- /dev/null +++ b/src/test/subscription/t/030_circular.pl @@ -0,0 +1,108 @@ + +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +# Test circular logical replication. +# +# Includes tests for circulation replication using publish_local_only option. +# +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +################################################### +# Setup a circulation replication of pub/sub nodes. +# node_A -> node_B -> node_A +################################################### + +# Initialize nodes +# node_A +my $node_A = PostgreSQL::Test::Cluster->new('node_A'); +$node_A->init(allows_streaming => 'logical'); +$node_A->append_conf('postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_A->start; +# node_B +my $node_B = PostgreSQL::Test::Cluster->new('node_B'); +$node_B->init(allows_streaming => 'logical'); +$node_B->append_conf('postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_B->start; + +# Create tables on node_A +$node_A->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Create the same tables on node_B +$node_B->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full"); +my $appname_B = 'tap_sub_B'; +$node_B->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_B + CONNECTION '$node_A_connstr application_name=$appname_B' + PUBLICATION tap_pub_A + WITH (publish_local_only = on)"); + +# node_B (pub) -> node_A (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full"); +my $appname_A = 'tap_sub_A'; +$node_A->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_A + CONNECTION '$node_B_connstr application_name=$appname_A' + PUBLICATION tap_pub_B + WITH (publish_local_only = on, copy_data = off)"); + +# Wait for subscribers to finish initialization +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_A); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_A->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_B->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +is(1,1, "Circular replication setup is complete"); + +my $result; + +########################################################################## +# check that circular replication setup does not cause infinite recursive +# insertion. +########################################################################## + +# insert a record +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_A); + +# check that transaction was committed on subscriber(s) +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full;"); +is($result, qq(11 +12), 'Inserted successfully without leading to infinite recursion in circular replication setup'); +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;"); +is($result, qq(11 +12), 'Inserted successfully without leading to infinite recursion in circular replication setup'); + +# shutdown +$node_B->stop('fast'); +$node_A->stop('fast'); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index eaf3e7a8d4..321e083d43 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1370,6 +1370,7 @@ LogicalDecodeCommitCB LogicalDecodeCommitPreparedCB LogicalDecodeFilterByOriginCB LogicalDecodeFilterPrepareCB +LogicalDecodeFilterRemoteOriginCB LogicalDecodeMessageCB LogicalDecodePrepareCB LogicalDecodeRollbackPreparedCB -- 2.32.0
From b1e65abfaa33616dc41a023504666b96130fd5bb Mon Sep 17 00:00:00 2001 From: Vigneshwaran C <vignes...@gmail.com> Date: Wed, 16 Mar 2022 10:56:39 +0530 Subject: [PATCH v5 2/2] Support force option for copy_data, check and throw an error if publisher tables were also subscribing data in the publisher from other publishers. This patch does couple of things: 1) Added force option for copy_data. 2) Check and throw an error if the publication tables were also subscribing data in the publisher from other publishers. Let's consider an existing Multi master logical replication setup between Node1 and Node2 that is created using the following steps: a) Node1 - Publication publishing employee table. b) Node2 - Subscription subscribing from publication pub1 with publish_local_only. If a subscription is created to Node1 from Node3 with publish_local_only and copy_data as ON, then throw an error so that user can handle creation of subscription with table having consistent data. --- doc/src/sgml/ref/alter_subscription.sgml | 8 +- doc/src/sgml/ref/create_subscription.sgml | 59 +++++++- src/backend/commands/subscriptioncmds.c | 150 +++++++++++++++++---- src/test/regress/expected/subscription.out | 4 +- src/test/regress/sql/subscription.sql | 1 + src/test/subscription/t/030_circular.pl | 24 +++- src/tools/pgindent/typedefs.list | 1 + 7 files changed, 212 insertions(+), 35 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 97f09ce2b5..49fffb5f18 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -158,12 +158,14 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <variablelist> <varlistentry> - <term><literal>copy_data</literal> (<type>boolean</type>)</term> + <term><literal>copy_data</literal> (<type>enum</type>)</term> <listitem> <para> Specifies whether to copy pre-existing data in the publications - that are being subscribed to when the replication starts. - The default is <literal>true</literal>. + that are being subscribed to when the replication starts. This + parameter may be either <literal>true</literal>, + <literal>false</literal> or <literal>force</literal>. The default is + <literal>true</literal>. </para> <para> Previously subscribed tables are not copied, even if a table's row diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 918d3a7e82..73e7ee74d6 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -161,6 +161,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl changes and the replicated changes that was generated from other nodes in the publisher. The default is <literal>false</literal>. </para> + <para> + If the tables in the publication were also subscribing to the data in + the publisher from other publishers, it will affect the + <command>CREATE SUBSCRIPTION</command> based on the value specified + for <literal>copy_data</literal> option. Refer to the + <xref linkend="sql-createsubscription-notes" /> for details. + </para> </listitem> </varlistentry> @@ -213,18 +220,28 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </varlistentry> <varlistentry> - <term><literal>copy_data</literal> (<type>boolean</type>)</term> + <term><literal>copy_data</literal> (<type>enum</type>)</term> <listitem> <para> Specifies whether to copy pre-existing data in the publications - that are being subscribed to when the replication starts. - The default is <literal>true</literal>. + that are being subscribed to when the replication starts. This + parameter may be either <literal>true</literal>, + <literal>false</literal> or <literal>force</literal>. The default is + <literal>true</literal>. </para> <para> If the publications contain <literal>WHERE</literal> clauses, it will affect what data is copied. Refer to the <xref linkend="sql-createsubscription-notes" /> for details. </para> + + <para> + If the tables in the publication were also subscribing to the data in + the publisher from other publishers, it will affect the + <command>CREATE SUBSCRIPTION</command> based on the value specified + for <literal>publish_local_only</literal> option. Refer to the + <xref linkend="sql-createsubscription-notes" /> for details. + </para> </listitem> </varlistentry> @@ -368,6 +385,42 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl copied data that would be incompatible with subsequent filtering. </para> + <para> + Let's consider an existing Multi master logical replication setup between + Node1 and Node2 that is created using the following steps: + a) Node1 - Publication publishing employee table. + b) Node2 - Subscription subscribing from publication pub1 with + <literal>publish_local_only</literal>. + c) Node2 - Publication publishing employee table. + d) Node1 - Subscription subscribing from publication pub2 with + <literal>publish_local_only</literal>. + Now when user is trying to add another node Node3 to the above Multi master + logical replication setup, user will have to create one subscription + subscribing from Node1 and another subscription subscribing from Node2 in + Node3 using <literal>publish_local_only</literal> option and + <literal>copy_data</literal> as <literal>true</literal>, while + the subscription is created, server will identify that Node2 is subscribing + from Node1 and Node1 is subscribing from Node2 and throw an error like: + <programlisting> +postgres=# CREATE SUBSCRIPTION mysub CONNECTION 'host=192.168.1.50 port=5432 user=foo dbname=foodb' + PUBLICATION mypublication, insert_only with (publish_local_only=on); +ERROR: CREATE/ALTER SUBSCRIPTION with publish_local_only and copy_data as true is not allowed when the publisher might have replicated data, table:public.t1 might have replicated data in the publisher +HINT: Use CREATE/ALTER SUBSCRIPTION with copy_data = off or force + </programlisting> + In this scenario user can solve this based on one of the 2 possibilities, + a) If there are no data present in Node1 and Node2, then the user can create + the subscriptions to Node1 and Node2 with + <literal>publish_local_only</literal> as <literal>true</literal> and + <literal>copy_data</literal> as <literal>false</literal>. b) If the data is + present, then the user can create subscription with + <literal>copy_data</literal> as <literal>force</literal> on Node1 and + <literal>copy_data</literal> as <literal>false</literal> on Node2, before + allowing any operations on the respective tables of Node1 and Node2, in this + case <literal>copy_data</literal> is <literal>false</literal> on Node2 + because the data will be replicated to each other and available on both the + nodes. + </para> + </refsect1> <refsect1> diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 05f0b11bd3..b3823fb3ff 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -67,6 +67,18 @@ /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) +#define IS_COPY_DATA_VALID(copy_data) (copy_data != COPY_DATA_OFF) + +/* + * Represents whether copy_data option is specified with on, off or force. + */ +typedef enum CopyData +{ + COPY_DATA_OFF, + COPY_DATA_ON, + COPY_DATA_FORCE +} CopyData; + /* * Structure to hold a bitmap representing the user-provided CREATE/ALTER * SUBSCRIPTION command options and the parsed/default values of each of them. @@ -79,7 +91,7 @@ typedef struct SubOpts bool connect; bool enabled; bool create_slot; - bool copy_data; + CopyData copy_data; bool refresh; bool binary; bool streaming; @@ -88,11 +100,69 @@ typedef struct SubOpts bool only_local; } SubOpts; -static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_table_list(WalReceiverConn *wrconn, List *publications, + CopyData copydata, bool only_local); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); +/* + * Validate the value specified for copy_data option. + */ +static CopyData +DefGetCopyData(DefElem *def) +{ + /* + * If no parameter given, assume "true" is meant. + */ + if (def->arg == NULL) + return COPY_DATA_ON; + + /* + * Allow 0, 1, "true", "false", "on", "off" or "force". + */ + switch (nodeTag(def->arg)) + { + case T_Integer: + switch (intVal(def->arg)) + { + case 0: + return COPY_DATA_OFF; + case 1: + return COPY_DATA_ON; + default: + /* otherwise, error out below */ + break; + } + break; + default: + { + char *sval = defGetString(def); + + /* + * The set of strings accepted here should match up with + * the grammar's opt_boolean_or_string production. + */ + if (pg_strcasecmp(sval, "true") == 0) + return COPY_DATA_ON; + if (pg_strcasecmp(sval, "false") == 0) + return COPY_DATA_OFF; + if (pg_strcasecmp(sval, "on") == 0) + return COPY_DATA_ON; + if (pg_strcasecmp(sval, "off") == 0) + return COPY_DATA_OFF; + if (pg_strcasecmp(sval, "force") == 0) + return COPY_DATA_FORCE; + + } + break; + } + + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s requires a boolean or \"force\"", def->defname)); + return COPY_DATA_OFF; /* keep compiler quiet */ +} /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -125,7 +195,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, if (IsSet(supported_opts, SUBOPT_CREATE_SLOT)) opts->create_slot = true; if (IsSet(supported_opts, SUBOPT_COPY_DATA)) - opts->copy_data = true; + opts->copy_data = COPY_DATA_ON; if (IsSet(supported_opts, SUBOPT_REFRESH)) opts->refresh = true; if (IsSet(supported_opts, SUBOPT_BINARY)) @@ -193,7 +263,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_COPY_DATA; - opts->copy_data = defGetBoolean(defel); + opts->copy_data = DefGetCopyData(defel); } else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) && strcmp(defel->defname, "synchronous_commit") == 0) @@ -303,17 +373,17 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errmsg("%s and %s are mutually exclusive options", "connect = false", "create_slot = true"))); - if (opts->copy_data && + if (IS_COPY_DATA_VALID(opts->copy_data) && IsSet(opts->specified_opts, SUBOPT_COPY_DATA)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", - "connect = false", "copy_data = true"))); + "connect = false", "copy_data = true/force"))); /* Change the defaults of other options. */ opts->enabled = false; opts->create_slot = false; - opts->copy_data = false; + opts->copy_data = COPY_DATA_OFF; } /* @@ -541,13 +611,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + table_state = IS_COPY_DATA_VALID(opts.copy_data) ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* * Get the table list from publisher and build local table status * info. */ - tables = fetch_table_list(wrconn, publications); + tables = fetch_table_list(wrconn, publications, opts.copy_data, + opts.only_local); foreach(lc, tables) { RangeVar *rv = (RangeVar *) lfirst(lc); @@ -590,7 +661,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && tables != NIL) + if (opts.twophase && IS_COPY_DATA_VALID(opts.copy_data) && + tables != NIL) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -629,7 +701,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, } static void -AlterSubscription_refresh(Subscription *sub, bool copy_data) +AlterSubscription_refresh(Subscription *sub, CopyData copy_data, + bool only_local) { char *err; List *pubrel_names; @@ -661,7 +734,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) PG_TRY(); { /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + pubrel_names = fetch_table_list(wrconn, sub->publications, copy_data, + only_local); /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid); @@ -715,7 +789,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) list_length(subrel_states), sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, - copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, + IS_COPY_DATA_VALID(copy_data) ? SUBREL_STATE_INIT : SUBREL_STATE_READY, InvalidXLogRecPtr); ereport(DEBUG1, (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", @@ -998,7 +1072,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, case ALTER_SUBSCRIPTION_SET_PUBLICATION: { - supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; + supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH | SUBOPT_PUBLISH_LOCAL_ONLY; parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1021,7 +1095,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * See ALTER_SUBSCRIPTION_REFRESH for details why this is * not allowed. */ - if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && IS_COPY_DATA_VALID(opts.copy_data)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), @@ -1033,7 +1107,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, opts.only_local); } break; @@ -1045,7 +1119,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, List *publist; bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; - supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA; + supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA | SUBOPT_PUBLISH_LOCAL_ONLY; parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1081,7 +1155,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Refresh the new list of publications. */ sub->publications = publist; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, opts.only_local); } break; @@ -1095,7 +1169,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, - SUBOPT_COPY_DATA, &opts); + SUBOPT_COPY_DATA | SUBOPT_PUBLISH_LOCAL_ONLY, + &opts); /* * The subscription option "two_phase" requires that @@ -1114,7 +1189,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * * For more details see comments atop worker.c. */ - if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && + IS_COPY_DATA_VALID(opts.copy_data)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), @@ -1123,7 +1199,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, opts.only_local); break; } @@ -1596,12 +1672,13 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) * publisher connection. */ static List * -fetch_table_list(WalReceiverConn *wrconn, List *publications) +fetch_table_list(WalReceiverConn *wrconn, List *publications, CopyData copydata, + bool only_local) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; + Oid tableRow[3] = {TEXTOID, TEXTOID, CHAROID}; ListCell *lc; bool first; List *tablelist = NIL; @@ -1609,9 +1686,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) Assert(list_length(publications) > 0); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" - " FROM pg_catalog.pg_publication_tables t\n" - " WHERE t.pubname IN ("); + appendStringInfoString(&cmd, + "SELECT DISTINCT N.nspname AS schemaname, C.relname AS tablename, PS.srsubstate as replicated\n" + "FROM pg_publication P,\n" + "LATERAL pg_get_publication_tables(P.pubname) GPT\n" + "LEFT JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n" + "pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + "WHERE C.oid = GPT.relid AND P.pubname in ("); first = true; foreach(lc, publications) { @@ -1626,7 +1707,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) } appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 3, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1649,6 +1730,21 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + /* + * XXX: It is quite possible that subscriber has not yet pulled data to + * the tables, but in ideal cases the table data will be subscribed. + * Too keep the code simple it is not checked if the subscriber table + * has pulled the data or not. Throw an error so that the user can + * take care of the initial data copying and then create subscription + * with copy_data off. + */ + if (copydata == COPY_DATA_ON && only_local && !slot_attisnull(slot, 3)) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("CREATE/ALTER SUBSCRIPTION with publish_local_only and copy_data as true is not allowed when the publisher might have replicated data, table:%s.%s might have replicated data in the publisher", + nspname, relname), + errhint("Use CREATE/ALTER SUBSCRIPTION with copy_data = off or force")); + rv = makeRangeVar(nspname, relname, -1); tablelist = lappend(tablelist, rv); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index bf8a3dd6f6..5c93df617f 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -47,7 +47,9 @@ ERROR: must be superuser to create subscriptions SET SESSION AUTHORIZATION 'regress_subscription_user'; -- fail - invalid option combinations CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true); -ERROR: connect = false and copy_data = true are mutually exclusive options +ERROR: connect = false and copy_data = true/force are mutually exclusive options +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force); +ERROR: connect = false and copy_data = true/force are mutually exclusive options CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true); ERROR: connect = false and enabled = true are mutually exclusive options CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index a8470ea822..f4b7ed634d 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -40,6 +40,7 @@ SET SESSION AUTHORIZATION 'regress_subscription_user'; -- fail - invalid option combinations CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true); +CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true); CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, enabled = true); diff --git a/src/test/subscription/t/030_circular.pl b/src/test/subscription/t/030_circular.pl index e4b9e58c39..7bce6acf23 100644 --- a/src/test/subscription/t/030_circular.pl +++ b/src/test/subscription/t/030_circular.pl @@ -42,6 +42,10 @@ $node_A->safe_psql('postgres', $node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); +my $result; +my $stdout; +my $stderr; + # Setup logical replication # node_A (pub) -> node_B (sub) my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; @@ -65,6 +69,25 @@ $node_A->safe_psql('postgres', " PUBLICATION tap_pub_B WITH (publish_local_only = on, copy_data = off)"); +($result, $stdout, $stderr) = $node_A->psql('postgres', " + CREATE SUBSCRIPTION tap_sub_A1 + CONNECTION '$node_B_connstr application_name=$appname_A' + PUBLICATION tap_pub_B + WITH (publish_local_only = on, copy_data = on)"); +like( + $stderr, + qr/ERROR: CREATE\/ALTER SUBSCRIPTION with publish_local_only and copy_data as true is not allowed when the publisher might have replicated data/, + "Create subscription with publish_local_only and copy_data having replicated table in publisher"); + +$node_A->safe_psql('postgres', " + CREATE SUBSCRIPTION tap_sub_A2 + CONNECTION '$node_B_connstr application_name=$appname_A' + PUBLICATION tap_pub_B + WITH (publish_local_only = on, copy_data = force)"); + +$node_A->safe_psql('postgres', " + DROP SUBSCRIPTION tap_sub_A2"); + # Wait for subscribers to finish initialization $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_A); @@ -79,7 +102,6 @@ $node_B->poll_query_until('postgres', $synced_query) is(1,1, "Circular replication setup is complete"); -my $result; ########################################################################## # check that circular replication setup does not cause infinite recursive diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 321e083d43..ffae65fdde 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -441,6 +441,7 @@ ConvProcInfo ConversionLocation ConvertRowtypeExpr CookedConstraint +CopyData CopyDest CopyFormatOptions CopyFromState -- 2.32.0