On 3/6/17 05:27, Petr Jelinek wrote:
> updated and rebased version of the patch attached.
Some comments on this patch:
Can we have a better explanation of the different snapshot options in
CREATE_REPLICATION_SLOT? We use all these variants in different
places, but it's not fully documented why. Think about interested
users reading this.
errmsg("subscription table %u in subscription %u does not exist",
Use names instead of IDs if possible.
+ libpqrcv_table_list,
+ libpqrcv_table_info,
+ libpqrcv_table_copy,
I don't think these functions belong in the WAL receiver API, since
they are specific to this particular logical replication
implementation. I would just make an API function libpqrcv_exec_sql()
that runs a query, and then put the table_*() functions as wrappers
around that into tablesync.c.
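To illustrate the split I have in mind, here is a rough standalone sketch. All the Demo* names and tablesync_table_copy() are made up for the example, the "connection" only records the query instead of sending it, and identifier quoting is left out. The point is only that the API exposes one generic call and the table-specific logic lives in the caller.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical connection type: it only remembers the last query it got. */
typedef struct DemoConn
{
    char        last_query[256];
} DemoConn;

/* The single generic entry point the WAL receiver API would expose. */
typedef int (*demo_exec_sql_fn) (DemoConn *conn, const char *query);

typedef struct DemoWalRcvFunctions
{
    demo_exec_sql_fn exec_sql;
} DemoWalRcvFunctions;

/* Stand-in for the libpq-backed implementation; real code would send the query. */
static int
demo_exec_sql(DemoConn *conn, const char *query)
{
    assert(conn != NULL && query != NULL);
    snprintf(conn->last_query, sizeof(conn->last_query), "%s", query);
    return 0;
}

static const DemoWalRcvFunctions demo_functions = {demo_exec_sql};

/*
 * Table-sync wrapper living outside the API (for example in tablesync.c).
 * Identifier quoting is omitted; real code must quote both names.
 */
static int
tablesync_table_copy(const DemoWalRcvFunctions *api, DemoConn *conn,
                     const char *nspname, const char *relname)
{
    char        query[256];
    int         len;

    len = snprintf(query, sizeof(query), "COPY %s.%s TO STDOUT",
                   nspname, relname);
    if (len < 0 || (size_t) len >= sizeof(query))
        return -1;              /* query did not fit into the buffer */

    return api->exec_sql(conn, query);
}
```

With this shape, adding another table_*() helper does not touch the WAL receiver function table at all.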
Not sure what the worker init stuff in ApplyLauncherShmemInit() is
doing. Could be commented more.
There are a lot of places that do one of
MyLogicalRepWorker->relid == InvalidOid
OidIsValid(MyLogicalRepWorker->relid)
to check whether the current worker is a sync worker. Wrap that into
a function.
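Something like the following is what I mean. This is a standalone sketch: the typedefs and macros only mimic the PostgreSQL ones so it compiles on its own, the struct is cut down to two fields, and the helper name worker_is_tablesync() is just a suggestion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-ins for the PostgreSQL definitions, so the sketch compiles alone. */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))

/* Reduced to the fields the check needs; the real struct has more. */
typedef struct LogicalRepWorker
{
    Oid         subid;          /* subscription the worker belongs to */
    Oid         relid;          /* table being synchronized, or InvalidOid */
} LogicalRepWorker;

/* Hypothetical helper: true if the worker synchronizes a single table. */
static inline bool
worker_is_tablesync(const LogicalRepWorker *worker)
{
    assert(worker != NULL);
    return OidIsValid(worker->relid);
}
```

Callers would then write worker_is_tablesync(MyLogicalRepWorker) instead of open-coding either comparison.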
Not a fan of adding CommandCounterIncrement() calls at the end of
functions. In some cases, they are not necessary at all. In cases
where they are, the CCI call should be at a higher level between two
function calls with a comment for why the call below needs to see the
changes from the call above.
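As a toy model of the pattern (not PostgreSQL code; every demo_* name is invented, and visibility is reduced to "a row written by an earlier command is visible"), the caller owns the counter increment and says why it is there:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy catalog: one row, stamped with the command that wrote it. */
typedef struct DemoCatalog
{
    unsigned int current_cid;   /* current command counter */
    unsigned int row_cid;       /* command that wrote the row */
    bool        has_row;
} DemoCatalog;

/* Writes the row but does NOT bump the command counter itself. */
static void
demo_set_state(DemoCatalog *cat)
{
    assert(cat != NULL);
    cat->row_cid = cat->current_cid;
    cat->has_row = true;
}

/* A row is visible only if an earlier command wrote it. */
static bool
demo_state_visible(const DemoCatalog *cat)
{
    assert(cat != NULL);
    return cat->has_row && cat->row_cid < cat->current_cid;
}

/* Stand-in for CommandCounterIncrement(). */
static void
demo_command_counter_increment(DemoCatalog *cat)
{
    assert(cat != NULL);
    cat->current_cid++;
}

/* Higher-level caller: the increment sits between the two calls. */
static bool
demo_set_then_get_state(DemoCatalog *cat)
{
    demo_set_state(cat);

    /* The lookup below must see the row written by the call above. */
    demo_command_counter_increment(cat);

    return demo_state_visible(cat);
}
```

A caller that only sets the state and never reads it back in the same transaction pays nothing for an increment it does not need.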
The index name pg_subscription_rel_map_index/SubscriptionRelMapIndexId
doesn't seem to match existing style, since there is no "map" column.
How about pg_subscription_rel_rel_sub_index? I see we have a
similarly named index for publications.
max_sync_workers_per_subscription could be PGC_SIGHUP, I think. And
the minimum could be 0, to stop any syncing?
pg_subscription_rel.h: I'm not sure under which circumstances we need
to use BKI_ROWTYPE_OID.
Does pg_subscription_rel need an OID column? The index
SubscriptionRelOidIndexId is not used anywhere.
You removed this command from the tests:
ALTER SUBSCRIPTION testsub SET PUBLICATION testpub, testpub1;
I suppose because it causes a connection. We should have an option to
prevent that (NOCONNECT/NOREFRESH?).
Why was the test 'check replication origin was dropped on subscriber'
removed?
Also attached is a small patch with some stylistic changes.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From c6ba1eaa3c5a44e4a9f6d072cb95fcf7e68ba3d6 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <[email protected]>
Date: Thu, 9 Mar 2017 08:19:06 -0500
Subject: [PATCH] fixup! Logical replication support for initial data copy
---
doc/src/sgml/catalogs.sgml | 9 +++++----
doc/src/sgml/logical-replication.sgml | 18 +++++++++---------
doc/src/sgml/monitoring.sgml | 4 ++--
doc/src/sgml/protocol.sgml | 14 +++++++-------
doc/src/sgml/ref/alter_subscription.sgml | 8 ++++----
doc/src/sgml/ref/create_subscription.sgml | 13 ++++++-------
src/backend/catalog/pg_subscription.c | 13 ++++---------
src/backend/replication/logical/launcher.c | 18 +++++++-----------
src/backend/replication/logical/snapbuild.c | 6 +++---
src/backend/replication/logical/worker.c | 4 ++--
src/backend/replication/repl_gram.y | 1 +
src/backend/replication/walsender.c | 2 +-
src/backend/tcop/postgres.c | 8 +++++---
13 files changed, 56 insertions(+), 62 deletions(-)
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index f587a08b6a..ab78585035 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -302,7 +302,7 @@ <title>System Catalogs</title>
<row>
<entry><link linkend="catalog-pg-subscription-rel"><structname>pg_subscription_rel</structname></link></entry>
- <entry>relation state mapping for subscriptions</entry>
+ <entry>relation state for subscriptions</entry>
</row>
<row>
@@ -6429,14 +6429,14 @@ <title><structname>pg_subscription_rel</structname></title>
<para>
The catalog <structname>pg_subscription_rel</structname> contains the
- status for each replicated relation in each subscription. This is a
+ state for each replicated relation in each subscription. This is a
many-to-many mapping.
</para>
<para>
- This catalog only contains tables known to subscription after running
+ This catalog only contains tables known to the subscription after running
either <command>CREATE SUBSCRIPTION</command> or
- <command>ALTER SUBSCRIPTION ... REFRESH</command> commands.
+ <command>ALTER SUBSCRIPTION ... REFRESH</command>.
</para>
<table>
@@ -6472,6 +6472,7 @@ <title><structname>pg_subscription_rel</structname> Columns</title>
<entry><type>char</type></entry>
<entry></entry>
<entry>
+ State code:
<literal>i</> = initialize,
<literal>d</> = data is being copied,
<literal>s</> = synchronized,
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index f75304cd91..4ec6bb49b7 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -24,8 +24,8 @@ <title>Logical Replication</title>
</para>
<para>
- Logical replication typically starts with a snapshot of the data on
- the publisher database. Once that is done, the changes on the publisher
+ Logical replication typically starts with taking a snapshot of the data on
+ the publisher database and copying that to the subscriber. Once that is done, the changes on the publisher
are sent to the subscriber as they occur in real-time. The subscriber
applies the data in the same order as the publisher so that transactional
consistency is guaranteed for publications within a single subscription.
@@ -162,7 +162,7 @@ <title>Subscription</title>
<para>
Each subscription will receive changes via one replication slot (see
<xref linkend="streaming-replication-slots">). Additional temporary
- replication slots may be required for the initial data synchronizations
+ replication slots may be required for the initial data synchronization
of pre-existing table data.
</para>
@@ -308,7 +308,7 @@ <title>Monitoring</title>
Normally, there is a single apply process running for an enabled
subscription. A disabled subscription or a crashed subscription will have
zero rows in this view. If the initial data synchronization of any
- table is in progress there will be additional workers for the tables
+ table is in progress, there will be additional workers for the tables
being synchronized.
</para>
</sect1>
@@ -355,8 +355,8 @@ <title>Configuration Settings</title>
<para>
On the publisher side, <varname>wal_level</varname> must be set to
<literal>logical</literal>, and <varname>max_replication_slots</varname>
- must be set to at least the number of subscriptions expected to connect
- with some reserve for table synchronization. And
+ must be set to at least the number of subscriptions expected to connect,
+ plus some reserve for table synchronization. And
<varname>max_wal_senders</varname> should be set to at least the same as
<varname>max_replication_slots</varname> plus the number of physical
replicas that are connected at the same time.
@@ -367,7 +367,7 @@ <title>Configuration Settings</title>
to be set. In this case it should be set to at least the number of
subscriptions that will be added to the subscriber.
<varname>max_logical_replication_workers</varname> must be set to at
- least the number of subscriptions again with some reserve for the table
+ least the number of subscriptions, again plus some reserve for the table
synchronization. Additionally the <varname>max_worker_processes</varname>
may need to be adjusted to accommodate for replication workers, at least
(<varname>max_logical_replication_workers</varname>
@@ -413,8 +413,8 @@ <title>Quick Setup</title>
<para>
The above will start the replication process, which synchronizes the
- initial table contents of <literal>users</literal> and
- <literal>departments</literal> tables and then starts replicating
+ initial table contents of the tables <literal>users</literal> and
+ <literal>departments</literal> and then starts replicating
incremental changes to those tables.
</para>
</sect1>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3d3761ec96..88340316cd 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1587,8 +1587,8 @@ <title><structname>pg_stat_subscription</structname> View</title>
<row>
<entry><structfield>relid</></entry>
<entry><type>Oid</></entry>
- <entry>Relation id which the worker is synchronizing, this is always
- NULL for the main apply worker</entry>
+ <entry>OID of the relation that the worker is synchronizing; null for the
+ main apply worker</entry>
</row>
<row>
<entry><structfield>received_lsn</></entry>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 99f1f1f8b7..15c1d8d1db 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1545,14 +1545,14 @@ <title>Streaming Replication Protocol</title>
<term><literal>USE_SNAPSHOT</></term>
<listitem>
<para>
- Decides what to do with snapshot created during logical slot
- initialization. The <literal>EXPORT_SNAPSHOT</> (which is the
- default) will export the snapshot for use in other sessions. This
- option can't be used inside a transaction. The
- <literal>USE_SNAPSHOT</> will use the snapshot for current
+ Decides what to do with the snapshot created during logical slot
+ initialization. <literal>EXPORT_SNAPSHOT</>, which is the
+ default, will export the snapshot for use in other sessions. This
+ option can't be used inside a transaction.
+ <literal>USE_SNAPSHOT</> will use the snapshot for the current
transaction executing the command. This option must be used in a
- transaction and the <literal>CREATE_REPLICATION_SLOT</literal> must
- be the first command run in that transaction. Finally
+ transaction, and <literal>CREATE_REPLICATION_SLOT</literal> must
+ be the first command run in that transaction. Finally,
<literal>NOEXPORT_SNAPSHOT</> will just use the snapshot for logical
decoding as normal but won't do anything else with it.
</para>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index b34386d3c1..e74614a74a 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -99,14 +99,14 @@ <title>Parameters</title>
<listitem>
<para>
Fetch missing table info from publisher. This will start replication
- of tables that were added to subscribed publications since last
- invocation of <command>REFRESH PUBLICATION</command> or since the
+ of tables that were added to the subscribed-to publications since the last
+ invocation of <command>REFRESH PUBLICATION</command> or since
<command>CREATE SUBSCRIPTION</command>.
</para>
<para>
The <literal>COPY DATA</literal> and <literal>NOCOPY DATA</literal>
- options specify if the existing data in the publication that are being
- subscribed should be copied. <literal>COPY DATA</literal> is the
+ options specify if the existing data in the publications that are being
+ subscribed to should be copied. <literal>COPY DATA</literal> is the
default.
</para>
</listitem>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 91127ead88..6468470039 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -137,8 +137,8 @@ <title>Parameters</title>
<term>NOCOPY DATA</term>
<listitem>
<para>
- Specifies if the existing data in the publication that are being
- subscribed should be copied once the replication starts.
+ Specifies if the existing data in the publications that are being
+ subscribed to should be copied once the replication starts.
<literal>COPY DATA</literal> is the default.
</para>
</listitem>
@@ -148,18 +148,18 @@ <title>Parameters</title>
<term>NOCONNECT</term>
<listitem>
<para>
- Instructs the <command>CREATE SUBSCRIPTION</command> to skip initial
+ Instructs <command>CREATE SUBSCRIPTION</command> to skip the initial
connection to the provider. This will change default values of other
options to <literal>DISABLED</literal>,
- <literal>NOCREATE SLOT</literal> and <literal>NOCOPY DATA</literal>.
+ <literal>NOCREATE SLOT</literal>, and <literal>NOCOPY DATA</literal>.
</para>
<para>
It's not allowed to combine <literal>NOCONNECT</literal> and
- <literal>ENABLED</literal>, <literal>CREATE SLOT</literal> or
+ <literal>ENABLED</literal>, <literal>CREATE SLOT</literal>, or
<literal>COPY DATA</literal>.
</para>
<para>
- Since no connection is made when this option is specified the tables
+ Since no connection is made when this option is specified, the tables
are not subscribed, so after you enable the subscription nothing will
be replicated. It is required to run
<literal>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</> in order for
@@ -167,7 +167,6 @@ <title>Parameters</title>
</para>
</listitem>
</varlistentry>
-
</variablelist>
</refsect1>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 8850b7eff1..d90c673952 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -263,19 +263,17 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
/* Update the tuple. */
memset(values, 0, sizeof(values));
- memset(nulls, true, sizeof(nulls));
+ memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(replaces));
replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
- nulls[Anum_pg_subscription_rel_srsubstate - 1] = false;
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
if (sublsn != InvalidXLogRecPtr)
- {
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = false;
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- }
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
replaces);
@@ -289,9 +287,6 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
/* Cleanup. */
heap_close(rel, NoLock);
- /* Make the changes visible. */
- CommandCounterIncrement();
-
return subrelid;
}
@@ -323,7 +318,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
{
heap_close(rel, RowExclusiveLock);
*sublsn = InvalidXLogRecPtr;
- return '\0';
+ return SUBREL_STATE_UNKNOWN;
}
ereport(ERROR,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3e724de5f1..06d5509fd3 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -296,15 +296,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
bgw.bgw_main = ApplyWorkerMain;
- snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication worker for subscription %u", subid);
-
if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication worker %u sync %u", subid, relid);
+ "logical replication worker for subscription %u sync %u", subid, relid);
else
snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication worker %u", subid);
+ "logical replication worker for subscription %u", subid);
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
@@ -434,7 +431,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid)
LWLockRelease(LogicalRepWorkerLock);
if (worker)
- SetLatch(&worker->proc->procLatch);
+ logicalrep_worker_wakeup_ptr(worker);
}
/*
@@ -443,8 +440,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid)
void
logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
{
- if (worker)
- SetLatch(&worker->proc->procLatch);
+ SetLatch(&worker->proc->procLatch);
}
/*
@@ -817,10 +813,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
MemSet(nulls, 0, sizeof(nulls));
values[0] = ObjectIdGetDatum(worker.subid);
- if (!OidIsValid(worker.relid))
- nulls[1] = true;
- else
+ if (OidIsValid(worker.relid))
values[1] = ObjectIdGetDatum(worker.relid);
+ else
+ nulls[1] = true;
values[2] = Int32GetDatum(worker_pid);
if (XLogRecPtrIsInvalid(worker.last_lsn))
nulls[3] = true;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 3f0abd7ce2..de90777cf9 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -515,14 +515,14 @@ SnapBuildInitalSnapshot(SnapBuild *builder)
Assert(XactIsoLevel = XACT_REPEATABLE_READ);
if (builder->state != SNAPBUILD_CONSISTENT)
- elog(ERROR, "cannot build and initial slot snapshot before reaching a consistent state");
+ elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
if (!builder->committed.includes_all_transactions)
- elog(ERROR, "cannot build and initial slot snapshot, not all transactions are monitored anymore");
+ elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
/* so we don't overwrite the existing value */
if (TransactionIdIsValid(MyPgXact->xmin))
- elog(ERROR, "cannot build and initial slot snapshot when MyPgXact->xmin already is valid");
+ elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5383364011..e7fda70b75 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -109,8 +109,8 @@ WalReceiverConn *wrconn = NULL;
Subscription *MySubscription = NULL;
bool MySubscriptionValid = false;
-static char *myslotname = NULL;
-bool in_remote_transaction = false;
+static char *myslotname = NULL;
+bool in_remote_transaction = false;
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0755b88f5a..5990be52db 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -27,6 +27,7 @@ Node *replication_parse_result;
static SQLCmd *make_sqlcmd(void);
+
/*
* Bison doesn't allocate anything that needs to live across parser calls,
* so we can easily have it use palloc instead of malloc. This prevents
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 02e1652f11..b4c7f73cf5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1380,7 +1380,7 @@ exec_replication_command(const char *cmd_string)
* For aborted transactions, don't allow anything except pure SQL,
* the exec_simple_query() will handle it correctly.
*/
- if (IsAbortedTransactionBlockState() && cmd_node->type != T_SQLCmd)
+ if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
ereport(ERROR,
(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
errmsg("current transaction is aborted, "
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index b54ad50aae..ba41f90712 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4053,7 +4053,6 @@ PostgresMain(int argc, char *argv[],
case 'Q': /* simple query */
{
const char *query_string;
- bool walsender_query = false;
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
@@ -4062,8 +4061,11 @@ PostgresMain(int argc, char *argv[],
pq_getmsgend(&input_message);
if (am_walsender)
- walsender_query = exec_replication_command(query_string);
- if (!walsender_query)
+ {
+ if (!exec_replication_command(query_string))
+ exec_simple_query(query_string);
+ }
+ else
exec_simple_query(query_string);
send_ready_for_query = true;
--
2.12.0