> Hi,
>
> > I'm attaching a new version (v30) that adds:
> >
> > * 3 new options (--publication, --subscription, --replication-slot) to assign
> >   names to the objects. The --database option used to ignore duplicate names,
> >   however, since these new options rely on the number of database options to
> >   match the number of object name options, it is forbidden from now on. The
> >   duplication is also forbidden for the object names to avoid errors earlier.
> > * rewrite the paragraph related to unusable target server after
> >   pg_createsubscriber fails.
> > * Vignesh reported an issue [1] related to reaching the recovery stop point
> >   before the consistent state is reached. I proposed a simple patch that fixes
> >   the issue.
> >
> > [1] https://www.postgresql.org/message-id/CALDaNm3VMOi0GugGvhk3motghaFRKSWMCSE2t3YX1e%2BMttToxg%40mail.gmail.com
> >
>
> I have added a top-up patch v30-0003. The issue in [1] still exists in
> the v30 patch. I was not able to come up with an approach to handle it
> in the code, so I have added it to the documentation in the warning
> section. Thoughts?
> I am not changing the version as I have not made any changes in
> v30-0001 and v30-0002.
>
> [1]: 
> https://www.postgresql.org/message-id/cahv8rj+5mzk9jt+7ecogjzfm5czvdccd5jo1_rcx0bteypb...@mail.gmail.com

There was a whitespace error in the v30-0003 patch. I have updated the patch.
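
For anyone who wants to try the naming options quoted above, here is a rough
sketch of how the switches pair up. It is only an illustration: the data
directory, the host, and the pub_/sub_/slot_ name prefixes are made-up
placeholders, and the script just prints the command line rather than running
pg_createsubscriber.

```shell
#!/bin/sh
# Sketch only: prints a pg_createsubscriber command line instead of running it.
# The data directory, host, and object-name prefixes are hypothetical.
build_cmd() {
    cmd="pg_createsubscriber --dry-run -D /usr/local/pgsql/data -P host=foo"
    for db in "$@"; do
        # One name of each kind per -d, listed in the same order as the databases.
        cmd="$cmd -d $db --publication=pub_$db --subscription=sub_$db --replication-slot=slot_$db"
    done
    printf '%s\n' "$cmd"
}

build_cmd hr finance
```

Each --publication, --subscription and --replication-slot is given once per -d
and in the same order, which is what the option descriptions in the patch
require.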



Thanks and regards,
Shlok Kyal
From d6717bcaf94302c0d41eabd448802b1fae6167d9 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.taveira@enterprisedb.com>
Date: Mon, 5 Jun 2023 14:39:40 -0400
Subject: [PATCH v31 1/3] pg_createsubscriber: creates a new logical replica
 from a standby server

It must be run on the target server and should be able to connect to the
source server (publisher) and the target server (subscriber). All tables
in the specified database(s) are included in the logical replication
setup. A pair of publication and subscription objects are created for
each database.

The main advantage of pg_createsubscriber over the common logical
replication setup is that it skips the initial data copy. It also reduces
the catchup phase.

Some prerequisites must be met to run it successfully; they are basically
the logical replication requirements. It starts by creating a publication
using FOR ALL TABLES and a replication slot for each specified database.
It then writes recovery parameters into the target data directory and
starts the target server, specifying the LSN of the last replication slot
(the replication start point) as the point up to which recovery proceeds,
and waits until the target server is promoted. Next, it creates one
subscription per specified database (using the publication and replication
slot created in a previous step) on the target server, sets the
replication progress to the replication start point for each subscription,
and enables the subscription for each specified database. Finally, it
changes the system identifier on the target server.
---
 doc/src/sgml/ref/allfiles.sgml                |    1 +
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  525 ++++
 doc/src/sgml/reference.sgml                   |    1 +
 src/bin/pg_basebackup/.gitignore              |    1 +
 src/bin/pg_basebackup/Makefile                |   11 +-
 src/bin/pg_basebackup/meson.build             |   19 +
 src/bin/pg_basebackup/pg_createsubscriber.c   | 2131 +++++++++++++++++
 .../t/040_pg_createsubscriber.pl              |  326 +++
 8 files changed, 3012 insertions(+), 3 deletions(-)
 create mode 100644 doc/src/sgml/ref/pg_createsubscriber.sgml
 create mode 100644 src/bin/pg_basebackup/pg_createsubscriber.c
 create mode 100644 src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 4a42999b18..f5be638867 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -205,6 +205,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pgCombinebackup    SYSTEM "pg_combinebackup.sgml">
 <!ENTITY pgConfig           SYSTEM "pg_config-ref.sgml">
 <!ENTITY pgControldata      SYSTEM "pg_controldata.sgml">
+<!ENTITY pgCreateSubscriber SYSTEM "pg_createsubscriber.sgml">
 <!ENTITY pgCtl              SYSTEM "pg_ctl-ref.sgml">
 <!ENTITY pgDump             SYSTEM "pg_dump.sgml">
 <!ENTITY pgDumpall          SYSTEM "pg_dumpall.sgml">
diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
new file mode 100644
index 0000000000..642213b5a4
--- /dev/null
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -0,0 +1,525 @@
+<!--
+doc/src/sgml/ref/pg_createsubscriber.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="app-pgcreatesubscriber">
+ <indexterm zone="app-pgcreatesubscriber">
+  <primary>pg_createsubscriber</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle><application>pg_createsubscriber</application></refentrytitle>
+  <manvolnum>1</manvolnum>
+  <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>pg_createsubscriber</refname>
+  <refpurpose>convert a physical replica into a new logical replica</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+  <cmdsynopsis>
+   <command>pg_createsubscriber</command>
+   <arg rep="repeat"><replaceable>option</replaceable></arg>
+   <group choice="plain">
+    <group choice="req">
+     <arg choice="plain"><option>-d</option></arg>
+     <arg choice="plain"><option>--database</option></arg>
+    </group>
+    <replaceable>dbname</replaceable>
+    <group choice="req">
+     <arg choice="plain"><option>-D</option> </arg>
+     <arg choice="plain"><option>--pgdata</option></arg>
+    </group>
+    <replaceable>datadir</replaceable>
+    <group choice="req">
+     <arg choice="plain"><option>-P</option></arg>
+     <arg choice="plain"><option>--publisher-server</option></arg>
+    </group>
+    <replaceable>connstr</replaceable>
+   </group>
+  </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+  <para>
+    <application>pg_createsubscriber</application> creates a new logical
+    replica from a physical standby server. All tables in the specified
+    database are included in the logical replication setup. A pair of
+    publication and subscription objects are created for each database. It
+    must be run on the target server.
+  </para>
+
+  <para>
+    After a successful run, the state of the target server is analogous to a
+    fresh logical replication setup. The main difference between the logical
+    replication setup and <application>pg_createsubscriber</application>
+    is the initial data copy. Only the synchronization phase is done, which
+    ensures each table is brought up to a synchronized state.
+  </para>
+
+  <para>
+   <application>pg_createsubscriber</application> targets large database
+   systems because, in a regular logical replication setup, most of the time
+   is spent making the initial data copy. For smaller databases,
+   <link linkend="logical-replication">initial data synchronization</link> is
+   recommended.
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Options</title>
+
+   <para>
+    <application>pg_createsubscriber</application> accepts the following
+    command-line arguments:
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
+      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
+      <listitem>
+       <para>
+        The name of the database in which to create a subscription. Multiple
+        databases can be selected by writing multiple <option>-d</option> switches.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+      <term><option>--pgdata=<replaceable class="parameter">directory</replaceable></option></term>
+      <listitem>
+       <para>
+        The target directory that contains a cluster directory from a physical
+        replica.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-n</option></term>
+      <term><option>--dry-run</option></term>
+      <listitem>
+       <para>
+        Do everything except actually modifying the target directory.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-p <replaceable class="parameter">port</replaceable></option></term>
+      <term><option>--subscriber-port=<replaceable class="parameter">port</replaceable></option></term>
+      <listitem>
+       <para>
+        The port number on which the target server is listening for connections.
+        Defaults to running the target server on port 50432 to avoid unintended
+        client connections.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-P <replaceable class="parameter">connstr</replaceable></option></term>
+      <term><option>--publisher-server=<replaceable class="parameter">connstr</replaceable></option></term>
+      <listitem>
+       <para>
+        The connection string to the publisher. For details see <xref linkend="libpq-connstring"/>.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-s <replaceable class="parameter">dir</replaceable></option></term>
+      <term><option>--socket-directory=<replaceable class="parameter">dir</replaceable></option></term>
+      <listitem>
+       <para>
+        The directory to use for postmaster sockets on the target server. The
+        default is the current directory.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-t <replaceable class="parameter">seconds</replaceable></option></term>
+       <term><option>--recovery-timeout=<replaceable class="parameter">seconds</replaceable></option></term>
+       <listitem>
+       <para>
+        The maximum number of seconds to wait for recovery to end. Setting it
+        to 0 disables the timeout. The default is 0.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-U <replaceable class="parameter">username</replaceable></option></term>
+      <term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term>
+      <listitem>
+       <para>
+        The username to connect as on the target server. Defaults to the
+        current operating system user name.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-v</option></term>
+      <term><option>--verbose</option></term>
+      <listitem>
+       <para>
+        Enables verbose mode. This will cause
+        <application>pg_createsubscriber</application> to output progress messages
+        and detailed information about each step to standard error.
+        Repeating the option causes additional debug-level messages to appear on
+        standard error.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--config-file=<replaceable class="parameter">filename</replaceable></option></term>
+      <listitem>
+       <para>
+        Use the specified main server configuration file for the target
+        data directory. <application>pg_createsubscriber</application>
+        internally uses the <application>pg_ctl</application> command to start
+        and stop the target server. This option allows you to specify the actual
+        <filename>postgresql.conf</filename> configuration file if it is stored
+        outside the data directory.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--publication=<replaceable class="parameter">name</replaceable></option></term>
+      <listitem>
+       <para>
+        The publication name to set up the logical replication. Multiple
+        publications can be specified by writing multiple
+        <option>--publication</option> switches. The number of publication
+        names must match the number of specified databases, otherwise an error
+        is reported. The order of the multiple publication name switches must
+        match the order of database switches. If this option is not specified,
+        a generated name is assigned to the publication.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--subscription=<replaceable class="parameter">name</replaceable></option></term>
+      <listitem>
+       <para>
+        The subscription name to set up the logical replication. Multiple
+        subscriptions can be specified by writing multiple
+        <option>--subscription</option> switches. The number of subscription
+        names must match the number of specified databases, otherwise an error
+        is reported. The order of the multiple subscription name switches must
+        match the order of database switches. If this option is not specified,
+        a generated name is assigned to the subscription.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--replication-slot=<replaceable class="parameter">name</replaceable></option></term>
+      <listitem>
+       <para>
+        The replication slot name to set up the logical replication. Multiple
+        replication slots can be specified by writing multiple
+        <option>--replication-slot</option> switches. The number of replication
+        slot names must match the number of specified databases, otherwise an
+        error is reported. The order of the multiple replication slot name
+        switches must match the order of database switches. If this option is
+        not specified, a generated name is assigned to the replication
+        slot.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-?</option></term>
+       <term><option>--help</option></term>
+       <listitem>
+       <para>
+       Show help about <application>pg_createsubscriber</application> command
+       line arguments, and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-V</option></term>
+       <term><option>--version</option></term>
+       <listitem>
+       <para>
+       Print the <application>pg_createsubscriber</application> version and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+   </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Notes</title>
+
+  <para>
+   There are some prerequisites for
+   <application>pg_createsubscriber</application> to convert the target server
+   into a logical replica. If these are not met an error will be reported. The
+   source and target servers must have the same major version as the
+   <application>pg_createsubscriber</application>. The given target data
+   directory must have the same system identifier as the source data
+   directory. If a standby server is running on the target data directory or it
+   is a base backup from the source data directory, system identifiers are the
+   same. The given database user for the target data directory must have
+   privileges for creating
+   <link linkend="sql-createsubscription">subscriptions</link> and using
+   <link linkend="pg-replication-origin-advance"><function>pg_replication_origin_advance()</function></link>.
+  </para>
+
+  <para>
+   The target server must be used as a physical standby. The target server
+   must have
+   <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+   and
+   <link linkend="guc-max-logical-replication-workers"><varname>max_logical_replication_workers</varname></link>
+   configured to a value greater than or equal to the number of specified
+   databases. The target server must have
+   <link linkend="guc-max-worker-processes"><varname>max_worker_processes</varname></link>
+   configured to a value greater than the number of specified databases. The
+   target server must accept local connections.
+  </para>
+
+  <para>
+   The source server must accept connections from the target server. The
+   source server must not be in recovery. Publications cannot be created in a
+   read-only cluster. The source server must have
+   <link linkend="guc-wal-level"><varname>wal_level</varname></link> as
+   <literal>logical</literal>.
+   The source server must have
+   <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+   configured to a value greater than or equal to the number of specified
+   databases plus existing replication slots. The source server must have
+   <link linkend="guc-max-wal-senders"><varname>max_wal_senders</varname></link>
+   configured to a value greater than or equal to the number of specified
+   databases plus existing <literal>walsender</literal> processes.
+  </para>
+
+  <warning>
+   <title>Warning</title>
+   <para>
+    If <application>pg_createsubscriber</application> fails after the target
+    server was promoted, then the data directory is likely not in a state that
+    can be recovered. In such a case, creating a new standby server is
+    recommended.
+   </para>
+
+   <para>
+    <application>pg_createsubscriber</application> usually starts the target
+    server with different connection settings during the transformation steps.
+    Hence, regular connections to the target server should fail.
+   </para>
+
+   <para>
+    During the recovery process, if the target server disconnects from the
+    source server, <application>pg_createsubscriber</application> will check
+    a few times if the connection has been reestablished to stream the required
+    WAL. After a few attempts, it terminates with an error.
+   </para>
+
+   <para>
+    Since DDL commands are not replicated by logical replication, avoid
+    executing DDL commands that change the database schema while running
+    <application>pg_createsubscriber</application>. If the target server has
+    already been converted to a logical replica, the DDL commands will not be
+    replicated, which might cause an error.
+   </para>
+
+   <para>
+    If <application>pg_createsubscriber</application> fails while processing,
+    objects (publications, replication slots) created on the source server
+    are removed. The removal might fail if the target server cannot connect to
+    the source server. In such a case, a warning message will report the
+    objects left behind. If the target server is running, it will be stopped.
+   </para>
+
+   <para>
+    If the replication is using a
+    <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>,
+    it will be removed from the source server after the logical replication setup.
+   </para>
+
+   <para>
+    If the target server is a synchronous replica, transaction commits on the
+    primary might wait for replication while running
+    <application>pg_createsubscriber</application>.
+   </para>
+
+   <para>
+    <application>pg_createsubscriber</application> changes the system identifier
+    using <application>pg_resetwal</application>. This avoids situations in
+    which WAL files from the source server might be used by the target server.
+    If the target server has a standby, replication will break and a fresh
+    standby should be created.
+   </para>
+  </warning>
+
+ </refsect1>
+
+ <refsect1>
+  <title>How It Works</title>
+
+  <para>
+    The basic idea is to have a replication start point from the source server
+    and set up a logical replication to start from this point:
+  </para>
+
+  <procedure>
+   <step>
+    <para>
+     Start the target server with the specified command-line options. If the
+     target server is already running, stop it because some parameters can only
+     be set at server start.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Check if the target server can be converted. There are also a few checks
+     on the source server. If any of the prerequisites are not met,
+     <application>pg_createsubscriber</application> will terminate with an
+     error.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Create a publication and replication slot for each specified database on
+     the source server. Each publication is created using
+     <link linkend="sql-createpublication-params-for-all-tables"><literal>FOR ALL TABLES</literal></link>.
+     If the <option>--publication</option> option is not specified, it has the
+     following name pattern:
+     <quote><literal>pg_createsubscriber_%u_%x</literal></quote> (parameters:
+     database <parameter>oid</parameter>, random <parameter>int</parameter>).
+     If <option>--replication-slot</option> is not specified, the
+     replication slot has the following name pattern:
+     <quote><literal>pg_createsubscriber_%u_%x</literal></quote> (parameters:
+     database <parameter>oid</parameter>, random <parameter>int</parameter>).
+     These replication slots will be used by the subscriptions in a future step.
+     The last replication slot LSN is used as a stopping point in the
+     <xref linkend="guc-recovery-target-lsn"/> parameter and by the
+     subscriptions as a replication start point. It guarantees that no
+     transaction will be lost.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Write recovery parameters into the target data directory and restart the
+     target server. It specifies an LSN (<xref linkend="guc-recovery-target-lsn"/>)
+     of the write-ahead log location up to which recovery will proceed. It also
+     specifies <literal>promote</literal> as the action that the server should
+     take once the recovery target is reached. Additional
+     <link linkend="runtime-config-wal-recovery-target">recovery parameters</link>
+     are also added so it avoids unexpected behavior during the recovery
+     process such as end of the recovery as soon as a consistent state is
+     reached (WAL should be applied until the replication start location) and
+     multiple recovery targets that can cause a failure. This step finishes
+     once the server ends standby mode and is accepting read-write transactions.
+     If the <option>--recovery-timeout</option> option is set,
+     <application>pg_createsubscriber</application> terminates if recovery does
+     not end within the given number of seconds.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Create a subscription for each specified database on the target server.
+     If <option>--subscription</option> is not specified, the subscription
+     has the following name pattern:
+     <quote><literal>pg_createsubscriber_%u_%x</literal></quote> (parameters:
+     database <parameter>oid</parameter>, random <parameter>int</parameter>).
+     It does not copy existing data from the source server. It does not create
+     a replication slot. Instead, it uses the replication slot that was created
+     in a previous step. The subscription is created but it is not enabled yet.
+     The reason is the replication progress must be set to the replication
+     start point before starting the replication.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Drop publications on the target server that were replicated because they
+     were created before the replication start location. They have no use on the
+     subscriber.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Set the replication progress to the replication start point for each
+     subscription. When the target server starts the recovery process, it
+     catches up to the replication start point. This is the exact LSN to be used
+     as an initial replication location for each subscription. The replication
+     origin name is obtained since the subscription was created. The replication
+     origin name and the replication start point are used in
+     <link linkend="pg-replication-origin-advance"><function>pg_replication_origin_advance()</function></link>
+     to set up the initial replication location.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Enable the subscription for each specified database on the target server.
+     The subscription starts applying transactions from the replication start
+     point.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     If the standby server was using
+     <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>,
+     it has no use from now on, so drop it.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Update the system identifier on the target server. The
+     <xref linkend="app-pgresetwal"/> is run to modify the system identifier.
+     The target server is stopped as a <command>pg_resetwal</command> requirement.
+    </para>
+   </step>
+  </procedure>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   To create a logical replica for databases <literal>hr</literal> and
+   <literal>finance</literal> from a physical replica at <literal>foo</literal>:
+<screen>
+<prompt>$</prompt> <userinput>pg_createsubscriber -D /usr/local/pgsql/data -P "host=foo" -d hr -d finance</userinput>
+</screen>
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>See Also</title>
+
+  <simplelist type="inline">
+   <member><xref linkend="app-pgbasebackup"/></member>
+  </simplelist>
+ </refsect1>
+
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index aa94f6adf6..ff85ace83f 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -282,6 +282,7 @@
    &pgarchivecleanup;
    &pgChecksums;
    &pgControldata;
+   &pgCreateSubscriber;
    &pgCtl;
    &pgResetwal;
    &pgRewind;
diff --git a/src/bin/pg_basebackup/.gitignore b/src/bin/pg_basebackup/.gitignore
index 26048bdbd8..14d5de6c01 100644
--- a/src/bin/pg_basebackup/.gitignore
+++ b/src/bin/pg_basebackup/.gitignore
@@ -1,4 +1,5 @@
 /pg_basebackup
+/pg_createsubscriber
 /pg_receivewal
 /pg_recvlogical
 
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index abfb6440ec..e9a920dbcd 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -44,11 +44,14 @@ BBOBJS = \
 	bbstreamer_tar.o \
 	bbstreamer_zstd.o
 
-all: pg_basebackup pg_receivewal pg_recvlogical
+all: pg_basebackup pg_createsubscriber pg_receivewal pg_recvlogical
 
 pg_basebackup: $(BBOBJS) $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
 	$(CC) $(CFLAGS) $(BBOBJS) $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
+pg_createsubscriber: $(WIN32RES) pg_createsubscriber.o | submake-libpq submake-libpgport submake-libpgfeutils
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
 pg_receivewal: pg_receivewal.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
 	$(CC) $(CFLAGS) pg_receivewal.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
@@ -57,6 +60,7 @@ pg_recvlogical: pg_recvlogical.o $(OBJS) | submake-libpq submake-libpgport subma
 
 install: all installdirs
 	$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+	$(INSTALL_PROGRAM) pg_createsubscriber$(X) '$(DESTDIR)$(bindir)/pg_createsubscriber$(X)'
 	$(INSTALL_PROGRAM) pg_receivewal$(X) '$(DESTDIR)$(bindir)/pg_receivewal$(X)'
 	$(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
 
@@ -65,12 +69,13 @@ installdirs:
 
 uninstall:
 	rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+	rm -f '$(DESTDIR)$(bindir)/pg_createsubscriber$(X)'
 	rm -f '$(DESTDIR)$(bindir)/pg_receivewal$(X)'
 	rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
 
 clean distclean:
-	rm -f pg_basebackup$(X) pg_receivewal$(X) pg_recvlogical$(X) \
-		$(BBOBJS) pg_receivewal.o pg_recvlogical.o \
+	rm -f pg_basebackup$(X) pg_createsubscriber$(X) pg_receivewal$(X) pg_recvlogical$(X) \
+		$(BBOBJS) pg_createsubscriber.o pg_receivewal.o pg_recvlogical.o \
 		$(OBJS)
 	rm -rf tmp_check
 
diff --git a/src/bin/pg_basebackup/meson.build b/src/bin/pg_basebackup/meson.build
index f7e60e6670..c00acd5e11 100644
--- a/src/bin/pg_basebackup/meson.build
+++ b/src/bin/pg_basebackup/meson.build
@@ -38,6 +38,24 @@ pg_basebackup = executable('pg_basebackup',
 bin_targets += pg_basebackup
 
 
+pg_createsubscriber_sources = files(
+  'pg_createsubscriber.c'
+)
+
+if host_system == 'windows'
+  pg_createsubscriber_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+	'--NAME', 'pg_createsubscriber',
+	'--FILEDESC', 'pg_createsubscriber - create a new logical replica from a standby server',])
+endif
+
+pg_createsubscriber = executable('pg_createsubscriber',
+  pg_createsubscriber_sources,
+  dependencies: [frontend_code, libpq],
+  kwargs: default_bin_args,
+)
+bin_targets += pg_createsubscriber
+
+
 pg_receivewal_sources = files(
   'pg_receivewal.c',
 )
@@ -89,6 +107,7 @@ tests += {
       't/011_in_place_tablespace.pl',
       't/020_pg_receivewal.pl',
       't/030_pg_recvlogical.pl',
+      't/040_pg_createsubscriber.pl',
     ],
   },
 }
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
new file mode 100644
index 0000000000..6cc1c34121
--- /dev/null
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -0,0 +1,2131 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_createsubscriber.c
+ *	  Create a new logical replica from a standby server
+ *
+ * Copyright (C) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/bin/pg_basebackup/pg_createsubscriber.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <time.h>
+
+#include "catalog/pg_authid_d.h"
+#include "common/connect.h"
+#include "common/controldata_utils.h"
+#include "common/file_perm.h"
+#include "common/logging.h"
+#include "common/pg_prng.h"
+#include "common/restricted_token.h"
+#include "fe_utils/recovery_gen.h"
+#include "fe_utils/simple_list.h"
+#include "getopt_long.h"
+
+#define	DEFAULT_SUB_PORT	"50432"
+
+/* Command-line options */
+struct CreateSubscriberOptions
+{
+	char	   *config_file;	/* configuration file */
+	char	   *pub_conninfo_str;	/* publisher connection string */
+	char	   *socket_dir;		/* directory for Unix-domain socket, if any */
+	char	   *sub_port;		/* subscriber port number */
+	const char *sub_username;	/* subscriber username */
+	SimpleStringList database_names;	/* list of database names */
+	SimpleStringList pub_names; /* list of publication names */
+	SimpleStringList sub_names; /* list of subscription names */
+	SimpleStringList replslot_names;	/* list of replication slot names */
+	int			recovery_timeout;	/* stop recovery after this time */
+};
+
+struct LogicalRepInfo
+{
+	Oid			oid;			/* database OID */
+	char	   *dbname;			/* database name */
+	char	   *pubconninfo;	/* publisher connection string */
+	char	   *subconninfo;	/* subscriber connection string */
+	char	   *pubname;		/* publication name */
+	char	   *subname;		/* subscription name */
+	char	   *replslotname;	/* replication slot name */
+
+	bool		made_replslot;	/* replication slot was created */
+	bool		made_publication;	/* publication was created */
+};
+
+static void cleanup_objects_atexit(void);
+static void usage(void);
+static char *get_base_conninfo(char *conninfo, char **dbname);
+static char *get_sub_conninfo(struct CreateSubscriberOptions *opt);
+static char *get_exec_path(const char *argv0, const char *progname);
+static void check_data_directory(const char *datadir);
+static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
+static struct LogicalRepInfo *store_pub_sub_info(struct CreateSubscriberOptions *opt,
+												 const char *pub_base_conninfo,
+												 const char *sub_base_conninfo);
+static PGconn *connect_database(const char *conninfo, bool exit_on_error);
+static void disconnect_database(PGconn *conn, bool exit_on_error);
+static uint64 get_primary_sysid(const char *conninfo);
+static uint64 get_standby_sysid(const char *datadir);
+static void modify_subscriber_sysid(struct CreateSubscriberOptions *opt);
+static bool server_is_in_recovery(PGconn *conn);
+static char *generate_object_name(PGconn *conn);
+static void check_publisher(struct LogicalRepInfo *dbinfo);
+static char *setup_publisher(struct LogicalRepInfo *dbinfo);
+static void check_subscriber(struct LogicalRepInfo *dbinfo);
+static void setup_subscriber(struct LogicalRepInfo *dbinfo,
+							 const char *consistent_lsn);
+static void setup_recovery(struct LogicalRepInfo *dbinfo, char *datadir,
+						   char *lsn);
+static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
+										  char *slotname);
+static char *create_logical_replication_slot(PGconn *conn,
+											 struct LogicalRepInfo *dbinfo);
+static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
+								  const char *slot_name);
+static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
+static void start_standby_server(struct CreateSubscriberOptions *opt,
+								 bool restricted_access);
+static void stop_standby_server(const char *datadir);
+static void wait_for_end_recovery(const char *conninfo,
+								  struct CreateSubscriberOptions *opt);
+static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void create_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void set_replication_progress(PGconn *conn, struct LogicalRepInfo *dbinfo,
+									 const char *lsn);
+static void enable_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo);
+
+#define	USEC_PER_SEC	1000000
+#define	WAIT_INTERVAL	1		/* 1 second */
+
+static const char *progname;
+
+static char *primary_slot_name = NULL;
+static bool dry_run = false;
+
+static bool success = false;
+
+static struct LogicalRepInfo *dbinfo;
+static int	num_dbs = 0;
+static int	num_pubs = 0;
+static int	num_subs = 0;
+static int	num_replslots = 0;
+
+static char *pg_ctl_path = NULL;
+static char *pg_resetwal_path = NULL;
+
+/* standby / subscriber data directory */
+static char *subscriber_dir = NULL;
+
+static bool recovery_ended = false;
+static bool standby_running = false;
+
+enum WaitPMResult
+{
+	POSTMASTER_READY,
+	POSTMASTER_STILL_STARTING
+};
+
+
+/*
+ * Clean up objects that were created by pg_createsubscriber if there is an
+ * error.
+ *
+ * Replication slots, publications and subscriptions are created. Depending on
+ * the step at which it failed, it should remove the already created objects
+ * if possible (sometimes it won't work due to a connection issue).
+ */
+static void
+cleanup_objects_atexit(void)
+{
+	if (success)
+		return;
+
+	/*
+	 * If the server is promoted, there is no way to use the current setup
+	 * again. Warn the user that a new replication setup should be done before
+	 * trying again.
+	 */
+	if (recovery_ended)
+	{
+		pg_log_warning("pg_createsubscriber failed after the end of recovery");
+		pg_log_warning_hint("The target server cannot be used as a physical replica anymore.");
+		pg_log_warning_hint("You must recreate the physical replica before continuing.");
+	}
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
+		{
+			PGconn	   *conn;
+
+			conn = connect_database(dbinfo[i].pubconninfo, false);
+			if (conn != NULL)
+			{
+				if (dbinfo[i].made_publication)
+					drop_publication(conn, &dbinfo[i]);
+				if (dbinfo[i].made_replslot)
+					drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
+				disconnect_database(conn, false);
+			}
+			else
+			{
+				/*
+				 * If a connection could not be established, inform the user
+				 * that some objects were left on primary and should be
+				 * removed before trying again.
+				 */
+				if (dbinfo[i].made_publication)
+				{
+					pg_log_warning("there might be a publication \"%s\" in database \"%s\" on primary",
+								   dbinfo[i].pubname, dbinfo[i].dbname);
+					pg_log_warning_hint("Consider dropping this publication before trying again.");
+				}
+				if (dbinfo[i].made_replslot)
+				{
+					pg_log_warning("there might be a replication slot \"%s\" in database \"%s\" on primary",
+								   dbinfo[i].replslotname, dbinfo[i].dbname);
+					pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+				}
+			}
+		}
+	}
+
+	if (standby_running)
+		stop_standby_server(subscriber_dir);
+}
+
+static void
+usage(void)
+{
+	printf(_("%s creates a new logical replica from a standby server.\n\n"),
+		   progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION]...\n"), progname);
+	printf(_("\nOptions:\n"));
+	printf(_(" -d, --database=DBNAME               database to create a subscription\n"));
+	printf(_(" -D, --pgdata=DATADIR                location for the subscriber data directory\n"));
+	printf(_(" -n, --dry-run                       dry run, just show what would be done\n"));
+	printf(_(" -p, --subscriber-port=PORT          subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
+	printf(_(" -P, --publisher-server=CONNSTR      publisher connection string\n"));
+	printf(_(" -s, --socket-directory=DIR          socket directory to use (default current directory)\n"));
+	printf(_(" -t, --recovery-timeout=SECS         seconds to wait for recovery to end\n"));
+	printf(_(" -U, --subscriber-username=NAME      subscriber username\n"));
+	printf(_(" -v, --verbose                       output verbose messages\n"));
+	printf(_("     --config-file=FILENAME          use specified main server configuration\n"
+			 "                                     file when running target cluster\n"));
+	printf(_("     --publication=NAME              publication name\n"));
+	printf(_("     --replication-slot=NAME         replication slot name\n"));
+	printf(_("     --subscription=NAME             subscription name\n"));
+	printf(_(" -V, --version                       output version information, then exit\n"));
+	printf(_(" -?, --help                          show this help, then exit\n"));
+	printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
+	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
+}
+
+/*
+ * Validate a connection string. Returns a base connection string that is a
+ * connection string without a database name.
+ *
+ * Since we might process multiple databases, each database name will be
+ * appended to this base connection string to provide a final connection
+ * string. If the second argument (dbname) is not null, returns dbname if the
+ * provided connection string contains it. If option --database is not
+ * provided, uses dbname as the only database to set up the logical replica.
+ *
+ * It is the caller's responsibility to free the returned connection string and
+ * dbname.
+ */
+static char *
+get_base_conninfo(char *conninfo, char **dbname)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	PQconninfoOption *conn_opts = NULL;
+	PQconninfoOption *conn_opt;
+	char	   *errmsg = NULL;
+	char	   *ret;
+	int			i;
+
+	conn_opts = PQconninfoParse(conninfo, &errmsg);
+	if (conn_opts == NULL)
+	{
+		pg_log_error("could not parse connection string: %s", errmsg);
+		return NULL;
+	}
+
+	i = 0;
+	for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
+	{
+		if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL)
+		{
+			if (dbname)
+				*dbname = pg_strdup(conn_opt->val);
+			continue;
+		}
+
+		if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
+		{
+			if (i > 0)
+				appendPQExpBufferChar(buf, ' ');
+			appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val);
+			i++;
+		}
+	}
+
+	ret = pg_strdup(buf->data);
+
+	destroyPQExpBuffer(buf);
+	PQconninfoFree(conn_opts);
+
+	return ret;
+}
+
+/*
+ * Build a subscriber connection string. Only a few parameters are supported
+ * since it starts a server with restricted access.
+ */
+static char *
+get_sub_conninfo(struct CreateSubscriberOptions *opt)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	char	   *ret;
+
+	appendPQExpBuffer(buf, "port=%s", opt->sub_port);
+#if !defined(WIN32)
+	appendPQExpBuffer(buf, " host=%s", opt->socket_dir);
+#endif
+	if (opt->sub_username != NULL)
+		appendPQExpBuffer(buf, " user=%s", opt->sub_username);
+	appendPQExpBuffer(buf, " fallback_application_name=%s", progname);
+
+	ret = pg_strdup(buf->data);
+
+	destroyPQExpBuffer(buf);
+
+	return ret;
+}
+
+/*
+ * Verify if a PostgreSQL binary (progname) is available in the same directory as
+ * pg_createsubscriber and it has the same version.  It returns the absolute
+ * path of the progname.
+ */
+static char *
+get_exec_path(const char *argv0, const char *progname)
+{
+	char	   *versionstr;
+	char	   *exec_path;
+	int			ret;
+
+	versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
+	exec_path = pg_malloc(MAXPGPATH);
+	ret = find_other_exec(argv0, progname, versionstr, exec_path);
+
+	if (ret < 0)
+	{
+		char		full_path[MAXPGPATH];
+
+		if (find_my_exec(argv0, full_path) < 0)
+			strlcpy(full_path, progname, sizeof(full_path));
+
+		if (ret == -1)
+			pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
+					 progname, "pg_createsubscriber", full_path);
+		else
+			pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
+					 progname, full_path, "pg_createsubscriber");
+	}
+
+	pg_log_debug("%s path is: %s", progname, exec_path);
+
+	return exec_path;
+}
+
+/*
+ * Is it a cluster directory? These are only preliminary checks and are far
+ * from exhaustive. If the directory is not a clone of the publisher, a later
+ * step will eventually fail.
+ */
+static void
+check_data_directory(const char *datadir)
+{
+	struct stat statbuf;
+	char		versionfile[MAXPGPATH];
+
+	pg_log_info("checking if directory \"%s\" is a cluster data directory",
+				datadir);
+
+	if (stat(datadir, &statbuf) != 0)
+	{
+		if (errno == ENOENT)
+			pg_fatal("data directory \"%s\" does not exist", datadir);
+		else
+			pg_fatal("could not access directory \"%s\": %s", datadir,
+					 strerror(errno));
+	}
+
+	snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
+	if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
+	{
+		pg_fatal("directory \"%s\" is not a database cluster directory",
+				 datadir);
+	}
+}
+
+/*
+ * Append database name into a base connection string.
+ *
+ * dbname is the only parameter that changes so it is not included in the base
+ * connection string. This function concatenates dbname to build a "real"
+ * connection string.
+ */
+static char *
+concat_conninfo_dbname(const char *conninfo, const char *dbname)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	char	   *ret;
+
+	Assert(conninfo != NULL);
+
+	appendPQExpBufferStr(buf, conninfo);
+	appendPQExpBuffer(buf, " dbname=%s", dbname);
+
+	ret = pg_strdup(buf->data);
+	destroyPQExpBuffer(buf);
+
+	return ret;
+}
+
+/*
+ * Store publication and subscription information.
+ *
+ * If publication, replication slot and subscription names were specified,
+ * store them here. Otherwise, a generated name will be assigned to the object
+ * in setup_publisher().
+ */
+static struct LogicalRepInfo *
+store_pub_sub_info(struct CreateSubscriberOptions *opt, const char *pub_base_conninfo,
+				   const char *sub_base_conninfo)
+{
+	struct LogicalRepInfo *dbinfo;
+	SimpleStringListCell *pubcell = NULL;
+	SimpleStringListCell *subcell = NULL;
+	SimpleStringListCell *replslotcell = NULL;
+	int			i = 0;
+
+	dbinfo = (struct LogicalRepInfo *) pg_malloc(num_dbs * sizeof(struct LogicalRepInfo));
+
+	if (num_pubs > 0)
+		pubcell = opt->pub_names.head;
+	if (num_subs > 0)
+		subcell = opt->sub_names.head;
+	if (num_replslots > 0)
+		replslotcell = opt->replslot_names.head;
+
+	for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
+	{
+		char	   *conninfo;
+
+		/* Fill publisher attributes */
+		conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
+		dbinfo[i].pubconninfo = conninfo;
+		dbinfo[i].dbname = cell->val;
+		if (num_pubs > 0)
+			dbinfo[i].pubname = pubcell->val;
+		else
+			dbinfo[i].pubname = NULL;
+		if (num_replslots > 0)
+			dbinfo[i].replslotname = replslotcell->val;
+		else
+			dbinfo[i].replslotname = NULL;
+		dbinfo[i].made_replslot = false;
+		dbinfo[i].made_publication = false;
+		/* Fill subscriber attributes */
+		conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
+		dbinfo[i].subconninfo = conninfo;
+		if (num_subs > 0)
+			dbinfo[i].subname = subcell->val;
+		else
+			dbinfo[i].subname = NULL;
+		/* Other fields will be filled later */
+
+		pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
+					 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
+					 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
+					 dbinfo[i].pubconninfo);
+		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
+					 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
+					 dbinfo[i].subconninfo);
+
+		if (num_pubs > 0)
+			pubcell = pubcell->next;
+		if (num_subs > 0)
+			subcell = subcell->next;
+		if (num_replslots > 0)
+			replslotcell = replslotcell->next;
+
+		i++;
+	}
+
+	return dbinfo;
+}
+
+/*
+ * Open a new connection. On failure, exit immediately if exit_on_error is
+ * true; otherwise return NULL.
+ */
+static PGconn *
+connect_database(const char *conninfo, bool exit_on_error)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+
+	conn = PQconnectdb(conninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		pg_log_error("connection to database failed: %s",
+					 PQerrorMessage(conn));
+		PQfinish(conn);
+		if (exit_on_error)
+			exit(1);
+		return NULL;
+	}
+
+	/* Secure search_path */
+	res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not clear search_path: %s", PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		if (exit_on_error)
+			exit(1);
+		return NULL;
+	}
+	PQclear(res);
+
+	return conn;
+}
+
+/*
+ * Close the connection. If exit_on_error is true, the caller has hit an
+ * error condition, so exit immediately after closing.
+ */
+static void
+disconnect_database(PGconn *conn, bool exit_on_error)
+{
+	Assert(conn != NULL);
+
+	PQfinish(conn);
+
+	if (exit_on_error)
+		exit(1);
+}
+
+/*
+ * Obtain the system identifier using the provided connection. It will be used
+ * to compare if a data directory is a clone of another one.
+ */
+static uint64
+get_primary_sysid(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	uint64		sysid;
+
+	pg_log_info("getting system identifier from publisher");
+
+	conn = connect_database(conninfo, true);
+
+	res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not get system identifier: %s",
+					 PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+	if (PQntuples(res) != 1)
+	{
+		pg_log_error("could not get system identifier: got %d rows, expected %d row",
+					 PQntuples(res), 1);
+		disconnect_database(conn, true);
+	}
+
+	sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
+
+	pg_log_info("system identifier is %llu on publisher",
+				(unsigned long long) sysid);
+
+	PQclear(res);
+	disconnect_database(conn, false);
+
+	return sysid;
+}
+
+/*
+ * Obtain the system identifier from the control file. It will be used to
+ * compare if a data directory is a clone of another one. This routine is used
+ * locally and avoids a connection.
+ */
+static uint64
+get_standby_sysid(const char *datadir)
+{
+	ControlFileData *cf;
+	bool		crc_ok;
+	uint64		sysid;
+
+	pg_log_info("getting system identifier from subscriber");
+
+	cf = get_controlfile(datadir, &crc_ok);
+	if (!crc_ok)
+		pg_fatal("control file appears to be corrupt");
+
+	sysid = cf->system_identifier;
+
+	pg_log_info("system identifier is %llu on subscriber",
+				(unsigned long long) sysid);
+
+	pg_free(cf);
+
+	return sysid;
+}
+
+/*
+ * Modify the system identifier. Since a standby server preserves the system
+ * identifier, it makes sense to change it to avoid situations in which WAL
+ * files from one of the systems might be used in the other one.
+ */
+static void
+modify_subscriber_sysid(struct CreateSubscriberOptions *opt)
+{
+	ControlFileData *cf;
+	bool		crc_ok;
+	struct timeval tv;
+
+	char	   *cmd_str;
+
+	pg_log_info("modifying system identifier of subscriber");
+
+	cf = get_controlfile(subscriber_dir, &crc_ok);
+	if (!crc_ok)
+		pg_fatal("control file appears to be corrupt");
+
+	/*
+	 * Select a new system identifier.
+	 *
+	 * XXX this code was extracted from BootStrapXLOG().
+	 */
+	gettimeofday(&tv, NULL);
+	cf->system_identifier = ((uint64) tv.tv_sec) << 32;
+	cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
+	cf->system_identifier |= getpid() & 0xFFF;
+
+	if (!dry_run)
+		update_controlfile(subscriber_dir, cf, true);
+
+	pg_log_info("system identifier is %llu on subscriber",
+				(unsigned long long) cf->system_identifier);
+
+	pg_log_info("running pg_resetwal on the subscriber");
+
+	cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
+					   subscriber_dir, DEVNULL);
+
+	pg_log_debug("pg_resetwal command is: %s", cmd_str);
+
+	if (!dry_run)
+	{
+		int			rc = system(cmd_str);
+
+		if (rc == 0)
+			pg_log_info("subscriber successfully changed the system identifier");
+		else
+			pg_fatal("subscriber failed to change system identifier: exit code: %d", rc);
+	}
+
+	pg_free(cf);
+}
+
+/*
+ * Generate an object name using a prefix, database OID and a random integer.
+ * It is used in case the user does not specify an object name (publication,
+ * subscription, replication slot).
+ */
+static char *
+generate_object_name(PGconn *conn)
+{
+	PGresult   *res;
+	Oid			oid;
+	uint32		rand;
+	pg_prng_state prng_state;
+	char	   *objname;
+
+	pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
+
+	res = PQexec(conn,
+				 "SELECT oid FROM pg_catalog.pg_database "
+				 "WHERE datname = pg_catalog.current_database()");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain database OID: %s",
+					 PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+
+	if (PQntuples(res) != 1)
+	{
+		pg_log_error("could not obtain database OID: got %d rows, expected %d row",
+					 PQntuples(res), 1);
+		disconnect_database(conn, true);
+	}
+
+	/* Database OID */
+	oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+
+	PQclear(res);
+
+	/* Random unsigned integer */
+	rand = pg_prng_uint32(&prng_state);
+
+	/*
+	 * Build the object name. The name must not exceed NAMEDATALEN - 1. The
+	 * current naming scheme uses a maximum of 40 characters (20 + 10 + 1 + 8
+	 * + '\0').
+	 */
+	objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
+
+	return objname;
+}
+
+/*
+ * Create the publications and replication slots in preparation for logical
+ * replication. Returns the LSN from the latest replication slot. It will be
+ * the replication start point that is used to adjust the subscriptions (see
+ * set_replication_progress).
+ */
+static char *
+setup_publisher(struct LogicalRepInfo *dbinfo)
+{
+	char	   *lsn = NULL;
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		char	   *genname = NULL;
+
+		conn = connect_database(dbinfo[i].pubconninfo, true);
+
+		/*
+		 * If an object name was not specified via command-line options,
+		 * assign a generated object name.
+		 */
+		if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
+			genname = generate_object_name(conn);
+		if (num_pubs == 0)
+			dbinfo[i].pubname = pg_strdup(genname);
+		if (num_subs == 0)
+			dbinfo[i].subname = pg_strdup(genname);
+		if (num_replslots == 0)
+			dbinfo[i].replslotname = pg_strdup(genname);
+
+		/*
+		 * Create publication on publisher. This step should be executed
+		 * *before* promoting the subscriber to avoid any transactions between
+		 * consistent LSN and the new publication rows (such transactions
+		 * wouldn't see the new publication rows resulting in an error).
+		 */
+		create_publication(conn, &dbinfo[i]);
+
+		/* Create replication slot on publisher */
+		if (lsn)
+			pg_free(lsn);
+		lsn = create_logical_replication_slot(conn, &dbinfo[i]);
+		if (lsn != NULL || dry_run)
+			pg_log_info("create replication slot \"%s\" on publisher",
+						dbinfo[i].replslotname);
+		else
+			exit(1);
+
+		disconnect_database(conn, false);
+	}
+
+	return lsn;
+}
+
+/*
+ * Is recovery still in progress?
+ */
+static bool
+server_is_in_recovery(PGconn *conn)
+{
+	PGresult   *res;
+	int			ret;
+
+	res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain recovery progress: %s",
+					 PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+
+
+	ret = strcmp("t", PQgetvalue(res, 0, 0));
+
+	PQclear(res);
+
+	return ret == 0;
+}
+
+/*
+ * Is the primary server ready for logical replication?
+ *
+ * XXX Does it not allow a synchronous replica?
+ */
+static void
+check_publisher(struct LogicalRepInfo *dbinfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	bool		failed = false;
+
+	char	   *wal_level;
+	int			max_repslots;
+	int			cur_repslots;
+	int			max_walsenders;
+	int			cur_walsenders;
+
+	pg_log_info("checking settings on publisher");
+
+	conn = connect_database(dbinfo[0].pubconninfo, true);
+
+	/*
+	 * If the primary server is in recovery (i.e. cascading replication),
+	 * objects (publication) cannot be created because it is read only.
+	 */
+	if (server_is_in_recovery(conn))
+	{
+		pg_log_error("primary server cannot be in recovery");
+		disconnect_database(conn, true);
+	}
+
+	/*------------------------------------------------------------------------
+	 * Logical replication requires a few parameters to be set on publisher.
+	 * Since these parameters are not a requirement for physical replication,
+	 * we should check them to make sure the setup won't fail.
+	 *
+	 * - wal_level = logical
+	 * - max_replication_slots >= current + number of dbs to be converted
+	 * - max_wal_senders >= current + number of dbs to be converted
+	 * -----------------------------------------------------------------------
+	 */
+	res = PQexec(conn,
+				 "WITH wl AS "
+				 "(SELECT setting AS wallevel FROM pg_catalog.pg_settings "
+				 "WHERE name = 'wal_level'), "
+				 "total_mrs AS "
+				 "(SELECT setting AS tmrs FROM pg_catalog.pg_settings "
+				 "WHERE name = 'max_replication_slots'), "
+				 "cur_mrs AS "
+				 "(SELECT count(*) AS cmrs "
+				 "FROM pg_catalog.pg_replication_slots), "
+				 "total_mws AS "
+				 "(SELECT setting AS tmws FROM pg_catalog.pg_settings "
+				 "WHERE name = 'max_wal_senders'), "
+				 "cur_mws AS "
+				 "(SELECT count(*) AS cmws FROM pg_catalog.pg_stat_activity "
+				 "WHERE backend_type = 'walsender') "
+				 "SELECT wallevel, tmrs, cmrs, tmws, cmws "
+				 "FROM wl, total_mrs, cur_mrs, total_mws, cur_mws");
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain publisher settings: %s",
+					 PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+
+	wal_level = pg_strdup(PQgetvalue(res, 0, 0));
+	max_repslots = atoi(PQgetvalue(res, 0, 1));
+	cur_repslots = atoi(PQgetvalue(res, 0, 2));
+	max_walsenders = atoi(PQgetvalue(res, 0, 3));
+	cur_walsenders = atoi(PQgetvalue(res, 0, 4));
+
+	PQclear(res);
+
+	pg_log_debug("publisher: wal_level: %s", wal_level);
+	pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
+	pg_log_debug("publisher: current replication slots: %d", cur_repslots);
+	pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
+	pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
+
+	/*
+	 * If standby sets primary_slot_name, check if this replication slot is in
+	 * use on primary for WAL retention purposes. This replication slot has no
+	 * use after the transformation, hence, it will be removed at the end of
+	 * this process.
+	 */
+	if (primary_slot_name)
+	{
+		PQExpBuffer str = createPQExpBuffer();
+
+		appendPQExpBuffer(str,
+						  "SELECT 1 FROM pg_catalog.pg_replication_slots "
+						  "WHERE active AND slot_name = '%s'",
+						  primary_slot_name);
+
+		pg_log_debug("command is: %s", str->data);
+
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain replication slot information: %s",
+						 PQresultErrorMessage(res));
+			disconnect_database(conn, true);
+		}
+
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("could not obtain replication slot information: got %d rows, expected %d row",
+						 PQntuples(res), 1);
+			disconnect_database(conn, true);
+		}
+		else
+			pg_log_info("primary has replication slot \"%s\"",
+						primary_slot_name);
+
+		PQclear(res);
+	}
+
+	disconnect_database(conn, false);
+
+	if (strcmp(wal_level, "logical") != 0)
+	{
+		pg_log_error("publisher requires wal_level >= logical");
+		failed = true;
+	}
+
+	if (max_repslots - cur_repslots < num_dbs)
+	{
+		pg_log_error("publisher requires %d replication slots, but only %d remain",
+					 num_dbs, max_repslots - cur_repslots);
+		pg_log_error_hint("Consider increasing max_replication_slots to at least %d.",
+						  cur_repslots + num_dbs);
+		failed = true;
+	}
+
+	if (max_walsenders - cur_walsenders < num_dbs)
+	{
+		pg_log_error("publisher requires %d wal sender processes, but only %d remain",
+					 num_dbs, max_walsenders - cur_walsenders);
+		pg_log_error_hint("Consider increasing max_wal_senders to at least %d.",
+						  cur_walsenders + num_dbs);
+		failed = true;
+	}
+
+	if (failed)
+		exit(1);
+}
+
+/*
+ * Is the standby server ready for logical replication?
+ *
+ * XXX Does it not allow a time-delayed replica?
+ *
+ * XXX In a cascaded replication scenario (P -> S -> C), if the target server
+ * is S, it cannot detect there is a replica (server C) because server S starts
+ * accepting only local connections and server C cannot connect to it. Hence,
+ * there is not a reliable way to provide a suitable error saying the server C
+ * will be broken at the end of this process (due to pg_resetwal).
+ */
+static void
+check_subscriber(struct LogicalRepInfo *dbinfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	PQExpBuffer str = createPQExpBuffer();
+	bool		failed = false;
+
+	int			max_lrworkers;
+	int			max_repslots;
+	int			max_wprocs;
+
+	pg_log_info("checking settings on subscriber");
+
+	conn = connect_database(dbinfo[0].subconninfo, true);
+
+	/* The target server must be a standby */
+	if (!server_is_in_recovery(conn))
+	{
+		pg_log_error("target server must be a standby");
+		disconnect_database(conn, true);
+	}
+
+	/*
+	 * Subscriptions can only be created by roles that have the privileges of
+	 * the pg_create_subscription role and CREATE privilege on the database.
+	 * EXECUTE on pg_replication_origin_advance() is also checked here.
+	 */
+	appendPQExpBuffer(str,
+					  "SELECT pg_catalog.pg_has_role(current_user, %u, 'MEMBER'), "
+					  "pg_catalog.has_database_privilege(current_user, '%s', 'CREATE'), "
+					  "pg_catalog.has_function_privilege(current_user, 'pg_catalog.pg_replication_origin_advance(text, pg_lsn)', 'EXECUTE')",
+					  ROLE_PG_CREATE_SUBSCRIPTION, dbinfo[0].dbname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	res = PQexec(conn, str->data);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain access privilege information: %s",
+					 PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+
+	if (strcmp(PQgetvalue(res, 0, 0), "t") != 0)
+	{
+		pg_log_error("permission denied to create subscription");
+		pg_log_error_hint("Only roles with privileges of the \"%s\" role may create subscriptions.",
+						  "pg_create_subscription");
+		failed = true;
+	}
+	if (strcmp(PQgetvalue(res, 0, 1), "t") != 0)
+	{
+		pg_log_error("permission denied for database %s", dbinfo[0].dbname);
+		failed = true;
+	}
+	if (strcmp(PQgetvalue(res, 0, 2), "t") != 0)
+	{
+		pg_log_error("permission denied for function \"%s\"",
+					 "pg_catalog.pg_replication_origin_advance(text, pg_lsn)");
+		failed = true;
+	}
+
+	destroyPQExpBuffer(str);
+	PQclear(res);
+
+	/*------------------------------------------------------------------------
+	 * Logical replication requires a few parameters to be set on subscriber.
+	 * Since these parameters are not a requirement for physical replication,
+	 * we should check them to make sure the setup won't fail.
+	 *
+	 * - max_replication_slots >= number of dbs to be converted
+	 * - max_logical_replication_workers >= number of dbs to be converted
+	 * - max_worker_processes >= 1 + number of dbs to be converted
+	 *------------------------------------------------------------------------
+	 */
+	res = PQexec(conn,
+				 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
+				 "'max_logical_replication_workers', "
+				 "'max_replication_slots', "
+				 "'max_worker_processes', "
+				 "'primary_slot_name') "
+				 "ORDER BY name");
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain subscriber settings: %s",
+					 PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+
+	max_lrworkers = atoi(PQgetvalue(res, 0, 0));
+	max_repslots = atoi(PQgetvalue(res, 1, 0));
+	max_wprocs = atoi(PQgetvalue(res, 2, 0));
+	if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
+		primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
+
+	pg_log_debug("subscriber: max_logical_replication_workers: %d",
+				 max_lrworkers);
+	pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
+	pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
+	if (primary_slot_name)
+		pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
+
+	PQclear(res);
+
+	disconnect_database(conn, false);
+
+	if (max_repslots < num_dbs)
+	{
+		pg_log_error("subscriber requires %d replication slots, but only %d remain",
+					 num_dbs, max_repslots);
+		pg_log_error_hint("Consider increasing max_replication_slots to at least %d.",
+						  num_dbs);
+		failed = true;
+	}
+
+	if (max_lrworkers < num_dbs)
+	{
+		pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
+					 num_dbs, max_lrworkers);
+		pg_log_error_hint("Consider increasing max_logical_replication_workers to at least %d.",
+						  num_dbs);
+		failed = true;
+	}
+
+	if (max_wprocs < num_dbs + 1)
+	{
+		pg_log_error("subscriber requires %d worker processes, but only %d remain",
+					 num_dbs + 1, max_wprocs);
+		pg_log_error_hint("Consider increasing max_worker_processes to at least %d.",
+						  num_dbs + 1);
+		failed = true;
+	}
+
+	if (failed)
+		exit(1);
+}
+
+/*
+ * Create the subscriptions, adjust the initial location for logical
+ * replication and enable the subscriptions. That's the last step for logical
+ * replication setup.
+ */
+static void
+setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
+{
+	for (int i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+
+		/* Connect to subscriber. */
+		conn = connect_database(dbinfo[i].subconninfo, true);
+
+		/*
+		 * Since the publication was created before the consistent LSN, it is
+		 * available on the subscriber when the physical replica is promoted.
+		 * Remove the publication from the subscriber; it has no use there.
+		 */
+		drop_publication(conn, &dbinfo[i]);
+
+		create_subscription(conn, &dbinfo[i]);
+
+		/* Set the replication progress to the correct LSN */
+		set_replication_progress(conn, &dbinfo[i], consistent_lsn);
+
+		/* Enable subscription */
+		enable_subscription(conn, &dbinfo[i]);
+
+		disconnect_database(conn, false);
+	}
+}
+
+/*
+ * Write the required recovery parameters.
+ */
+static void
+setup_recovery(struct LogicalRepInfo *dbinfo, char *datadir, char *lsn)
+{
+	PGconn	   *conn;
+	PQExpBuffer recoveryconfcontents;
+
+	/*
+	 * Although the recovery parameters will be written to the subscriber,
+	 * use a publisher connection. The primary_conninfo is generated using the
+	 * connection settings.
+	 */
+	conn = connect_database(dbinfo[0].pubconninfo, true);
+
+	/*
+	 * Write recovery parameters.
+	 *
+	 * The subscriber is not running yet. In dry run mode, the recovery
+	 * parameters *won't* be written. An invalid LSN is used for printing
+	 * purposes. Additional recovery parameters are added here. It avoids
+	 * unexpected behavior such as end of recovery as soon as a consistent
+	 * state is reached (recovery_target) and failure due to multiple recovery
+	 * targets (name, time, xid, LSN).
+	 */
+	recoveryconfcontents = GenerateRecoveryConfig(conn, NULL);
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
+	appendPQExpBuffer(recoveryconfcontents,
+					  "recovery_target_timeline = 'latest'\n");
+	appendPQExpBuffer(recoveryconfcontents,
+					  "recovery_target_inclusive = true\n");
+	appendPQExpBuffer(recoveryconfcontents,
+					  "recovery_target_action = promote\n");
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
+	appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
+
+	if (dry_run)
+	{
+		appendPQExpBuffer(recoveryconfcontents, "# dry run mode\n");
+		appendPQExpBuffer(recoveryconfcontents,
+						  "recovery_target_lsn = '%X/%X'\n",
+						  LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
+	}
+	else
+	{
+		appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
+						  lsn);
+		WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
+	}
+	disconnect_database(conn, false);
+
+	pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
+}
+
+/*
+ * Drop physical replication slot on primary if the standby was using it. After
+ * the transformation, it has no use.
+ *
+ * XXX we might not fail here. Instead, we provide a warning so the user
+ * eventually drops this replication slot later.
+ */
+static void
+drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, char *slotname)
+{
+	PGconn	   *conn;
+
+	/* Replication slot does not exist, do nothing */
+	if (!primary_slot_name)
+		return;
+
+	conn = connect_database(dbinfo[0].pubconninfo, false);
+	if (conn != NULL)
+	{
+		drop_replication_slot(conn, &dbinfo[0], slotname);
+		disconnect_database(conn, false);
+	}
+	else
+	{
+		pg_log_warning("could not drop replication slot \"%s\" on primary",
+					   slotname);
+		pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+	}
+}
+
+/*
+ * Create a logical replication slot and return its LSN.
+ *
+ * CreateReplicationSlot() is not used because it does not provide the one-row
+ * result set that contains the LSN.
+ */
+static char *
+create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res = NULL;
+	char		slot_name[NAMEDATALEN];
+	char	   *lsn = NULL;
+
+	Assert(conn != NULL);
+
+	snprintf(slot_name, NAMEDATALEN, "%s", dbinfo->replslotname);
+
+	pg_log_info("creating the replication slot \"%s\" on database \"%s\"",
+				slot_name, dbinfo->dbname);
+
+	appendPQExpBuffer(str,
+					  "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot('%s', '%s', %s, false, false)",
+					  slot_name, "pgoutput", "false");
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s",
+						 slot_name, dbinfo->dbname,
+						 PQresultErrorMessage(res));
+			return NULL;
+		}
+
+		lsn = pg_strdup(PQgetvalue(res, 0, 0));
+		PQclear(res);
+	}
+
+	/* For cleanup purposes */
+	dbinfo->made_replslot = true;
+
+	destroyPQExpBuffer(str);
+
+	return lsn;
+}
+
+static void
+drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
+					  const char *slot_name)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("dropping the replication slot \"%s\" on database \"%s\"",
+				slot_name, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot('%s')", slot_name);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s",
+						 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
+			dbinfo->made_replslot = false;	/* don't try again. */
+		}
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Reports a suitable message if pg_ctl fails.
+ */
+static void
+pg_ctl_status(const char *pg_ctl_cmd, int rc)
+{
+	if (rc != 0)
+	{
+		if (WIFEXITED(rc))
+		{
+			pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
+		}
+		else if (WIFSIGNALED(rc))
+		{
+#if defined(WIN32)
+			pg_log_error("pg_ctl was terminated by exception 0x%X",
+						 WTERMSIG(rc));
+			pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
+#else
+			pg_log_error("pg_ctl was terminated by signal %d: %s",
+						 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
+#endif
+		}
+		else
+		{
+			pg_log_error("pg_ctl exited with unrecognized status %d", rc);
+		}
+
+		pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
+		exit(1);
+	}
+}
+
+static void
+start_standby_server(struct CreateSubscriberOptions *opt, bool restricted_access)
+{
+	PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
+	int			rc;
+
+	appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D \"%s\" -s",
+					  pg_ctl_path, subscriber_dir);
+	if (restricted_access)
+	{
+		appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
+#if !defined(WIN32)
+
+		/*
+		 * An empty listen_addresses list means the server does not listen on
+		 * any IP interfaces; only Unix-domain sockets can be used to connect
+		 * to the server. Prevent external connections to minimize the chance
+		 * of failure.
+		 */
+		appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
+		if (opt->socket_dir)
+			appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
+							  opt->socket_dir);
+		appendPQExpBufferChar(pg_ctl_cmd, '"');
+#endif
+	}
+	if (opt->config_file != NULL)
+		appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
+						  opt->config_file);
+	pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
+	rc = system(pg_ctl_cmd->data);
+	pg_ctl_status(pg_ctl_cmd->data, rc);
+	standby_running = true;
+	destroyPQExpBuffer(pg_ctl_cmd);
+	pg_log_info("server was started");
+}
+
+static void
+stop_standby_server(const char *datadir)
+{
+	char	   *pg_ctl_cmd;
+	int			rc;
+
+	pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
+						  datadir);
+	pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
+	rc = system(pg_ctl_cmd);
+	pg_ctl_status(pg_ctl_cmd, rc);
+	standby_running = false;
+	pg_log_info("server was stopped");
+}
+
+/*
+ * Returns after the server finishes the recovery process.
+ *
+ * If the recovery_timeout option is set, terminate abnormally once the timeout
+ * expires without recovery finishing. By default, it waits forever.
+ */
+static void
+wait_for_end_recovery(const char *conninfo, struct CreateSubscriberOptions *opt)
+{
+	PGconn	   *conn;
+	int			status = POSTMASTER_STILL_STARTING;
+	int			timer = 0;
+	int			count = 0;		/* number of consecutive connection attempts */
+
+#define NUM_CONN_ATTEMPTS	10
+
+	pg_log_info("waiting for the target server to reach the consistent state");
+
+	conn = connect_database(conninfo, true);
+
+	for (;;)
+	{
+		PGresult   *res;
+		bool		in_recovery = server_is_in_recovery(conn);
+
+		/*
+		 * Has the recovery process finished? In dry run mode, there is no
+		 * recovery mode. Bail out as the recovery process has ended.
+		 */
+		if (!in_recovery || dry_run)
+		{
+			status = POSTMASTER_READY;
+			recovery_ended = true;
+			break;
+		}
+
+		/*
+		 * If it is still in recovery, make sure the target server is
+		 * connected to the primary so it can receive the required WAL to
+		 * finish the recovery process. If it is disconnected, check again up
+		 * to NUM_CONN_ATTEMPTS times in a row and bail out if it is still down.
+		 */
+		res = PQexec(conn,
+					 "SELECT 1 FROM pg_catalog.pg_stat_wal_receiver");
+		if (PQntuples(res) == 0)
+		{
+			if (++count > NUM_CONN_ATTEMPTS)
+			{
+				stop_standby_server(subscriber_dir);
+				pg_log_error("standby server disconnected from the primary");
+				break;
+			}
+		}
+		else
+			count = 0;			/* reset counter if it connects again */
+
+		PQclear(res);
+
+		/* Bail out after recovery_timeout seconds if this option is set */
+		if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
+		{
+			stop_standby_server(subscriber_dir);
+			pg_log_error("recovery timed out");
+			disconnect_database(conn, true);
+		}
+
+		/* Keep waiting */
+		pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
+
+		timer += WAIT_INTERVAL;
+	}
+
+	disconnect_database(conn, false);
+
+	if (status == POSTMASTER_STILL_STARTING)
+		pg_fatal("server did not end recovery");
+
+	pg_log_info("target server reached the consistent state");
+	pg_log_info_hint("If pg_createsubscriber fails after this point, "
+					 "you must recreate the physical replica before continuing.");
+}
+
+/*
+ * Create a publication that includes all tables in the database.
+ */
+static void
+create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	/* Check if the publication already exists */
+	appendPQExpBuffer(str,
+					  "SELECT 1 FROM pg_catalog.pg_publication "
+					  "WHERE pubname = '%s'",
+					  dbinfo->pubname);
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain publication information: %s",
+					 PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+
+	if (PQntuples(res) == 1)
+	{
+		/*
+		 * Unfortunately, if it reaches this code path, it will always fail
+		 * (unless you decide to change the existing publication name). That's
+		 * bad but it is very unlikely that the user will choose a name with
+		 * pg_createsubscriber_ prefix followed by the exact database oid and
+		 * a random number.
+		 */
+		pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
+		pg_log_error_hint("Consider renaming this publication before continuing.");
+		disconnect_database(conn, true);
+	}
+
+	PQclear(res);
+	resetPQExpBuffer(str);
+
+	pg_log_info("creating publication \"%s\" on database \"%s\"",
+				dbinfo->pubname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
+					  dbinfo->pubname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not create publication \"%s\" on database \"%s\": %s",
+						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+			disconnect_database(conn, true);
+		}
+		PQclear(res);
+	}
+
+	/* For cleanup purposes */
+	dbinfo->made_publication = true;
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Remove publication if it couldn't finish all steps.
+ */
+static void
+drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("dropping publication \"%s\" on database \"%s\"",
+				dbinfo->pubname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP PUBLICATION %s", dbinfo->pubname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not drop publication \"%s\" on database \"%s\": %s",
+						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+			dbinfo->made_publication = false;	/* don't try again. */
+
+			/*
+			 * Don't disconnect and exit here. This routine is used by primary
+			 * (cleanup publication / replication slot due to an error) and
+			 * subscriber (remove the replicated publications). In both cases,
+			 * it can continue and provide instructions for the user to remove
+			 * it later if cleanup fails.
+			 */
+		}
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Create a subscription with some predefined options.
+ *
+ * A replication slot was already created in a previous step. Let's use it.  It
+ * is not required to copy data. The subscription will be created but it will
+ * not be enabled now. That's because the replication progress must be set and
+ * the replication origin name (one of the function arguments) contains the
+ * subscription OID in its name. Once the subscription is created,
+ * set_replication_progress() can obtain the chosen origin name and set up its
+ * initial location.
+ */
+static void
+create_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("creating subscription \"%s\" on database \"%s\"",
+				dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str,
+					  "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s "
+					  "WITH (create_slot = false, enabled = false, "
+					  "slot_name = '%s', copy_data = false)",
+					  dbinfo->subname, dbinfo->pubconninfo, dbinfo->pubname,
+					  dbinfo->replslotname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not create subscription \"%s\" on database \"%s\": %s",
+						 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
+			disconnect_database(conn, true);
+		}
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Sets the replication progress to the consistent LSN.
+ *
+ * The subscriber caught up to the consistent LSN provided by the last
+ * replication slot that was created. The goal is to set up the initial
+ * location for the logical replication that is the exact LSN at which the
+ * subscriber was promoted. Once the subscription is enabled it will start
+ * streaming from that location onwards.  In dry run mode, the subscription OID
+ * and LSN are set to invalid values for printing purposes.
+ */
+static void
+set_replication_progress(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *lsn)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+	Oid			suboid;
+	char		originname[NAMEDATALEN];
+	char		lsnstr[17 + 1]; /* MAXPG_LSNLEN = 17 */
+
+	Assert(conn != NULL);
+
+	appendPQExpBuffer(str,
+					  "SELECT oid FROM pg_catalog.pg_subscription "
+					  "WHERE subname = '%s'",
+					  dbinfo->subname);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain subscription OID: %s",
+					 PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+
+	if (PQntuples(res) != 1 && !dry_run)
+	{
+		pg_log_error("could not obtain subscription OID: got %d rows, expected %d rows",
+					 PQntuples(res), 1);
+		disconnect_database(conn, true);
+	}
+
+	if (dry_run)
+	{
+		suboid = InvalidOid;
+		snprintf(lsnstr, sizeof(lsnstr), "%X/%X",
+				 LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
+	}
+	else
+	{
+		suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+		snprintf(lsnstr, sizeof(lsnstr), "%s", lsn);
+	}
+
+	PQclear(res);
+
+	/*
+	 * The origin name is defined as pg_%u. %u is the subscription OID. See
+	 * ApplyWorkerMain().
+	 */
+	snprintf(originname, sizeof(originname), "pg_%u", suboid);
+
+	pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"",
+				originname, lsnstr, dbinfo->dbname);
+
+	resetPQExpBuffer(str);
+	appendPQExpBuffer(str,
+					  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
+					  originname, lsnstr);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not set replication progress for the subscription \"%s\": %s",
+						 dbinfo->subname, PQresultErrorMessage(res));
+			disconnect_database(conn, true);
+		}
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Enables the subscription.
+ *
+ * The subscription was created in a previous step but it was disabled. After
+ * adjusting the initial logical replication location, enable the subscription.
+ */
+static void
+enable_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("enabling subscription \"%s\" on database \"%s\"",
+				dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", dbinfo->subname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not enable subscription \"%s\": %s",
+						 dbinfo->subname, PQresultErrorMessage(res));
+			disconnect_database(conn, true);
+		}
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+int
+main(int argc, char **argv)
+{
+	static struct option long_options[] =
+	{
+		{"config-file", required_argument, NULL, 1},
+		{"publication", required_argument, NULL, 2},
+		{"replication-slot", required_argument, NULL, 3},
+		{"subscription", required_argument, NULL, 4},
+		{"database", required_argument, NULL, 'd'},
+		{"pgdata", required_argument, NULL, 'D'},
+		{"dry-run", no_argument, NULL, 'n'},
+		{"subscriber-port", required_argument, NULL, 'p'},
+		{"publisher-server", required_argument, NULL, 'P'},
+		{"socket-directory", required_argument, NULL, 's'},
+		{"recovery-timeout", required_argument, NULL, 't'},
+		{"subscriber-username", required_argument, NULL, 'U'},
+		{"verbose", no_argument, NULL, 'v'},
+		{"version", no_argument, NULL, 'V'},
+		{"help", no_argument, NULL, '?'},
+		{NULL, 0, NULL, 0}
+	};
+
+	struct CreateSubscriberOptions opt = {0};
+
+	int			c;
+	int			option_index;
+
+	char	   *pub_base_conninfo;
+	char	   *sub_base_conninfo;
+	char	   *dbname_conninfo = NULL;
+
+	uint64		pub_sysid;
+	uint64		sub_sysid;
+	struct stat statbuf;
+
+	char	   *consistent_lsn;
+
+	char		pidfile[MAXPGPATH];
+
+	pg_logging_init(argv[0]);
+	pg_logging_set_level(PG_LOG_WARNING);
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber"));
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0
+				 || strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	/* Default settings */
+	subscriber_dir = NULL;
+	opt.config_file = NULL;
+	opt.pub_conninfo_str = NULL;
+	opt.socket_dir = NULL;
+	opt.sub_port = palloc(16);
+	strcpy(opt.sub_port, DEFAULT_SUB_PORT);
+	opt.sub_username = NULL;
+	opt.database_names = (SimpleStringList)
+	{
+		NULL, NULL
+	};
+	opt.recovery_timeout = 0;
+
+	/*
+	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
+	 * it either.
+	 */
+#ifndef WIN32
+	if (geteuid() == 0)
+	{
+		pg_log_error("cannot be executed by \"root\"");
+		pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
+						  progname);
+		exit(1);
+	}
+#endif
+
+	get_restricted_token();
+
+	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+			case 'd':
+				if (!simple_string_list_member(&opt.database_names, optarg))
+				{
+					simple_string_list_append(&opt.database_names, optarg);
+					num_dbs++;
+				}
+				else
+				{
+					pg_log_error("duplicate database \"%s\"", optarg);
+					exit(1);
+				}
+				break;
+			case 'D':
+				subscriber_dir = pg_strdup(optarg);
+				canonicalize_path(subscriber_dir);
+				break;
+			case 'n':
+				dry_run = true;
+				break;
+			case 'p':
+				pg_free(opt.sub_port);
+				opt.sub_port = pg_strdup(optarg);
+				break;
+			case 'P':
+				opt.pub_conninfo_str = pg_strdup(optarg);
+				break;
+			case 's':
+				opt.socket_dir = pg_strdup(optarg);
+				canonicalize_path(opt.socket_dir);
+				break;
+			case 't':
+				opt.recovery_timeout = atoi(optarg);
+				break;
+			case 'U':
+				opt.sub_username = pg_strdup(optarg);
+				break;
+			case 'v':
+				pg_logging_increase_verbosity();
+				break;
+			case 1:
+				opt.config_file = pg_strdup(optarg);
+				break;
+			case 2:
+				if (!simple_string_list_member(&opt.pub_names, optarg))
+				{
+					simple_string_list_append(&opt.pub_names, optarg);
+					num_pubs++;
+				}
+				else
+				{
+					pg_log_error("duplicate publication \"%s\"", optarg);
+					exit(1);
+				}
+				break;
+			case 3:
+				if (!simple_string_list_member(&opt.replslot_names, optarg))
+				{
+					simple_string_list_append(&opt.replslot_names, optarg);
+					num_replslots++;
+				}
+				else
+				{
+					pg_log_error("duplicate replication slot \"%s\"", optarg);
+					exit(1);
+				}
+				break;
+			case 4:
+				if (!simple_string_list_member(&opt.sub_names, optarg))
+				{
+					simple_string_list_append(&opt.sub_names, optarg);
+					num_subs++;
+				}
+				else
+				{
+					pg_log_error("duplicate subscription \"%s\"", optarg);
+					exit(1);
+				}
+				break;
+			default:
+				/* getopt_long already emitted a complaint */
+				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+				exit(1);
+		}
+	}
+
+	/* Any non-option arguments? */
+	if (optind < argc)
+	{
+		pg_log_error("too many command-line arguments (first is \"%s\")",
+					 argv[optind]);
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+
+	/* Required arguments */
+	if (subscriber_dir == NULL)
+	{
+		pg_log_error("no subscriber data directory specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+
+	/* If socket directory is not provided, use the current directory */
+	if (opt.socket_dir == NULL)
+	{
+		char		cwd[MAXPGPATH];
+
+		if (!getcwd(cwd, MAXPGPATH))
+			pg_fatal("could not determine current directory");
+		opt.socket_dir = pg_strdup(cwd);
+		canonicalize_path(opt.socket_dir);
+	}
+
+	/*
+	 * Parse connection string. Build a base connection string that might be
+	 * reused by multiple databases.
+	 */
+	if (opt.pub_conninfo_str == NULL)
+	{
+		/*
+		 * TODO use primary_conninfo (if available) from subscriber and
+		 * extract publisher connection string. Assume that there are
+		 * identical entries for physical and logical replication. If there is
+		 * not, we would fail anyway.
+		 */
+		pg_log_error("no publisher connection string specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+	pg_log_info("validating connection string on publisher");
+	pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
+										  &dbname_conninfo);
+	if (pub_base_conninfo == NULL)
+		exit(1);
+
+	pg_log_info("validating connection string on subscriber");
+	sub_base_conninfo = get_sub_conninfo(&opt);
+
+	if (opt.database_names.head == NULL)
+	{
+		pg_log_info("no database was specified");
+
+		/*
+		 * If --database option is not provided, try to obtain the dbname from
+		 * the publisher conninfo. If dbname parameter is not available, error
+		 * out.
+		 */
+		if (dbname_conninfo)
+		{
+			simple_string_list_append(&opt.database_names, dbname_conninfo);
+			num_dbs++;
+
+			pg_log_info("database \"%s\" was extracted from the publisher connection string",
+						dbname_conninfo);
+		}
+		else
+		{
+			pg_log_error("no database name specified");
+			pg_log_error_hint("Try \"%s --help\" for more information.",
+							  progname);
+			exit(1);
+		}
+	}
+
+	/* Number of object names must match number of databases */
+	if (num_pubs > 0 && num_pubs != num_dbs)
+	{
+		pg_log_error("wrong number of publication names");
+		pg_log_error_hint("Number of publication names (%d) must match number of database names (%d).",
+						  num_pubs, num_dbs);
+		exit(1);
+	}
+	if (num_subs > 0 && num_subs != num_dbs)
+	{
+		pg_log_error("wrong number of subscription names");
+		pg_log_error_hint("Number of subscription names (%d) must match number of database names (%d).",
+						  num_subs, num_dbs);
+		exit(1);
+	}
+	if (num_replslots > 0 && num_replslots != num_dbs)
+	{
+		pg_log_error("wrong number of replication slot names");
+		pg_log_error_hint("Number of replication slot names (%d) must match number of database names (%d).",
+						  num_replslots, num_dbs);
+		exit(1);
+	}
+
+	/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
+	pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
+	pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
+
+	/* Rudimentary check for a data directory */
+	check_data_directory(subscriber_dir);
+
+	/*
+	 * Store database information for publisher and subscriber. It should be
+	 * called before atexit() because its return value is used in
+	 * cleanup_objects_atexit().
+	 */
+	dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
+
+	/* Register a function to clean up objects in case of failure */
+	atexit(cleanup_objects_atexit);
+
+	/*
+	 * Check if the subscriber data directory has the same system identifier
+	 * as the publisher data directory.
+	 */
+	pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
+	sub_sysid = get_standby_sysid(subscriber_dir);
+	if (pub_sysid != sub_sysid)
+		pg_fatal("subscriber data directory is not a copy of the source database cluster");
+
+	/* Subscriber PID file */
+	snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
+
+	/*
+	 * If the standby server is running, stop it. Some parameters (that can
+	 * only be set at server start) are provided by command-line options.
+	 */
+	if (stat(pidfile, &statbuf) == 0)
+	{
+
+		pg_log_info("standby is up and running");
+		pg_log_info("stopping the server to start the transformation steps");
+		stop_standby_server(subscriber_dir);
+	}
+
+	/*
+	 * Start a short-lived standby server with temporary parameters (provided
+	 * by command-line options). The goal is to avoid connections during the
+	 * transformation steps.
+	 */
+	pg_log_info("starting the standby with command-line options");
+	start_standby_server(&opt, true);
+
+	/* Check if the standby server is ready for logical replication */
+	check_subscriber(dbinfo);
+
+	/*
+	 * Check if the primary server is ready for logical replication. This
+	 * routine checks if a replication slot is in use on primary so it relies
+	 * on check_subscriber() to obtain the primary_slot_name. That's why it is
+	 * called after it.
+	 */
+	check_publisher(dbinfo);
+
+	/*
+	 * Create the required objects for each database on publisher. This step
+	 * is here mainly because if we stop the standby we cannot verify if the
+	 * primary slot is in use. We could use an extra connection for it but it
+	 * doesn't seem worth it.
+	 */
+	consistent_lsn = setup_publisher(dbinfo);
+
+	/* Write the required recovery parameters */
+	setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
+
+	/*
+	 * Restart subscriber so the recovery parameters will take effect. Wait
+	 * until accepting connections.
+	 */
+	pg_log_info("stopping and starting the subscriber");
+	stop_standby_server(subscriber_dir);
+	start_standby_server(&opt, true);
+
+	/* Wait for the subscriber to be promoted */
+	wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
+
+	/*
+	 * Create the subscription for each database on subscriber. It does not
+	 * enable it immediately because it needs to adjust the replication start
+	 * point to the LSN reported by setup_publisher().  It also cleans up
+	 * publications created by this tool and replication to the standby.
+	 */
+	setup_subscriber(dbinfo, consistent_lsn);
+
+	/* Remove primary_slot_name if it exists on primary */
+	drop_primary_replication_slot(dbinfo, primary_slot_name);
+
+	/* Stop the subscriber */
+	pg_log_info("stopping the subscriber");
+	stop_standby_server(subscriber_dir);
+
+	/* Change the system identifier of the subscriber */
+	modify_subscriber_sysid(&opt);
+
+	/*
+	 * In dry run mode, the server is restarted with the provided command-line
+	 * options so validation can be applied in the target server. In order to
+	 * preserve the initial state of the server (running), start it without
+	 * the command-line options.
+	 */
+	if (dry_run)
+		start_standby_server(&opt, false);
+
+	success = true;
+
+	pg_log_info("Done!");
+
+	return 0;
+}
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
new file mode 100644
index 0000000000..ecf503a7d0
--- /dev/null
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -0,0 +1,326 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+#
+# Test using a standby server as the subscriber.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+program_help_ok('pg_createsubscriber');
+program_version_ok('pg_createsubscriber');
+program_options_handling_ok('pg_createsubscriber');
+
+my $datadir = PostgreSQL::Test::Utils::tempdir;
+
+#
+# Test mandatory options
+command_fails(['pg_createsubscriber'],
+	'no subscriber data directory specified');
+command_fails(
+	[ 'pg_createsubscriber', '--pgdata', $datadir ],
+	'no publisher connection string specified');
+command_fails(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--pgdata', $datadir,
+		'--publisher-server', 'port=5432'
+	],
+	'no database name specified');
+command_fails(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--pgdata', $datadir,
+		'--publisher-server', 'port=5432',
+		'--database', 'pg1',
+		'--database', 'pg1'
+	],
+	'duplicate database name');
+command_fails(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--pgdata', $datadir,
+		'--publisher-server', 'port=5432',
+		'--publication', 'foo1',
+		'--publication', 'foo1',
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'duplicate publication name');
+command_fails(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--pgdata', $datadir,
+		'--publisher-server', 'port=5432',
+		'--publication', 'foo1',
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'wrong number of publication names');
+command_fails(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--pgdata', $datadir,
+		'--publisher-server', 'port=5432',
+		'--publication', 'foo1',
+		'--publication', 'foo2',
+		'--subscription', 'bar1',
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'wrong number of subscription names');
+command_fails(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--pgdata', $datadir,
+		'--publisher-server', 'port=5432',
+		'--publication', 'foo1',
+		'--publication', 'foo2',
+		'--subscription', 'bar1',
+		'--subscription', 'bar2',
+		'--replication-slot', 'baz1',
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'wrong number of replication slot names');
+
+# Set up node P as primary
+my $node_p = PostgreSQL::Test::Cluster->new('node_p');
+$node_p->init(allows_streaming => 'logical');
+$node_p->start;
+
+# Set up node F as about-to-fail node
+# Force it to initialize a new cluster instead of copying a
+# previously initdb'd cluster. New cluster has a different system identifier so
+# we can test if the target cluster is a copy of the source cluster.
+my $node_f = PostgreSQL::Test::Cluster->new('node_f');
+$node_f->init(force_initdb => 1, allows_streaming => 'logical');
+$node_f->start;
+
+# On node P
+# - create databases
+# - create test tables
+# - insert a row
+# - create a physical replication slot
+$node_p->safe_psql(
+	'postgres', q(
+	CREATE DATABASE pg1;
+	CREATE DATABASE pg2;
+));
+$node_p->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')");
+$node_p->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+my $slotname = 'physical_slot';
+$node_p->safe_psql('pg2',
+	"SELECT pg_create_physical_replication_slot('$slotname')");
+
+# Set up node S as standby linking to node P
+$node_p->backup('backup_1');
+my $node_s = PostgreSQL::Test::Cluster->new('node_s');
+$node_s->init_from_backup($node_p, 'backup_1', has_streaming => 1);
+$node_s->append_conf(
+	'postgresql.conf', qq[
+primary_slot_name = '$slotname'
+]);
+$node_s->set_standby_mode();
+$node_s->start;
+
+# Run pg_createsubscriber on about-to-fail node F
+command_fails(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--pgdata', $node_f->data_dir,
+		'--publisher-server', $node_p->connstr('pg1'),
+		'--socket-directory', $node_f->host,
+		'--subscriber-port', $node_f->port,
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'subscriber data directory is not a copy of the source database cluster');
+
+# Set up node C as standby linking to node S
+$node_s->backup('backup_2');
+my $node_c = PostgreSQL::Test::Cluster->new('node_c');
+$node_c->init_from_backup($node_s, 'backup_2', has_streaming => 1);
+$node_c->adjust_conf('postgresql.conf', 'primary_slot_name', undef);
+$node_c->set_standby_mode();
+$node_c->start;
+
+# Run pg_createsubscriber on node C (P -> S -> C)
+command_fails(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--dry-run', '--pgdata',
+		$node_c->data_dir, '--publisher-server',
+		$node_s->connstr('pg1'),
+		'--socket-directory', $node_c->host,
+		'--subscriber-port', $node_c->port,
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'primary server is in recovery');
+
+# Insert another row on node P and wait for node S to catch up
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('second row')");
+$node_p->wait_for_replay_catchup($node_s);
+
+# Check some unmet conditions on node P
+$node_p->append_conf('postgresql.conf', q{
+wal_level = replica
+max_replication_slots = 1
+max_wal_senders = 1
+max_worker_processes = 2
+});
+$node_p->restart;
+command_fails(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--dry-run', '--pgdata',
+		$node_s->data_dir, '--publisher-server',
+		$node_p->connstr('pg1'),
+		'--socket-directory', $node_s->host,
+		'--subscriber-port', $node_s->port,
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'primary contains unmet conditions on node P');
+# Restore default settings here but only apply them after testing the standby.
+# Some standby settings should not be lower than on the primary.
+$node_p->append_conf('postgresql.conf', q{
+wal_level = logical
+max_replication_slots = 10
+max_wal_senders = 10
+max_worker_processes = 8
+});
+
+# Check some unmet conditions on node S
+$node_s->append_conf('postgresql.conf', q{
+max_replication_slots = 1
+max_logical_replication_workers = 1
+max_worker_processes = 2
+});
+$node_s->restart;
+command_fails(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--dry-run', '--pgdata',
+		$node_s->data_dir, '--publisher-server',
+		$node_p->connstr('pg1'),
+		'--socket-directory', $node_s->host,
+		'--subscriber-port', $node_s->port,
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'standby contains unmet conditions on node S');
+$node_s->append_conf('postgresql.conf', q{
+max_replication_slots = 10
+max_logical_replication_workers = 4
+max_worker_processes = 8
+});
+# Restore default settings on both servers
+$node_s->restart;
+$node_p->restart;
+
+# dry run mode on node S
+command_ok(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--dry-run', '--pgdata',
+		$node_s->data_dir, '--publisher-server',
+		$node_p->connstr('pg1'),
+		'--socket-directory', $node_s->host,
+		'--subscriber-port', $node_s->port,
+		'--publication', 'pub1',
+		'--publication', 'pub2',
+		'--subscription', 'sub1',
+		'--subscription', 'sub2',
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'run pg_createsubscriber --dry-run on node S');
+
+# Check if node S is still a standby
+is($node_s->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'),
+	't', 'standby is in recovery');
+
+# pg_createsubscriber can run without the --database option
+command_ok(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--dry-run', '--pgdata',
+		$node_s->data_dir, '--publisher-server',
+		$node_p->connstr('pg1'),
+		'--socket-directory', $node_s->host,
+		'--subscriber-port', $node_s->port,
+		'--replication-slot', 'replslot1'
+	],
+	'run pg_createsubscriber without --database');
+
+# Run pg_createsubscriber on node S
+command_ok(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--verbose', '--pgdata',
+		$node_s->data_dir, '--publisher-server',
+		$node_p->connstr('pg1'),
+		'--socket-directory', $node_s->host,
+		'--subscriber-port', $node_s->port,
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'run pg_createsubscriber on node S');
+
+# Confirm the physical replication slot has been removed
+my $result = $node_p->safe_psql('pg1',
+	"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$slotname'"
+);
+is($result, qq(0),
+	'the physical replication slot used as primary_slot_name has been removed'
+);
+
+# Insert rows on P
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('third row')");
+$node_p->safe_psql('pg2', "INSERT INTO tbl2 VALUES('row 1')");
+
+# Reset the PID to undefined because the subscriber was stopped behind the
+# scenes, then start the subscriber.
+$node_s->{_pid} = undef;
+$node_s->start;
+
+# Get subscription names
+$result = $node_s->safe_psql(
+	'postgres', qq(
+	SELECT subname FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+));
+my @subnames = split("\n", $result);
+
+# Wait for the subscriber to catch up
+$node_s->wait_for_subscription_sync($node_p, $subnames[0]);
+$node_s->wait_for_subscription_sync($node_p, $subnames[1]);
+
+# Check result on database pg1
+$result = $node_s->safe_psql('pg1', 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row),
+	'logical replication works on database pg1');
+
+# Check result on database pg2
+$result = $node_s->safe_psql('pg2', 'SELECT * FROM tbl2');
+is($result, qq(row 1), 'logical replication works on database pg2');
+
+# Different system identifier?
+my $sysid_p = $node_p->safe_psql('postgres',
+	'SELECT system_identifier FROM pg_control_system()');
+my $sysid_s = $node_s->safe_psql('postgres',
+	'SELECT system_identifier FROM pg_control_system()');
+ok($sysid_p ne $sysid_s, 'system identifier was changed');
+
+# clean up
+$node_p->teardown_node;
+$node_s->teardown_node;
+$node_f->teardown_node;
+
+done_testing();
-- 
2.34.1

From 86495414cbe59e66203ccf311de5af7541aa1be4 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler@eulerto.com>
Date: Sat, 16 Mar 2024 11:58:11 -0300
Subject: [PATCH v31 2/3] Stop the target server earlier

The recovery process must reach a consistent state before it considers
the recovery stop point. Stop the target server before creating the
replication slots, because the last replication slot created is its
recovery stop point.
---
 src/bin/pg_basebackup/pg_createsubscriber.c | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 6cc1c34121..34ec7c8505 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -2074,6 +2074,16 @@ main(int argc, char **argv)
 	 */
 	check_publisher(dbinfo);
 
+	/*
+	 * Stop the target server. The recovery process requires that the server
+	 * reaches a consistent state before targeting the recovery stop point.
+	 * Make sure a consistent state is reached (stopping the target server
+	 * guarantees it) *before* creating the replication slots in
+	 * setup_publisher().
+	 */
+	pg_log_info("stopping the subscriber");
+	stop_standby_server(subscriber_dir);
+
 	/*
 	 * Create the required objects for each database on publisher. This step
 	 * is here mainly because if we stop the standby we cannot verify if the
@@ -2086,11 +2096,10 @@ main(int argc, char **argv)
 	setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
 
 	/*
-	 * Restart subscriber so the recovery parameters will take effect. Wait
+	 * Start subscriber so the recovery parameters will take effect. Wait
 	 * until accepting connections.
 	 */
-	pg_log_info("stopping and starting the subscriber");
-	stop_standby_server(subscriber_dir);
+	pg_log_info("starting the subscriber");
 	start_standby_server(&opt, true);
 
 	/* Waiting the subscriber to be promoted */
-- 
2.34.1

From 7d60893f7d94e064b33e66f01e0f9ed44230f38d Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Tue, 19 Mar 2024 16:35:27 +0530
Subject: [PATCH v31 3/3] Document a limitation of pg_createsubscriber

Document a limitation of pg_createsubscriber
---
 doc/src/sgml/ref/pg_createsubscriber.sgml | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index 642213b5a4..7e1c5e25da 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -369,6 +369,18 @@ PostgreSQL documentation
     If the target server has a standby, replication will break and a fresh
     standby should be created.
    </para>
+
+   <para>
+    If the target server is not a direct standby of the source server and none
+    of its upstream standbys has a replication slot, physical replication
+    between the target server and its upstream standby may break, and logical
+    replication between the source server and the target server is set up.
+    For example, suppose nodes A, B and C form a cascading physical replication
+    chain without replication slots. If pg_createsubscriber is run with node C
+    as the target server and node A as the source server, it succeeds, but
+    physical replication between node B and node C breaks and logical
+    replication is set up between node A and node C.
+   </para>
   </warning>
 
  </refsect1>
-- 
2.34.1
