On 3/6/17 05:27, Petr Jelinek wrote:
> updated and rebased version of the patch attached.

Some comments on this patch:

Can we have a better explanation of the different snapshot options in
CREATE_REPLICATION_SLOT?  We use all these variants in different
places, but it's not fully documented why.  Think about interested
users reading this.


errmsg("subscription table %u in subscription %u does not exist",

Use names instead of IDs if possible.


+   libpqrcv_table_list,
+   libpqrcv_table_info,
+   libpqrcv_table_copy,

I don't think these functions belong in the WAL receiver API, since
they are specific to this particular logical replication
implementation.  I would just make an API function libpqrcv_exec_sql()
that runs a query, and then put the table_*() functions as wrappers
around that into tablesync.c.
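
To make the suggested split concrete, here is a rough standalone sketch.
Everything in it is hypothetical: ReplConn, exec_sql_fn, stub_exec_sql()
and tablesync_table_list() are made-up names, and the query text is only
an assumption about what a table-list wrapper might send.  The point is
just that the API exposes one generic "run this SQL" callback, and the
table-specific logic sits in a wrapper on top of it.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical generic entry point: run one SQL command on the connection. */
typedef struct ReplConn ReplConn;
typedef int (*exec_sql_fn) (ReplConn *conn, const char *query);

struct ReplConn
{
	exec_sql_fn exec_sql;		/* the only SQL-related callback in the API */
	char		last_query[256];	/* filled in by the stub below */
};

/* Stub provider: records the query instead of sending it anywhere. */
static int
stub_exec_sql(ReplConn *conn, const char *query)
{
	snprintf(conn->last_query, sizeof(conn->last_query), "%s", query);
	return 0;
}

/*
 * Hypothetical wrapper of the kind that would live in tablesync.c.
 * quoted_pubname must already be a properly quoted SQL literal; this
 * sketch does no quoting of its own.
 */
static int
tablesync_table_list(ReplConn *conn, const char *quoted_pubname)
{
	char		query[256];
	int			len;

	assert(conn != NULL && conn->exec_sql != NULL);

	len = snprintf(query, sizeof(query),
				   "SELECT schemaname, tablename "
				   "FROM pg_catalog.pg_publication_tables "
				   "WHERE pubname = %s", quoted_pubname);
	if (len < 0 || (size_t) len >= sizeof(query))
		return -1;				/* query would not fit in the buffer */

	return conn->exec_sql(conn, query);
}
```

With this shape, the WAL receiver API would not need to know anything
about publications or tables; only the wrapper does.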


Not sure what the worker init stuff in ApplyLauncherShmemInit() is
doing.  Could be commented more.


There are a lot of places that do one of

    MyLogicalRepWorker->relid == InvalidOid
    OidIsValid(MyLogicalRepWorker->relid)

to check whether the current worker is a sync worker.  Wrap that into
a function.
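
Something along these lines would do, as a minimal sketch.  The helper
name worker_is_sync() is hypothetical, and the typedefs and macros are
stand-ins for the real PostgreSQL definitions so that the snippet
compiles on its own; the struct is reduced to the one field that
matters here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-ins for the PostgreSQL definitions, so the sketch compiles alone. */
typedef unsigned int Oid;
#define InvalidOid		((Oid) 0)
#define OidIsValid(oid)	((bool) ((oid) != InvalidOid))

/* Reduced to the single field the check looks at. */
typedef struct LogicalRepWorker
{
	Oid			relid;			/* table being synced; InvalidOid for apply */
} LogicalRepWorker;

/* Hypothetical helper: true if this worker synchronizes a single table. */
static inline bool
worker_is_sync(const LogicalRepWorker *worker)
{
	assert(worker != NULL);
	return OidIsValid(worker->relid);
}
```

Callers would then write worker_is_sync(MyLogicalRepWorker) or its
negation instead of spelling out either comparison.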


Not a fan of adding CommandCounterIncrement() calls at the end of
functions.  In some cases, they are not necessary at all.  In cases
where they are, the CCI call should be at a higher level between two
function calls with a comment for why the call below needs to see the
changes from the call above.
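
As a toy illustration of the placement I mean (this is not PostgreSQL
code: the counter, the row struct, and all function names below are
simplified stand-ins, and visibility is modeled as "written by an
earlier command"), the function that writes does not bump the counter;
the caller does, between the two calls, and says why.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model of command-counter visibility; not PostgreSQL code. */
static unsigned int current_cid = 0;

static void
CommandCounterIncrement(void)	/* stand-in for the real function */
{
	current_cid++;
}

typedef struct RelStateRow
{
	bool		exists;
	unsigned int cmin;			/* command that wrote the row */
	char		state;
} RelStateRow;

static RelStateRow row;			/* zero-initialized: no row yet */

/* Writes the state; deliberately does not bump the counter itself. */
static void
set_rel_state(char state)
{
	assert(state != '\0');
	row.exists = true;
	row.cmin = current_cid;
	row.state = state;
}

/* Returns the state, or '\0' if the row is not visible to this command. */
static char
get_rel_state(void)
{
	if (row.exists && row.cmin < current_cid)
		return row.state;
	return '\0';
}

/* The caller owns the CCI and documents why it is needed. */
static char
set_then_read_state(char state)
{
	set_rel_state(state);

	/* get_rel_state() below must see the row written just above. */
	CommandCounterIncrement();

	return get_rel_state();
}
```

A caller that only writes and never reads back in the same command
would simply omit the increment.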


The index name pg_subscription_rel_map_index/SubscriptionRelMapIndexId
doesn't seem to match existing style, since there is no "map" column.
How about pg_subscription_rel_rel_sub_index?  I see we have a
similarly named index for publications.


max_sync_workers_per_subscription could be PGC_SIGHUP, I think.  And
the minimum could be 0, to stop any syncing?


pg_subscription_rel.h: I'm not sure under which circumstances we need
to use BKI_ROWTYPE_OID.


Does pg_subscription_rel need an OID column?  The index
SubscriptionRelOidIndexId is not used anywhere.


You removed this command from the tests:

    ALTER SUBSCRIPTION testsub SET PUBLICATION testpub, testpub1;

I suppose because it causes a connection.  We should have an option to
prevent that (NOCONNECT/NOREFRESH?).


Why was the test 'check replication origin was dropped on subscriber'
removed?


Also attached is a small patch with some stylistic changes.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From c6ba1eaa3c5a44e4a9f6d072cb95fcf7e68ba3d6 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Thu, 9 Mar 2017 08:19:06 -0500
Subject: [PATCH] fixup! Logical replication support for initial data copy

---
 doc/src/sgml/catalogs.sgml                  |  9 +++++----
 doc/src/sgml/logical-replication.sgml       | 18 +++++++++---------
 doc/src/sgml/monitoring.sgml                |  4 ++--
 doc/src/sgml/protocol.sgml                  | 14 +++++++-------
 doc/src/sgml/ref/alter_subscription.sgml    |  8 ++++----
 doc/src/sgml/ref/create_subscription.sgml   | 13 ++++++-------
 src/backend/catalog/pg_subscription.c       | 13 ++++---------
 src/backend/replication/logical/launcher.c  | 18 +++++++-----------
 src/backend/replication/logical/snapbuild.c |  6 +++---
 src/backend/replication/logical/worker.c    |  4 ++--
 src/backend/replication/repl_gram.y         |  1 +
 src/backend/replication/walsender.c         |  2 +-
 src/backend/tcop/postgres.c                 |  8 +++++---
 13 files changed, 56 insertions(+), 62 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index f587a08b6a..ab78585035 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -302,7 +302,7 @@ <title>System Catalogs</title>
 
      <row>
       <entry><link linkend="catalog-pg-subscription-rel"><structname>pg_subscription_rel</structname></link></entry>
-      <entry>relation state mapping for subscriptions</entry>
+      <entry>relation state for subscriptions</entry>
      </row>
 
      <row>
@@ -6429,14 +6429,14 @@ <title><structname>pg_subscription_rel</structname></title>
 
   <para>
    The catalog <structname>pg_subscription_rel</structname> contains the
-   status for each replicated relation in each subscription.  This is a
+   state for each replicated relation in each subscription.  This is a
    many-to-many mapping.
   </para>
 
   <para>
-   This catalog only contains tables known to subscription after running
+   This catalog only contains tables known to the subscription after running
    either <command>CREATE SUBSCRIPTION</command> or
-   <command>ALTER SUBSCRIPTION ... REFRESH</command> commands.
+   <command>ALTER SUBSCRIPTION ... REFRESH</command>.
   </para>
 
   <table>
@@ -6472,6 +6472,7 @@ <title><structname>pg_subscription_rel</structname> Columns</title>
       <entry><type>char</type></entry>
       <entry></entry>
       <entry>
+       State code:
        <literal>i</> = initialize,
        <literal>d</> = data is being copied,
        <literal>s</> = synchronized,
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index f75304cd91..4ec6bb49b7 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -24,8 +24,8 @@ <title>Logical Replication</title>
  </para>
 
  <para>
-  Logical replication typically starts with a snapshot of the data on
-  the publisher database. Once that is done, the changes on the publisher
+  Logical replication typically starts with taking a snapshot of the data on
+  the publisher database and copying that to the subscriber.  Once that is done, the changes on the publisher
   are sent to the subscriber as they occur in real-time.  The subscriber
   applies the data in the same order as the publisher so that transactional
   consistency is guaranteed for publications within a single subscription.
@@ -162,7 +162,7 @@ <title>Subscription</title>
   <para>
    Each subscription will receive changes via one replication slot (see
    <xref linkend="streaming-replication-slots">).  Additional temporary
-   replication slots may be required for the initial data synchronizations
+   replication slots may be required for the initial data synchronization
    of pre-existing table data.
   </para>
 
@@ -308,7 +308,7 @@ <title>Monitoring</title>
    Normally, there is a single apply process running for an enabled
    subscription.  A disabled subscription or a crashed subscription will have
    zero rows in this view.  If the initial data synchronization of any
-   table is in progress there will be additional workers for the tables
+   table is in progress, there will be additional workers for the tables
    being synchronized.
   </para>
  </sect1>
@@ -355,8 +355,8 @@ <title>Configuration Settings</title>
   <para>
    On the publisher side, <varname>wal_level</varname> must be set to
    <literal>logical</literal>, and <varname>max_replication_slots</varname>
-   must be set to at least the number of subscriptions expected to connect
-   with some reserve for table synchronization.  And
+   must be set to at least the number of subscriptions expected to connect,
+   plus some reserve for table synchronization.  And
    <varname>max_wal_senders</varname> should be set to at least the same as
    <varname>max_replication_slots</varname> plus the number of physical
    replicas that are connected at the same time.
@@ -367,7 +367,7 @@ <title>Configuration Settings</title>
    to be set.  In this case it should be set to at least the number of
    subscriptions that will be added to the subscriber.
    <varname>max_logical_replication_workers</varname> must be set to at
-   least the number of subscriptions again with some reserve for the table
+   least the number of subscriptions, again plus some reserve for the table
    synchronization.  Additionally the <varname>max_worker_processes</varname>
    may need to be adjusted to accommodate for replication workers, at least
    (<varname>max_logical_replication_workers</varname>
@@ -413,8 +413,8 @@ <title>Quick Setup</title>
 
   <para>
    The above will start the replication process, which synchronizes the
-   initial table contents of <literal>users</literal> and
-   <literal>departments</literal> tables and then starts replicating
+   initial table contents of the tables <literal>users</literal> and
+   <literal>departments</literal> and then starts replicating
    incremental changes to those tables.
   </para>
  </sect1>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3d3761ec96..88340316cd 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1587,8 +1587,8 @@ <title><structname>pg_stat_subscription</structname> View</title>
     <row>
      <entry><structfield>relid</></entry>
      <entry><type>Oid</></entry>
-     <entry>Relation id which the worker is synchronizing, this is always
-      NULL for the main apply worker</entry>
+     <entry>OID of the relation that the worker is synchronizing; null for the
+     main apply worker</entry>
     </row>
     <row>
      <entry><structfield>received_lsn</></entry>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 99f1f1f8b7..15c1d8d1db 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1545,14 +1545,14 @@ <title>Streaming Replication Protocol</title>
        <term><literal>USE_SNAPSHOT</></term>
        <listitem>
         <para>
-         Decides what to do with snapshot created during logical slot
-         initialization. The <literal>EXPORT_SNAPSHOT</> (which is the
-         default) will export the snapshot for use in other sessions. This
-         option can't be used inside a transaction. The
-         <literal>USE_SNAPSHOT</> will use the snapshot for current
+         Decides what to do with the snapshot created during logical slot
+         initialization. <literal>EXPORT_SNAPSHOT</>, which is the
+         default, will export the snapshot for use in other sessions. This
+         option can't be used inside a transaction.
+         <literal>USE_SNAPSHOT</> will use the snapshot for the current
          transaction executing the command. This option must be used in a
-         transaction and the <literal>CREATE_REPLICATION_SLOT</literal> must
-         be the first command run in that transaction. Finally
+         transaction, and <literal>CREATE_REPLICATION_SLOT</literal> must
+         be the first command run in that transaction. Finally,
          <literal>NOEXPORT_SNAPSHOT</> will just use the snapshot for logical
          decoding as normal but won't do anything else with it.
         </para>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index b34386d3c1..e74614a74a 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -99,14 +99,14 @@ <title>Parameters</title>
     <listitem>
      <para>
       Fetch missing table info from publisher. This will start replication
-      of tables that were added to subscribed publications since last
-      invocation of <command>REFRESH PUBLICATION</command> or since the
+      of tables that were added to the subscribed-to publications since the last
+      invocation of <command>REFRESH PUBLICATION</command> or since
       <command>CREATE SUBSCRIPTION</command>.
      </para>
      <para>
       The <literal>COPY DATA</literal> and <literal>NOCOPY DATA</literal>
-      options specify if the existing data in the publication that are being
-      subscribed should be copied. <literal>COPY DATA</literal> is the
+      options specify if the existing data in the publications that are being
+      subscribed to should be copied. <literal>COPY DATA</literal> is the
       default.
      </para>
     </listitem>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 91127ead88..6468470039 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -137,8 +137,8 @@ <title>Parameters</title>
     <term>NOCOPY DATA</term>
     <listitem>
      <para>
-      Specifies if the existing data in the publication that are being
-      subscribed should be copied once the replication starts.
+      Specifies if the existing data in the publications that are being
+      subscribed to should be copied once the replication starts.
       <literal>COPY DATA</literal> is the default.
      </para>
     </listitem>
@@ -148,18 +148,18 @@ <title>Parameters</title>
     <term>NOCONNECT</term>
     <listitem>
      <para>
-      Instructs the <command>CREATE SUBSCRIPTION</command> to skip initial
+      Instructs <command>CREATE SUBSCRIPTION</command> to skip the initial
       connection to the provider. This will change default values of other
       options to <literal>DISABLED</literal>,
-      <literal>NOCREATE SLOT</literal> and <literal>NOCOPY DATA</literal>.
+      <literal>NOCREATE SLOT</literal>, and <literal>NOCOPY DATA</literal>.
      </para>
      <para>
       It's not allowed to combine <literal>NOCONNECT</literal> and
-      <literal>ENABLED</literal>, <literal>CREATE SLOT</literal> or
+      <literal>ENABLED</literal>, <literal>CREATE SLOT</literal>, or
       <literal>COPY DATA</literal>.
      </para>
      <para>
-      Since no connection is made when this option is specified the tables
+      Since no connection is made when this option is specified, the tables
       are not subscribed, so after you enable the subscription nothing will
       be replicated. It is required to run
       <literal>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</> in order for
@@ -167,7 +167,6 @@ <title>Parameters</title>
      </para>
     </listitem>
    </varlistentry>
-
   </variablelist>
  </refsect1>
 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 8850b7eff1..d90c673952 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -263,19 +263,17 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
 
 		/* Update the tuple. */
 		memset(values, 0, sizeof(values));
-		memset(nulls, true, sizeof(nulls));
+		memset(nulls, false, sizeof(nulls));
 		memset(replaces, false, sizeof(replaces));
 
 		replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-		nulls[Anum_pg_subscription_rel_srsubstate - 1] = false;
 		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
 
 		replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
 		if (sublsn != InvalidXLogRecPtr)
-		{
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = false;
 			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		}
+		else
+			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
 		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
 								replaces);
@@ -289,9 +287,6 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
 	/* Cleanup. */
 	heap_close(rel, NoLock);
 
-	/* Make the changes visible. */
-	CommandCounterIncrement();
-
 	return subrelid;
 }
 
@@ -323,7 +318,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
 		{
 			heap_close(rel, RowExclusiveLock);
 			*sublsn = InvalidXLogRecPtr;
-			return '\0';
+			return SUBREL_STATE_UNKNOWN;
 		}
 
 		ereport(ERROR,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3e724de5f1..06d5509fd3 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -296,15 +296,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	bgw.bgw_main = ApplyWorkerMain;
-	snprintf(bgw.bgw_name, BGW_MAXLEN,
-			 "logical replication worker for subscription %u", subid);
-
 	if (OidIsValid(relid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication worker %u sync %u", subid, relid);
+				 "logical replication worker for subscription %u sync %u", subid, relid);
 	else
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication worker %u", subid);
+				 "logical replication worker for subscription %u", subid);
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
 	bgw.bgw_notify_pid = MyProcPid;
@@ -434,7 +431,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid)
 	LWLockRelease(LogicalRepWorkerLock);
 
 	if (worker)
-		SetLatch(&worker->proc->procLatch);
+		logicalrep_worker_wakeup_ptr(worker);
 }
 
 /*
@@ -443,8 +440,7 @@ logicalrep_worker_wakeup(Oid subid, Oid relid)
 void
 logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
 {
-	if (worker)
-		SetLatch(&worker->proc->procLatch);
+	SetLatch(&worker->proc->procLatch);
 }
 
 /*
@@ -817,10 +813,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		MemSet(nulls, 0, sizeof(nulls));
 
 		values[0] = ObjectIdGetDatum(worker.subid);
-		if (!OidIsValid(worker.relid))
-			nulls[1] = true;
-		else
+		if (OidIsValid(worker.relid))
 			values[1] = ObjectIdGetDatum(worker.relid);
+		else
+			nulls[1] = true;
 		values[2] = Int32GetDatum(worker_pid);
 		if (XLogRecPtrIsInvalid(worker.last_lsn))
 			nulls[3] = true;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 3f0abd7ce2..de90777cf9 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -515,14 +515,14 @@ SnapBuildInitalSnapshot(SnapBuild *builder)
 	Assert(XactIsoLevel = XACT_REPEATABLE_READ);
 
 	if (builder->state != SNAPBUILD_CONSISTENT)
-		elog(ERROR, "cannot build and initial slot snapshot before reaching a consistent state");
+		elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
 
 	if (!builder->committed.includes_all_transactions)
-		elog(ERROR, "cannot build and initial slot snapshot, not all transactions are monitored anymore");
+		elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
 
 	/* so we don't overwrite the existing value */
 	if (TransactionIdIsValid(MyPgXact->xmin))
-		elog(ERROR, "cannot build and initial slot snapshot when MyPgXact->xmin already is valid");
+		elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
 
 	snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5383364011..e7fda70b75 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -109,8 +109,8 @@ WalReceiverConn	   *wrconn = NULL;
 Subscription	   *MySubscription = NULL;
 bool				MySubscriptionValid = false;
 
-static char	   *myslotname = NULL;
-bool			in_remote_transaction = false;
+static char		   *myslotname = NULL;
+bool				in_remote_transaction = false;
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0755b88f5a..5990be52db 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -27,6 +27,7 @@ Node *replication_parse_result;
 
 static SQLCmd *make_sqlcmd(void);
 
+
 /*
  * Bison doesn't allocate anything that needs to live across parser calls,
  * so we can easily have it use palloc instead of malloc.  This prevents
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 02e1652f11..b4c7f73cf5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1380,7 +1380,7 @@ exec_replication_command(const char *cmd_string)
 	 * For aborted transactions, don't allow anything except pure SQL,
 	 * the exec_simple_query() will handle it correctly.
 	 */
-	if (IsAbortedTransactionBlockState() && cmd_node->type != T_SQLCmd)
+	if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
 		ereport(ERROR,
 				(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
 				 errmsg("current transaction is aborted, "
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index b54ad50aae..ba41f90712 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4053,7 +4053,6 @@ PostgresMain(int argc, char *argv[],
 			case 'Q':			/* simple query */
 				{
 					const char *query_string;
-					bool		walsender_query = false;
 
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();
@@ -4062,8 +4061,11 @@ PostgresMain(int argc, char *argv[],
 					pq_getmsgend(&input_message);
 
 					if (am_walsender)
-						walsender_query = exec_replication_command(query_string);
-					if (!walsender_query)
+					{
+						if (!exec_replication_command(query_string))
+							exec_simple_query(query_string);
+					}
+					else
 						exec_simple_query(query_string);
 
 					send_ready_for_query = true;
-- 
2.12.0
