On 3/6/17 05:27, Petr Jelinek wrote:
> updated and rebased version of the patch attached.

Some comments on this patch:

Can we have a better explanation of the different snapshot options in
CREATE_REPLICATION_SLOT?  We use all these variants in different
places, but it's not fully documented why.  Think about interested
users reading this.

errmsg("subscription table %u in subscription %u does not exist",

Use names instead of IDs if possible.

+	libpqrcv_table_list,
+	libpqrcv_table_info,
+	libpqrcv_table_copy,

I don't think these functions belong in the WAL receiver API, since
they are specific to this particular logical replication
implementation.  I would just make an API function libpqrc_exec_sql()
that runs a query, and then put the table_*() functions as wrappers
around that into tablesync.c.

Not sure what the worker init stuff in ApplyLauncherShmemInit() is
doing.  Could be commented more.

There are a lot of places that do one of

MyLogicalRepWorker->relid == InvalidOid
OidIsValid(MyLogicalRepWorker->relid)

to check whether the current worker is a sync worker.  Wrap that into
a function.

Not a fan of adding CommandCounterIncrement() calls at the end of
functions.  In some cases, they are not necessary at all.  In cases
where they are, the CCI call should be at a higher level between two
function calls, with a comment for why the call below needs to see the
changes from the call above.

The index name pg_subscription_rel_map_index/SubscriptionRelMapIndexId
doesn't seem to match existing style, since there is no "map" column.
How about pg_subscription_rel_rel_sub_index?  I see we have a
similarly named index for publications.

max_sync_workers_per_subscription could be PGC_SIGHUP, I think.  And
the minimum could be 0, to stop any syncing?

pg_subscription_rel.h: I'm not sure under which circumstances we need
to use BKI_ROWTYPE_OID.

Does pg_subscription_rel need an OID column?  The index
SubscriptionRelOidIndexId is not used anywhere.

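A minimal sketch of the wrapper suggested above for the sync-worker
check.  The name am_tablesync_worker() is only a suggestion, and the
typedefs and macros here are reduced stand-ins for the PostgreSQL
definitions so that the fragment compiles on its own; in the tree it
would use the real headers and the real LogicalRepWorker struct.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-ins for PostgreSQL's definitions, so the sketch is self-contained. */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))

/* Reduced stand-in: the real struct carries more fields. */
typedef struct LogicalRepWorker
{
	Oid			subid;		/* subscription this worker belongs to */
	Oid			relid;		/* table being synchronized, or InvalidOid */
} LogicalRepWorker;

static LogicalRepWorker *MyLogicalRepWorker = NULL;

/*
 * Hypothetical helper: returns true if the current worker is a table
 * synchronization worker, false if it is the main apply worker.
 */
static inline bool
am_tablesync_worker(void)
{
	assert(MyLogicalRepWorker != NULL);
	return OidIsValid(MyLogicalRepWorker->relid);
}
```

Call sites would then read "if (am_tablesync_worker())" instead of
open-coding either comparison.
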
You removed this command from the tests:

ALTER SUBSCRIPTION testsub SET PUBLICATION testpub, testpub1;

I suppose because it causes a connection.  We should have an option to
prevent that (NOCONNECT/NOREFRESH?).

Why was the test 'check replication origin was dropped on subscriber'
removed?

Attached also a small patch with some stylistic changes.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

From c6ba1eaa3c5a44e4a9f6d072cb95fcf7e68ba3d6 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Thu, 9 Mar 2017 08:19:06 -0500
Subject: [PATCH] fixup! Logical replication support for initial data copy

---
 doc/src/sgml/catalogs.sgml                  |  9 +++++----
 doc/src/sgml/logical-replication.sgml       | 18 +++++++++---------
 doc/src/sgml/monitoring.sgml                |  4 ++--
 doc/src/sgml/protocol.sgml                  | 14 +++++++-------
 doc/src/sgml/ref/alter_subscription.sgml    |  8 ++++----
 doc/src/sgml/ref/create_subscription.sgml   | 13 ++++++-------
 src/backend/catalog/pg_subscription.c       | 13 ++++---------
 src/backend/replication/logical/launcher.c  | 18 +++++++-----------
 src/backend/replication/logical/snapbuild.c |  6 +++---
 src/backend/replication/logical/worker.c    |  4 ++--
 src/backend/replication/repl_gram.y         |  1 +
 src/backend/replication/walsender.c         |  2 +-
 src/backend/tcop/postgres.c                 |  8 +++++---
 13 files changed, 56 insertions(+), 62 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index f587a08b6a..ab78585035 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -302,7 +302,7 @@ <title>System Catalogs</title>
 
      <row>
       <entry><link linkend="catalog-pg-subscription-rel"><structname>pg_subscription_rel</structname></link></entry>
-      <entry>relation state mapping for subscriptions</entry>
+      <entry>relation state for subscriptions</entry>
      </row>
 
      <row>
@@ -6429,14 +6429,14 @@ <title><structname>pg_subscription_rel</structname></title>
 
   <para>
    The catalog <structname>pg_subscription_rel</structname> contains the
-   status for each replicated relation in each subscription. This is a
+   state for each replicated relation in each subscription. This is a
    many-to-many mapping.
   </para>
 
   <para>
-   This catalog only contains tables known to subscription after running
+   This catalog only contains tables known to the subscription after running
    either <command>CREATE SUBSCRIPTION</command> or
-   <command>ALTER SUBSCRIPTION ... REFRESH</command> commands.
+   <command>ALTER SUBSCRIPTION ... REFRESH</command>.
   </para>
 
   <table>
@@ -6472,6 +6472,7 @@ <title><structname>pg_subscription_rel</structname> Columns</title>
       <entry><type>char</type></entry>
       <entry></entry>
       <entry>
+       State code:
        <literal>i</> = initialize,
        <literal>d</> = data is being copied,
        <literal>s</> = synchronized,
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index f75304cd91..4ec6bb49b7 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -24,8 +24,8 @@ <title>Logical Replication</title>
  </para>
 
  <para>
-  Logical replication typically starts with a snapshot of the data on
-  the publisher database. Once that is done, the changes on the publisher
+  Logical replication typically starts with a taking a snapshot of the data on
+  the publisher database and copying that to the subscriber. Once that is done, the changes on the publisher
   are sent to the subscriber as they occur in real-time. The subscriber
   applies the data in the same order as the publisher so that transactional
   consistency is guaranteed for publications within a single subscription.
@@ -162,7 +162,7 @@ <title>Subscription</title>
   <para>
    Each subscription will receive changes via one replication slot (see
    <xref linkend="streaming-replication-slots">). Additional temporary
-   replication slots may be required for the initial data synchronizations
+   replication slots may be required for the initial data synchronization
    of pre-existing table data.
   </para>
 
@@ -308,7 +308,7 @@ <title>Monitoring</title>
    Normally, there is a single apply process running for an enabled
    subscription. A disabled subscription or a crashed subscription will have
    zero rows in this view. If the initial data synchronization of any
-   table is in progress there will be additional workers for the tables
+   table is in progress, there will be additional workers for the tables
    being synchronized.
   </para>
  </sect1>
@@ -355,8 +355,8 @@ <title>Configuration Settings</title>
   <para>
    On the publisher side, <varname>wal_level</varname> must be set to
    <literal>logical</literal>, and <varname>max_replication_slots</varname>
-   must be set to at least the number of subscriptions expected to connect
-   with some reserve for table synchronization. And
+   must be set to at least the number of subscriptions expected to connect,
+   plus some reserve for table synchronization. And
    <varname>max_wal_senders</varname> should be set to at least the same as
    <varname>max_replication_slots</varname> plus the number of physical replicas
    that are connected at the same time.
@@ -367,7 +367,7 @@ <title>Configuration Settings</title>
    to be set. In this case it should be set to at least the number of
    subscriptions that will be added to the subscriber.
    <varname>max_logical_replication_workers</varname> must be set to at
-   least the number of subscriptions again with some reserve for the table
+   least the number of subscriptions, again plus some reserve for the table
    synchronization. Additionally the <varname>max_worker_processes</varname>
    may need to be adjusted to accommodate for replication workers, at least
    (<varname>max_logical_replication_workers</varname>
@@ -413,8 +413,8 @@ <title>Quick Setup</title>
 
   <para>
    The above will start the replication process, which synchronizes the
-   initial table contents of <literal>users</literal> and
-   <literal>departments</literal> tables and then starts replicating
+   initial table contents of the tables <literal>users</literal> and
+   <literal>departments</literal> and then starts replicating
    incremental changes to those tables.
   </para>
  </sect1>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3d3761ec96..88340316cd 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1587,8 +1587,8 @@ <title><structname>pg_stat_subscription</structname> View</title>
     <row>
      <entry><structfield>relid</></entry>
      <entry><type>Oid</></entry>
-     <entry>Relation id which the worker is synchronizing, this is always
-      NULL for the main apply worker</entry>
+     <entry>OID of the relation that the worker is synchronizing; null for the
+      main apply worker</entry>
     </row>
     <row>
      <entry><structfield>received_lsn</></entry>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 99f1f1f8b7..15c1d8d1db 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1545,14 +1545,14 @@ <title>Streaming Replication Protocol</title>
        <term><literal>USE_SNAPSHOT</></term>
        <listitem>
         <para>
-         Decides what to do with snapshot created during logical slot
-         initialization. The <literal>EXPORT_SNAPSHOT</> (which is the
-         default) will export the snapshot for use in other sessions. This
-         option can't be used inside a transaction. The
-         <literal>USE_SNAPSHOT</> will use the snapshot for current
+         Decides what to do with the snapshot created during logical slot
+         initialization. <literal>EXPORT_SNAPSHOT</>, which is the
+         default, will export the snapshot for use in other sessions. This
+         option can't be used inside a transaction.
+         <literal>USE_SNAPSHOT</> will use the snapshot for the current
          transaction executing the command. This option must be used in a
-         transaction and the <literal>CREATE_REPLICATION_SLOT</literal> must
-         be the first command run in that transaction. Finally
+         transaction, and <literal>CREATE_REPLICATION_SLOT</literal> must
+         be the first command run in that transaction. Finally,
          <literal>NOEXPORT_SNAPSHOT</> will just use the snapshot for logical
          decoding as normal but won't do anything else with it.
         </para>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index b34386d3c1..e74614a74a 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -99,14 +99,14 @@ <title>Parameters</title>
     <listitem>
      <para>
       Fetch missing table info from publisher. This will start replication
-      of tables that were added to subscribed publications since last
-      invocation of <command>REFRESH PUBLICATION</command> or since the
+      of tables that were added to the subscribed-to publications since the last
+      invocation of <command>REFRESH PUBLICATION</command> or since
       <command>CREATE SUBSCRIPTION</command>.
      </para>
      <para>
       The <literal>COPY DATA</literal> and <literal>NOCOPY DATA</literal>
-      options specify if the existing data in the publication that are being
-      subscribed should be copied. <literal>COPY DATA</literal> is the
+      options specify if the existing data in the publications that are being
+      subscribed to should be copied. <literal>COPY DATA</literal> is the
       default.
      </para>
     </listitem>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 91127ead88..6468470039 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -137,8 +137,8 @@ <title>Parameters</title>
     <term>NOCOPY DATA</term>
     <listitem>
      <para>
-      Specifies if the existing data in the publication that are being
-      subscribed should be copied once the replication starts.
+      Specifies if the existing data in the publications that are being
+      subscribed to should be copied once the replication starts.
       <literal>COPY DATA</literal> is the default.
      </para>
     </listitem>
@@ -148,18 +148,18 @@ <title>Parameters</title>
     <term>NOCONNECT</term>
     <listitem>
      <para>
-      Instructs the <command>CREATE SUBSCRIPTION</command> to skip initial
+      Instructs <command>CREATE SUBSCRIPTION</command> to skip the initial
       connection to the provider. This will change default values of other
       options to <literal>DISABLED</literal>,
-      <literal>NOCREATE SLOT</literal> and <literal>NOCOPY DATA</literal>.
+      <literal>NOCREATE SLOT</literal>, and <literal>NOCOPY DATA</literal>.
      </para>
      <para>
       It's not allowed to combine <literal>NOCONNECT</literal> and
-      <literal>ENABLED</literal>, <literal>CREATE SLOT</literal> or
+      <literal>ENABLED</literal>, <literal>CREATE SLOT</literal>, or
       <literal>COPY DATA</literal>.
      </para>
      <para>
-      Since no connection is made when this option is specified the tables
+      Since no connection is made when this option is specified, the tables
       are not subscribed, so after you enable the subscription nothing will
       be replicated. It is required to run
       <literal>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</> in order for
@@ -167,7 +167,6 @@ <title>Parameters</title>
      </para>
     </listitem>
    </varlistentry>
-
   </variablelist>
  </refsect1>
 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 8850b7eff1..d90c673952 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -263,19 +263,17 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
 
 	/* Update the tuple. */
 	memset(values, 0, sizeof(values));
-	memset(nulls, true, sizeof(nulls));
+	memset(nulls, false, sizeof(nulls));
 	memset(replaces, false, sizeof(replaces));
 
 	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-	nulls[Anum_pg_subscription_rel_srsubstate - 1] = false;
 	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
 
 	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
 	if (sublsn != InvalidXLogRecPtr)
-	{
-		nulls[Anum_pg_subscription_rel_srsublsn - 1] = false;
 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-	}
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
 							replaces);
@@ -289,9 +287,6 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
 	/* Cleanup. */
 	heap_close(rel, NoLock);
 
-	/* Make the changes visible. */
-	CommandCounterIncrement();
-
 	return subrelid;
 }
 
@@ -323,7 +318,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
 		{
 			heap_close(rel, RowExclusiveLock);
 			*sublsn = InvalidXLogRecPtr;
-			return '\0';
+			return SUBREL_STATE_UNKNOWN;
 		}
 
 		ereport(ERROR,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3e724de5f1..06d5509fd3 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -296,15 +296,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	bgw.bgw_main = ApplyWorkerMain;
-	snprintf(bgw.bgw_name, BGW_MAXLEN,
-			 "logical replication worker for subscription %u", subid);
-
 	if (OidIsValid(relid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication worker %u sync %u", subid, relid);
+				 "logical replication worker for subscription %u sync %u", subid, relid);
 	else
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication worker %u", subid);
+				 "logical replication worker for subscription %u", subid);
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
 	bgw.bgw_notify_pid = MyProcPid;
 
@@ -434,7 +431,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid)
 	LWLockRelease(LogicalRepWorkerLock);
 
 	if (worker)
-		SetLatch(&worker->proc->procLatch);
+		logicalrep_worker_wakeup_ptr(worker);
 }
 
 /*
@@ -443,8 +440,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid)
 void
 logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
 {
-	if (worker)
-		SetLatch(&worker->proc->procLatch);
+	SetLatch(&worker->proc->procLatch);
 }
 
 /*
@@ -817,10 +813,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		MemSet(nulls, 0, sizeof(nulls));
 
 		values[0] = ObjectIdGetDatum(worker.subid);
-		if (!OidIsValid(worker.relid))
-			nulls[1] = true;
-		else
+		if (OidIsValid(worker.relid))
 			values[1] = ObjectIdGetDatum(worker.relid);
+		else
+			nulls[1] = true;
 		values[2] = Int32GetDatum(worker_pid);
 		if (XLogRecPtrIsInvalid(worker.last_lsn))
 			nulls[3] = true;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 3f0abd7ce2..de90777cf9 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -515,14 +515,14 @@ SnapBuildInitalSnapshot(SnapBuild *builder)
 	Assert(XactIsoLevel = XACT_REPEATABLE_READ);
 
 	if (builder->state != SNAPBUILD_CONSISTENT)
-		elog(ERROR, "cannot build and initial slot snapshot before reaching a consistent state");
+		elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
 
 	if (!builder->committed.includes_all_transactions)
-		elog(ERROR, "cannot build and initial slot snapshot, not all transactions are monitored anymore");
+		elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
 
 	/* so we don't overwrite the existing value */
 	if (TransactionIdIsValid(MyPgXact->xmin))
-		elog(ERROR, "cannot build and initial slot snapshot when MyPgXact->xmin already is valid");
+		elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
 
 	snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5383364011..e7fda70b75 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -109,8 +109,8 @@ WalReceiverConn *wrconn = NULL;
 Subscription *MySubscription = NULL;
 bool MySubscriptionValid = false;
 
-static char *myslotname = NULL;
-bool in_remote_transaction = false;
+static char *myslotname = NULL;
+bool in_remote_transaction = false;
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0755b88f5a..5990be52db 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -27,6 +27,7 @@ Node *replication_parse_result;
 
 static SQLCmd *make_sqlcmd(void);
 
+
 /*
  * Bison doesn't allocate anything that needs to live across parser calls,
  * so we can easily have it use palloc instead of malloc. This prevents
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 02e1652f11..b4c7f73cf5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1380,7 +1380,7 @@ exec_replication_command(const char *cmd_string)
 	 * For aborted transactions, don't allow anything except pure SQL,
 	 * the exec_simple_query() will handle it correctly.
 	 */
-	if (IsAbortedTransactionBlockState() && cmd_node->type != T_SQLCmd)
+	if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
 		ereport(ERROR,
 				(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
 				 errmsg("current transaction is aborted, "
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index b54ad50aae..ba41f90712 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4053,7 +4053,6 @@ PostgresMain(int argc, char *argv[],
 			case 'Q':			/* simple query */
 				{
 					const char *query_string;
-					bool walsender_query = false;
 
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();
@@ -4062,8 +4061,11 @@ PostgresMain(int argc, char *argv[],
 					pq_getmsgend(&input_message);
 
 					if (am_walsender)
-						walsender_query = exec_replication_command(query_string);
-					if (!walsender_query)
+					{
+						if (!exec_replication_command(query_string))
+							exec_simple_query(query_string);
+					}
+					else
 						exec_simple_query(query_string);
 
 					send_ready_for_query = true;
-- 
2.12.0