On Tue, Jan 23, 2024, at 10:29 PM, Euler Taveira wrote:
> I'll post a new one soon.

I'm attaching another patch that fixes some of the issues pointed out by
Hayato, Shlok, and Junwang.

* publication doesn't exist. The analysis [1] was done by Hayato but I didn't
  use the proposed patch. Instead, I refactored the code a bit [2] and now call
  it from a new function (setup_publisher) that runs before the promotion.
* fix wrong path name in the initial comment [3]
* change terminology: logical replica -> physical replica [3]
* is the primary / standby ready for logical replication? setup_publisher()
  and setup_subscriber() check whether the required GUCs are set accordingly.
  For the primary, it checks that wal_level = logical, that
  max_replication_slots has enough free slots for the proposed setup, and that
  enough max_wal_senders are available. For the standby, it checks
  max_replication_slots for the replication origins and the remaining number
  of background workers needed to start the subscriptions.
* retain option: I extracted this one from Hayato's patch [4]
* target server must be a standby. It seems we agree that this restriction
  simplifies the code a bit but can be relaxed in the future (if/when base
  backup support is added).
* recovery timeout option: I decided to include it, but I think the use case is
  too narrow. It helps in broken setups. However, it can be an issue in some
  scenarios such as a time-delayed replica, large replication lag, slow
  hardware, or a slow network. I didn't use the proposed patch [5]. Instead, I
  came up with a simple one that defaults to waiting forever. The proposed one
  defaults to 60 seconds, but I'm afraid that, in one of the scenarios above,
  we would cancel a legitimate run. Maybe we should add a message during dry
  run saying that, due to replication lag, it will take longer to run.
* refactor primary_slot_name code. With the new setup_publisher and
  setup_subscriber functions, I split the function that detects
  primary_slot_name usage into 2 pieces just to avoid extra connections.
* remove fallback_application_name as suggested by Hayato [5] because logical
  replication already includes one.
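The readiness checks in setup_publisher() and setup_subscriber() can be sketched roughly as follows. This is an illustration, not the code in the patch.

It assumes:
- The settings have already been fetched into hypothetical PublisherSettings/SubscriberSettings structs. The libpq queries are left out.
- The primary needs one slot per database plus the transient consistent-LSN slot described in the commit message.
- The primary needs one walsender per subscription.
- The standby needs one origin and one apply worker per subscription, plus one worker process for the launcher.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical snapshot of the primary's GUCs and current usage. */
typedef struct PublisherSettings
{
	const char *wal_level;
	int			max_replication_slots;
	int			used_replication_slots;
	int			max_wal_senders;
	int			used_wal_senders;
} PublisherSettings;

/* Hypothetical snapshot of the standby's GUCs. */
typedef struct SubscriberSettings
{
	int			max_replication_slots;	/* also limits replication origins */
	int			max_logical_replication_workers;
	int			max_worker_processes;
} SubscriberSettings;

/* Assumes one slot per database plus the transient consistent-LSN slot. */
static bool
publisher_ready(const PublisherSettings *s, int num_dbs)
{
	int			free_slots = s->max_replication_slots - s->used_replication_slots;
	int			free_senders = s->max_wal_senders - s->used_wal_senders;
	bool		ok = true;

	assert(num_dbs > 0);

	if (strcmp(s->wal_level, "logical") != 0)
	{
		fprintf(stderr, "publisher requires wal_level = logical\n");
		ok = false;
	}
	if (free_slots < num_dbs + 1)
	{
		fprintf(stderr, "publisher requires %d free replication slots, found %d\n",
				num_dbs + 1, free_slots);
		ok = false;
	}
	/* Assumption: each subscription consumes one walsender. */
	if (free_senders < num_dbs)
	{
		fprintf(stderr, "publisher requires %d free WAL senders, found %d\n",
				num_dbs, free_senders);
		ok = false;
	}
	return ok;
}

/* Assumes one origin and one apply worker per subscription, plus the launcher. */
static bool
subscriber_ready(const SubscriberSettings *s, int num_dbs)
{
	bool		ok = true;

	assert(num_dbs > 0);

	if (s->max_replication_slots < num_dbs)
	{
		fprintf(stderr, "subscriber requires max_replication_slots >= %d\n", num_dbs);
		ok = false;
	}
	if (s->max_logical_replication_workers < num_dbs)
	{
		fprintf(stderr, "subscriber requires max_logical_replication_workers >= %d\n",
				num_dbs);
		ok = false;
	}
	if (s->max_worker_processes < num_dbs + 1)
	{
		fprintf(stderr, "subscriber requires max_worker_processes >= %d\n", num_dbs + 1);
		ok = false;
	}
	return ok;
}
```

All failed checks are reported before returning, so the user can fix every setting in one pass instead of rerunning after each error.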
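The "defaults to forever" behaviour of the recovery timeout can be illustrated with a small polling loop in which 0 disables the timeout. This is a hypothetical sketch, not the patch's wait_for_end_recovery():
- The probe callback stands in for whatever checks that the server has left recovery.
- FakeStandby is a made-up test double.
- Elapsed time is counted in poll intervals rather than wall-clock time.

```c
#include <assert.h>
#include <stdbool.h>
#include <unistd.h>

#define WAIT_INTERVAL 1			/* seconds between polls */

typedef enum
{
	WAIT_DONE,
	WAIT_TIMED_OUT
} WaitResult;

/* Hypothetical probe: returns true once the server has left recovery. */
typedef bool (*recovery_probe) (void *ctx);

/*
 * Poll until the probe reports that recovery ended. A recovery_timeout of 0
 * disables the timeout, so the loop waits forever.
 */
static WaitResult
wait_for_recovery_end(recovery_probe probe, void *ctx, int recovery_timeout,
					  bool do_sleep)
{
	int			elapsed = 0;

	assert(probe != NULL && recovery_timeout >= 0);

	for (;;)
	{
		if (probe(ctx))
			return WAIT_DONE;

		/* Bail out only if a positive timeout was given and has expired. */
		if (recovery_timeout > 0 && elapsed >= recovery_timeout)
			return WAIT_TIMED_OUT;

		if (do_sleep)
			sleep(WAIT_INTERVAL);
		elapsed += WAIT_INTERVAL;
	}
}

/* Made-up test double: "recovery ends" after a fixed number of polls. */
typedef struct FakeStandby
{
	int			polls;
	int			ends_after;
} FakeStandby;

static bool
fake_probe(void *ctx)
{
	FakeStandby *s = ctx;

	return ++s->polls > s->ends_after;
}
```

With 0, a time-delayed or lagging standby simply keeps the loop waiting. A positive value gives up after roughly that many poll intervals.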

I'm still thinking about replacing --subscriber-conninfo with separate items
(username, port, password?, host = socket dir). Maybe that is overengineering.
The user can always prepare the environment to avoid unwanted and/or external
connections.
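The alternative mentioned above would amount to assembling a local-only subscriber connection string from separate items, roughly as follows. This is a rough sketch, and build_local_conninfo() is hypothetical:
- It quotes values using libpq's keyword='value' syntax, backslash-escaping single quotes and backslashes.
- It deliberately leaves the password out, on the assumption that it would come from PGPASSWORD or ~/.pgpass.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Append one character, keeping buf NUL-terminated; false if it won't fit. */
static bool
append_char(char *buf, size_t len, size_t *pos, char c)
{
	if (*pos + 1 >= len)
		return false;
	buf[(*pos)++] = c;
	buf[*pos] = '\0';
	return true;
}

/* Append a string verbatim; false if it won't fit. */
static bool
append_str(char *buf, size_t len, size_t *pos, const char *s)
{
	size_t		n = strlen(s);

	if (*pos + n >= len)
		return false;
	memcpy(buf + *pos, s, n + 1);	/* copy including the NUL */
	*pos += n;
	return true;
}

/* Append a single-quoted conninfo value, escaping ' and \ with a backslash. */
static bool
append_quoted(char *buf, size_t len, size_t *pos, const char *val)
{
	if (!append_char(buf, len, pos, '\''))
		return false;
	for (; *val; val++)
	{
		if ((*val == '\'' || *val == '\\') && !append_char(buf, len, pos, '\\'))
			return false;
		if (!append_char(buf, len, pos, *val))
			return false;
	}
	return append_char(buf, len, pos, '\'');
}

/* Hypothetical: build "host='<socket dir>' port=<port> user='<user>'". */
static bool
build_local_conninfo(char *buf, size_t len, const char *socket_dir, int port,
					 const char *username)
{
	char		portstr[16];
	size_t		pos = 0;

	assert(buf != NULL && len > 0 && socket_dir != NULL && username != NULL);
	buf[0] = '\0';
	snprintf(portstr, sizeof(portstr), "%d", port);

	return append_str(buf, len, &pos, "host=") &&
		append_quoted(buf, len, &pos, socket_dir) &&
		append_str(buf, len, &pos, " port=") &&
		append_str(buf, len, &pos, portstr) &&
		append_str(buf, len, &pos, " user=") &&
		append_quoted(buf, len, &pos, username);
}
```

Pointing host at a socket directory only makes the tool itself connect locally. What the server accepts would still be governed by listen_addresses and pg_hba.conf.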

I haven't renamed pg_subscriber to pg_createsubscriber yet, but if I don't
hear any objections, I'll do it in the next patch.


[1] https://www.postgresql.org/message-id/TY3PR01MB9889C5D55206DDD978627D07F5752%40TY3PR01MB9889.jpnprd01.prod.outlook.com
[2] https://www.postgresql.org/message-id/73ab86ca-3fd5-49b3-9c80-73d1525202f1%40app.fastmail.com
[3] https://www.postgresql.org/message-id/TY3PR01MB9889678E47B918F4D83A6FD8F57B2%40TY3PR01MB9889.jpnprd01.prod.outlook.com
[4] https://www.postgresql.org/message-id/TY3PR01MB9889678E47B918F4D83A6FD8F57B2%40TY3PR01MB9889.jpnprd01.prod.outlook.com
[5] https://www.postgresql.org/message-id/TY3PR01MB9889593399165B9A04106741F5662%40TY3PR01MB9889.jpnprd01.prod.outlook.com


--
Euler Taveira
EDB   https://www.enterprisedb.com/
From d9ef01a806c3d8697faa444283f19c2deaa58850 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Mon, 5 Jun 2023 14:39:40 -0400
Subject: [PATCH v9] Creates a new logical replica from a standby server

A new tool called pg_subscriber can convert a physical replica into a
logical replica. It runs on the target server and should be able to
connect to the source server (publisher) and the target server
(subscriber).

The conversion requires a few steps. Check if the target data directory
has the same system identifier as the source data directory. Stop the
target server if it is running as a standby server. Create one
replication slot per specified database on the source server. One
additional replication slot is created at the end to get the consistent
LSN (this consistent LSN will be used as (a) a stopping point for the
recovery process and (b) a starting point for the subscriptions). Write
recovery parameters into the target data directory and start the target
server (wait until the target server is promoted). Create one
publication (FOR ALL TABLES) per specified database on the source
server. Create one subscription per specified database on the target
server (use the replication slot and publication created in a previous
step; don't enable the subscriptions yet). Set the replication progress
to the consistent LSN that was obtained in a previous step. Enable the
subscription for each specified database on the target server. Remove
the additional replication slot that was used to get the consistent LSN.
Stop the target server. Change the system identifier of the target
server.

Depending on your workload and database size, creating a logical replica
might not be an option due to resource constraints (the WAL backlog should
be available until all table data is synchronized). The initial data copy
and the replication progress tend to be faster on a physical replica.
The purpose of this tool is to speed up a logical replica setup.
---
 doc/src/sgml/ref/allfiles.sgml                |    1 +
 doc/src/sgml/ref/pg_subscriber.sgml           |  305 +++
 doc/src/sgml/reference.sgml                   |    1 +
 src/bin/pg_basebackup/.gitignore              |    1 +
 src/bin/pg_basebackup/Makefile                |    8 +-
 src/bin/pg_basebackup/meson.build             |   19 +
 src/bin/pg_basebackup/pg_subscriber.c         | 1805 +++++++++++++++++
 src/bin/pg_basebackup/t/040_pg_subscriber.pl  |   44 +
 .../t/041_pg_subscriber_standby.pl            |  139 ++
 9 files changed, 2322 insertions(+), 1 deletion(-)
 create mode 100644 doc/src/sgml/ref/pg_subscriber.sgml
 create mode 100644 src/bin/pg_basebackup/pg_subscriber.c
 create mode 100644 src/bin/pg_basebackup/t/040_pg_subscriber.pl
 create mode 100644 src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl

diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 4a42999b18..3862c976d7 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -214,6 +214,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pgResetwal         SYSTEM "pg_resetwal.sgml">
 <!ENTITY pgRestore          SYSTEM "pg_restore.sgml">
 <!ENTITY pgRewind           SYSTEM "pg_rewind.sgml">
+<!ENTITY pgSubscriber       SYSTEM "pg_subscriber.sgml">
 <!ENTITY pgVerifyBackup     SYSTEM "pg_verifybackup.sgml">
 <!ENTITY pgtestfsync        SYSTEM "pgtestfsync.sgml">
 <!ENTITY pgtesttiming       SYSTEM "pgtesttiming.sgml">
diff --git a/doc/src/sgml/ref/pg_subscriber.sgml b/doc/src/sgml/ref/pg_subscriber.sgml
new file mode 100644
index 0000000000..99d4fcee49
--- /dev/null
+++ b/doc/src/sgml/ref/pg_subscriber.sgml
@@ -0,0 +1,305 @@
+<!--
+doc/src/sgml/ref/pg_subscriber.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="app-pgsubscriber">
+ <indexterm zone="app-pgsubscriber">
+  <primary>pg_subscriber</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle><application>pg_subscriber</application></refentrytitle>
+  <manvolnum>1</manvolnum>
+  <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>pg_subscriber</refname>
+  <refpurpose>convert a physical replica into a new logical replica</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+  <cmdsynopsis>
+   <command>pg_subscriber</command>
+   <arg rep="repeat"><replaceable>option</replaceable></arg>
+  </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+  <para>
+   <application>pg_subscriber</application> takes the publisher and subscriber
+   connection strings, a cluster directory from a physical replica, and a list
+   of database names, and sets up a new logical replica using the physical
+   recovery process.
+  </para>
+
+  <para>
+   <application>pg_subscriber</application> should be run on the target server.
+   The source server (known as the publisher server) should accept logical
+   replication connections from the target server (known as the subscriber
+   server). The target server should accept local logical replication connections.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Options</title>
+
+   <para>
+    <application>pg_subscriber</application> accepts the following
+    command-line arguments:
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+      <term><option>--pgdata=<replaceable class="parameter">directory</replaceable></option></term>
+      <listitem>
+       <para>
+        The target directory that contains a cluster directory from a physical
+        replica.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-P  <replaceable class="parameter">conninfo</replaceable></option></term>
+      <term><option>--publisher-conninfo=<replaceable class="parameter">conninfo</replaceable></option></term>
+      <listitem>
+       <para>
+        The connection string to the publisher. For details see <xref linkend="libpq-connstring"/>.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-S <replaceable class="parameter">conninfo</replaceable></option></term>
+      <term><option>--subscriber-conninfo=<replaceable class="parameter">conninfo</replaceable></option></term>
+      <listitem>
+       <para>
+        The connection string to the subscriber. For details see <xref linkend="libpq-connstring"/>.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
+      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
+      <listitem>
+       <para>
+        The name of the database in which to create the subscription. Multiple
+        databases can be selected by writing multiple <option>-d</option> switches.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-n</option></term>
+      <term><option>--dry-run</option></term>
+      <listitem>
+       <para>
+        Do everything except actually modifying the target directory.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-r</option></term>
+      <term><option>--retain</option></term>
+      <listitem>
+       <para>
+        Retain the log file even after successful completion.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-t <replaceable class="parameter">seconds</replaceable></option></term>
+       <term><option>--recovery-timeout=<replaceable class="parameter">seconds</replaceable></option></term>
+       <listitem>
+       <para>
+        The maximum number of seconds to wait for recovery to end. Setting it to
+        0 disables the timeout. The default is 0.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-v</option></term>
+      <term><option>--verbose</option></term>
+      <listitem>
+       <para>
+        Enables verbose mode. This will cause
+        <application>pg_subscriber</application> to output progress messages
+        and detailed information about each step.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </para>
+
+   <para>
+    Other options are also available:
+
+    <variablelist>
+     <varlistentry>
+       <term><option>-V</option></term>
+       <term><option>--version</option></term>
+       <listitem>
+       <para>
+       Print the <application>pg_subscriber</application> version and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-?</option></term>
+       <term><option>--help</option></term>
+       <listitem>
+       <para>
+       Show help about <application>pg_subscriber</application> command
+       line arguments, and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+   </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Notes</title>
+
+  <para>
+   The transformation proceeds in the following steps:
+  </para>
+
+  <procedure>
+   <step>
+    <para>
+     <application>pg_subscriber</application> checks if the given target data
+     directory has the same system identifier as the source data directory.
+     Since it uses the recovery process as one of the steps, it starts the
+     target server as a replica from the source server. If the system
+     identifier is not the same, <application>pg_subscriber</application> will
+     terminate with an error.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> checks if the target data
+     directory is used by a physical replica and stops the physical replica if
+     it is running. One of the next steps adds recovery parameters that require
+     a server start, so stopping the server first avoids an error.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> creates one replication slot for
+     each specified database on the source server. The replication slot name
+     contains a <literal>pg_subscriber</literal> prefix. These replication
+     slots will be used by the subscriptions in a future step.  Another
+     replication slot is used to get a consistent start location. This
+     consistent LSN will be used as a stopping point in the <xref
+     linkend="guc-recovery-target-lsn"/> parameter and by the
+     subscriptions as a replication starting point. It guarantees that no
+     transaction will be lost.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> writes recovery parameters into
+     the target data directory and starts the target server. It specifies an LSN
+     (the consistent LSN that was obtained in the previous step) as the write-ahead
+     log location up to which recovery will proceed. It also specifies
+     <literal>promote</literal> as the action that the server should take once
+     the recovery target is reached. This step finishes once the server ends
+     standby mode and is accepting read-write operations.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Next, <application>pg_subscriber</application> creates one publication
+     for each specified database on the source server. Each publication
+     replicates changes for all tables in the database. The publication name
+     contains a <literal>pg_subscriber</literal> prefix. These publications
+     will be used by the corresponding subscriptions in a later step.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> creates one subscription for
+     each specified database on the target server. Each subscription name
+     contains a <literal>pg_subscriber</literal> prefix. The replication slot
+     name is identical to the subscription name. It does not copy existing data
+     from the source server. It does not create a replication slot. Instead, it
+     uses the replication slot that was created in a previous step. The
+     subscription is created but it is not enabled yet. The reason is that the
+     replication progress must be set to the consistent LSN, but the replication
+     origin name contains the subscription OID, which is only known once the
+     subscription exists. Hence, the subscription is enabled in a separate step.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> sets the replication progress to
+     the consistent LSN that was obtained in a previous step. When the target
+     server started the recovery process, it caught up to the consistent LSN.
+     This is the exact LSN to be used as the initial location for each
+     subscription.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     Finally, <application>pg_subscriber</application> enables the subscription
+     for each specified database on the target server. The subscription starts
+     streaming from the consistent LSN.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> removes the additional replication
+     slot that was used to get the consistent LSN on the source server.
+    </para>
+   </step>
+
+   <step>
+    <para>
+     <application>pg_subscriber</application> stops the target server to change
+     its system identifier.
+    </para>
+   </step>
+  </procedure>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   To create a logical replica for databases <literal>hr</literal> and
+   <literal>finance</literal> from a physical replica at <literal>foo</literal>:
+<screen>
+<prompt>$</prompt> <userinput>pg_subscriber -D /usr/local/pgsql/data -P "host=foo" -S "host=localhost" -d hr -d finance</userinput>
+</screen>
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>See Also</title>
+
+  <simplelist type="inline">
+   <member><xref linkend="app-pgbasebackup"/></member>
+  </simplelist>
+ </refsect1>
+
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index aa94f6adf6..266f4e515a 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -285,6 +285,7 @@
    &pgCtl;
    &pgResetwal;
    &pgRewind;
+   &pgSubscriber;
    &pgtestfsync;
    &pgtesttiming;
    &pgupgrade;
diff --git a/src/bin/pg_basebackup/.gitignore b/src/bin/pg_basebackup/.gitignore
index 26048bdbd8..0e5384a1d5 100644
--- a/src/bin/pg_basebackup/.gitignore
+++ b/src/bin/pg_basebackup/.gitignore
@@ -1,5 +1,6 @@
 /pg_basebackup
 /pg_receivewal
 /pg_recvlogical
+/pg_subscriber
 
 /tmp_check/
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index abfb6440ec..f6281b7676 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -44,7 +44,7 @@ BBOBJS = \
 	bbstreamer_tar.o \
 	bbstreamer_zstd.o
 
-all: pg_basebackup pg_receivewal pg_recvlogical
+all: pg_basebackup pg_receivewal pg_recvlogical pg_subscriber
 
 pg_basebackup: $(BBOBJS) $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
 	$(CC) $(CFLAGS) $(BBOBJS) $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
@@ -55,10 +55,14 @@ pg_receivewal: pg_receivewal.o $(OBJS) | submake-libpq submake-libpgport submake
 pg_recvlogical: pg_recvlogical.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
 	$(CC) $(CFLAGS) pg_recvlogical.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
+pg_subscriber: $(WIN32RES) pg_subscriber.o | submake-libpq submake-libpgport submake-libpgfeutils
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
 install: all installdirs
 	$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
 	$(INSTALL_PROGRAM) pg_receivewal$(X) '$(DESTDIR)$(bindir)/pg_receivewal$(X)'
 	$(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
+	$(INSTALL_PROGRAM) pg_subscriber$(X) '$(DESTDIR)$(bindir)/pg_subscriber$(X)'
 
 installdirs:
 	$(MKDIR_P) '$(DESTDIR)$(bindir)'
@@ -67,10 +71,12 @@ uninstall:
 	rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
 	rm -f '$(DESTDIR)$(bindir)/pg_receivewal$(X)'
 	rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
+	rm -f '$(DESTDIR)$(bindir)/pg_subscriber$(X)'
 
 clean distclean:
 	rm -f pg_basebackup$(X) pg_receivewal$(X) pg_recvlogical$(X) \
 		$(BBOBJS) pg_receivewal.o pg_recvlogical.o \
+		pg_subscriber$(X) pg_subscriber.o \
 		$(OBJS)
 	rm -rf tmp_check
 
diff --git a/src/bin/pg_basebackup/meson.build b/src/bin/pg_basebackup/meson.build
index f7e60e6670..ccfd7bb7a5 100644
--- a/src/bin/pg_basebackup/meson.build
+++ b/src/bin/pg_basebackup/meson.build
@@ -75,6 +75,23 @@ pg_recvlogical = executable('pg_recvlogical',
 )
 bin_targets += pg_recvlogical
 
+pg_subscriber_sources = files(
+  'pg_subscriber.c'
+)
+
+if host_system == 'windows'
+  pg_subscriber_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+	'--NAME', 'pg_subscriber',
+	'--FILEDESC', 'pg_subscriber - create a new logical replica from a standby server',])
+endif
+
+pg_subscriber = executable('pg_subscriber',
+  pg_subscriber_sources,
+  dependencies: [frontend_code, libpq],
+  kwargs: default_bin_args,
+)
+bin_targets += pg_subscriber
+
 tests += {
   'name': 'pg_basebackup',
   'sd': meson.current_source_dir(),
@@ -89,6 +106,8 @@ tests += {
       't/011_in_place_tablespace.pl',
       't/020_pg_receivewal.pl',
       't/030_pg_recvlogical.pl',
+      't/040_pg_subscriber.pl',
+      't/041_pg_subscriber_standby.pl',
     ],
   },
 }
diff --git a/src/bin/pg_basebackup/pg_subscriber.c b/src/bin/pg_basebackup/pg_subscriber.c
new file mode 100644
index 0000000000..cb97dbda5e
--- /dev/null
+++ b/src/bin/pg_basebackup/pg_subscriber.c
@@ -0,0 +1,1805 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_subscriber.c
+ *	  Create a new logical replica from a standby server
+ *
+ * Copyright (C) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/bin/pg_basebackup/pg_subscriber.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+#include <time.h>
+
+#include "access/xlogdefs.h"
+#include "catalog/pg_control.h"
+#include "common/connect.h"
+#include "common/controldata_utils.h"
+#include "common/file_perm.h"
+#include "common/file_utils.h"
+#include "common/logging.h"
+#include "fe_utils/recovery_gen.h"
+#include "fe_utils/simple_list.h"
+#include "getopt_long.h"
+#include "utils/pidfile.h"
+
+#define	PGS_OUTPUT_DIR	"pg_subscriber_output.d"
+
+typedef struct LogicalRepInfo
+{
+	Oid			oid;			/* database OID */
+	char	   *dbname;			/* database name */
+	char	   *pubconninfo;	/* publication connection string for logical
+								 * replication */
+	char	   *subconninfo;	/* subscription connection string for logical
+								 * replication */
+	char	   *pubname;		/* publication name */
+	char	   *subname;		/* subscription name (also replication slot
+								 * name) */
+
+	bool		made_replslot;	/* replication slot was created */
+	bool		made_publication;	/* publication was created */
+	bool		made_subscription;	/* subscription was created */
+} LogicalRepInfo;
+
+static void cleanup_objects_atexit(void);
+static void usage();
+static char *get_base_conninfo(char *conninfo, char *dbname,
+							   const char *noderole);
+static bool get_exec_path(const char *path);
+static bool check_data_directory(const char *datadir);
+static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
+static LogicalRepInfo *store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo);
+static PGconn *connect_database(const char *conninfo);
+static void disconnect_database(PGconn *conn);
+static uint64 get_sysid_from_conn(const char *conninfo);
+static uint64 get_control_from_datadir(const char *datadir);
+static void modify_sysid(const char *pg_resetwal_path, const char *datadir);
+static bool setup_publisher(LogicalRepInfo *dbinfo);
+static char *create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo,
+											 char *slot_name);
+static void drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name);
+static void pg_ctl_status(const char *pg_ctl_cmd, int rc, int action);
+static void wait_for_end_recovery(const char *conninfo);
+static void create_publication(PGconn *conn, LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, LogicalRepInfo *dbinfo);
+static void create_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+static void drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+static void set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn);
+static void enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo);
+
+#define	USEC_PER_SEC	1000000
+#define	WAIT_INTERVAL	1		/* 1 second */
+
+/* Options */
+static const char *progname;
+
+static char *subscriber_dir = NULL;
+static char *pub_conninfo_str = NULL;
+static char *sub_conninfo_str = NULL;
+static SimpleStringList database_names = {NULL, NULL};
+static char *primary_slot_name = NULL;
+static bool dry_run = false;
+static bool retain = false;
+static int	recovery_timeout = 0;
+
+static bool success = false;
+
+static char *pg_ctl_path = NULL;
+static char *pg_resetwal_path = NULL;
+
+static LogicalRepInfo *dbinfo;
+static int	num_dbs = 0;
+
+static char temp_replslot[NAMEDATALEN] = {0};
+static bool made_transient_replslot = false;
+
+enum WaitPMResult
+{
+	POSTMASTER_READY,
+	POSTMASTER_STANDBY,
+	POSTMASTER_STILL_STARTING,
+	POSTMASTER_FAILED
+};
+
+
+/*
+ * Clean up objects that were created by pg_subscriber if there is an error.
+ *
+ * Replication slots, publications and subscriptions are created. Depending on
+ * the step that failed, it removes the objects that were already created, if
+ * possible (sometimes it won't work due to a connection issue).
+ */
+static void
+cleanup_objects_atexit(void)
+{
+	PGconn	   *conn;
+	int			i;
+
+	if (success)
+		return;
+
+	for (i = 0; i < num_dbs; i++)
+	{
+		if (dbinfo[i].made_subscription)
+		{
+			conn = connect_database(dbinfo[i].subconninfo);
+			if (conn != NULL)
+			{
+				drop_subscription(conn, &dbinfo[i]);
+				disconnect_database(conn);
+			}
+		}
+
+		if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
+		{
+			conn = connect_database(dbinfo[i].pubconninfo);
+			if (conn != NULL)
+			{
+				if (dbinfo[i].made_publication)
+					drop_publication(conn, &dbinfo[i]);
+				if (dbinfo[i].made_replslot)
+					drop_replication_slot(conn, &dbinfo[i], NULL);
+				disconnect_database(conn);
+			}
+		}
+	}
+
+	if (made_transient_replslot)
+	{
+		conn = connect_database(dbinfo[0].pubconninfo);
+		if (conn != NULL)
+		{
+			drop_replication_slot(conn, &dbinfo[0], temp_replslot);
+			disconnect_database(conn);
+		}
+	}
+}
+
+static void
+usage(void)
+{
+	printf(_("%s creates a new logical replica from a standby server.\n\n"),
+		   progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION]...\n"), progname);
+	printf(_("\nOptions:\n"));
+	printf(_(" -D, --pgdata=DATADIR                location for the subscriber data directory\n"));
+	printf(_(" -P, --publisher-conninfo=CONNINFO   publisher connection string\n"));
+	printf(_(" -S, --subscriber-conninfo=CONNINFO  subscriber connection string\n"));
+	printf(_(" -d, --database=DBNAME               database to create a subscription\n"));
+	printf(_(" -n, --dry-run                       stop before modifying anything\n"));
+	printf(_(" -t, --recovery-timeout=SECS         seconds to wait for recovery to end\n"));
+	printf(_(" -r, --retain                        retain log file after success\n"));
+	printf(_(" -v, --verbose                       output verbose messages\n"));
+	printf(_(" -V, --version                       output version information, then exit\n"));
+	printf(_(" -?, --help                          show this help, then exit\n"));
+	printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
+	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
+}
+
+/*
+ * Validate a connection string. Returns a base connection string that is a
+ * connection string without a database name.
+ * Since we might process multiple databases, each database name will be
+ * appended to this base connection string to provide a final connection string.
+ * If the second argument (dbname) is not null, returns dbname if the provided
+ * connection string contains it. If option --database is not provided, uses
+ * dbname as the only database to setup the logical replica.
+ * It is the caller's responsibility to free the returned connection string and
+ * dbname.
+ */
+static char *
+get_base_conninfo(char *conninfo, char *dbname, const char *noderole)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	PQconninfoOption *conn_opts = NULL;
+	PQconninfoOption *conn_opt;
+	char	   *errmsg = NULL;
+	char	   *ret;
+	int			i;
+
+	pg_log_info("validating connection string on %s", noderole);
+
+	conn_opts = PQconninfoParse(conninfo, &errmsg);
+	if (conn_opts == NULL)
+	{
+		pg_log_error("could not parse connection string: %s", errmsg);
+		return NULL;
+	}
+
+	i = 0;
+	for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
+	{
+		if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL)
+		{
+			if (dbname)
+				dbname = pg_strdup(conn_opt->val);
+			continue;
+		}
+
+		if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
+		{
+			if (i > 0)
+				appendPQExpBufferChar(buf, ' ');
+			appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val);
+			i++;
+		}
+	}
+
+	ret = pg_strdup(buf->data);
+
+	destroyPQExpBuffer(buf);
+	PQconninfoFree(conn_opts);
+
+	return ret;
+}
+
+/*
+ * Get the absolute paths of the other PostgreSQL binaries (pg_ctl and
+ * pg_resetwal) used by this program.
+ */
+static bool
+get_exec_path(const char *path)
+{
+	int			rc;
+
+	pg_ctl_path = pg_malloc(MAXPGPATH);
+	rc = find_other_exec(path, "pg_ctl",
+						 "pg_ctl (PostgreSQL) " PG_VERSION "\n",
+						 pg_ctl_path);
+	if (rc < 0)
+	{
+		char		full_path[MAXPGPATH];
+
+		if (find_my_exec(path, full_path) < 0)
+			strlcpy(full_path, progname, sizeof(full_path));
+		if (rc == -1)
+			pg_log_error("The program \"%s\" is needed by %s but was not found in the\n"
+						 "same directory as \"%s\".\n"
+						 "Check your installation.",
+						 "pg_ctl", progname, full_path);
+		else
+			pg_log_error("The program \"%s\" was found by \"%s\"\n"
+						 "but was not the same version as %s.\n"
+						 "Check your installation.",
+						 "pg_ctl", full_path, progname);
+		return false;
+	}
+
+	pg_log_debug("pg_ctl path is: %s", pg_ctl_path);
+
+	pg_resetwal_path = pg_malloc(MAXPGPATH);
+	rc = find_other_exec(path, "pg_resetwal",
+						 "pg_resetwal (PostgreSQL) " PG_VERSION "\n",
+						 pg_resetwal_path);
+	if (rc < 0)
+	{
+		char		full_path[MAXPGPATH];
+
+		if (find_my_exec(path, full_path) < 0)
+			strlcpy(full_path, progname, sizeof(full_path));
+		if (rc == -1)
+			pg_log_error("The program \"%s\" is needed by %s but was not found in the\n"
+						 "same directory as \"%s\".\n"
+						 "Check your installation.",
+						 "pg_resetwal", progname, full_path);
+		else
+			pg_log_error("The program \"%s\" was found by \"%s\"\n"
+						 "but was not the same version as %s.\n"
+						 "Check your installation.",
+						 "pg_resetwal", full_path, progname);
+		return false;
+	}
+
+	pg_log_debug("pg_resetwal path is: %s", pg_resetwal_path);
+
+	return true;
+}
+
+/*
+ * Is it a cluster directory? These are preliminary checks and far from
+ * exhaustive. If it is not a clone of the publisher, it will eventually
+ * fail in a later step.
+ */
+static bool
+check_data_directory(const char *datadir)
+{
+	struct stat statbuf;
+	char		versionfile[MAXPGPATH];
+
+	pg_log_info("checking if directory \"%s\" is a cluster data directory",
+				datadir);
+
+	if (stat(datadir, &statbuf) != 0)
+	{
+		if (errno == ENOENT)
+			pg_log_error("data directory \"%s\" does not exist", datadir);
+		else
+			pg_log_error("could not access directory \"%s\": %s", datadir, strerror(errno));
+
+		return false;
+	}
+
+	snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
+	if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
+	{
+		pg_log_error("directory \"%s\" is not a database cluster directory", datadir);
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Append database name into a base connection string.
+ *
+ * dbname is the only parameter that changes so it is not included in the base
+ * connection string. This function concatenates dbname to build a "real"
+ * connection string.
+ */
+static char *
+concat_conninfo_dbname(const char *conninfo, const char *dbname)
+{
+	PQExpBuffer buf = createPQExpBuffer();
+	char	   *ret;
+
+	Assert(conninfo != NULL);
+
+	appendPQExpBufferStr(buf, conninfo);
+	appendPQExpBuffer(buf, " dbname=%s", dbname);
+
+	ret = pg_strdup(buf->data);
+	destroyPQExpBuffer(buf);
+
+	return ret;
+}
+
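concat_conninfo_dbname() above appends `dbname=%s` verbatim, so a database name containing a space or a single quote would yield a connection string that libpq cannot parse. A minimal sketch of the quoting that libpq's keyword/value syntax expects follows: the value is wrapped in single quotes, with `'` and `\` escaped by a backslash, unless it consists only of ASCII letters, digits, `_` and `.` (the same conservative test appendConnStrVal() in fe_utils uses). The function name and its fixed-size output buffer are hypothetical, chosen only to keep the example self-contained.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Append one character, keeping out NUL-terminated; false if it won't fit. */
static bool
put_char(char *out, size_t outlen, size_t *pos, char c)
{
	if (*pos + 1 >= outlen)
		return false;
	out[(*pos)++] = c;
	out[*pos] = '\0';
	return true;
}

/* Append a NUL-terminated string; false if it won't fit. */
static bool
put_str(char *out, size_t outlen, size_t *pos, const char *s)
{
	for (; *s; s++)
		if (!put_char(out, outlen, pos, *s))
			return false;
	return true;
}

/* Plain ASCII letters, digits, '_' and '.' never need quoting. */
static bool
is_plain_char(char c)
{
	return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
		(c >= '0' && c <= '9') || c == '_' || c == '.';
}

/*
 * Hypothetical variant of concat_conninfo_dbname(): write
 * "<base> dbname=<value>" into out, quoting the value when needed.
 * Returns 0 on success, -1 if out is too small.
 */
int
concat_conninfo_dbname_quoted(char *out, size_t outlen,
							  const char *base, const char *dbname)
{
	size_t		pos = 0;
	bool		needquotes = (dbname[0] == '\0');	/* empty value must be quoted */

	if (outlen == 0)
		return -1;
	out[0] = '\0';

	for (const char *s = dbname; *s && !needquotes; s++)
		if (!is_plain_char(*s))
			needquotes = true;

	if (!put_str(out, outlen, &pos, base) ||
		!put_str(out, outlen, &pos, " dbname="))
		return -1;

	if (!needquotes)
		return put_str(out, outlen, &pos, dbname) ? 0 : -1;

	if (!put_char(out, outlen, &pos, '\''))
		return -1;
	for (const char *s = dbname; *s; s++)
	{
		/* inside quotes, ' and \ are escaped with a backslash */
		if ((*s == '\'' || *s == '\\') && !put_char(out, outlen, &pos, '\\'))
			return -1;
		if (!put_char(out, outlen, &pos, *s))
			return -1;
	}
	return put_char(out, outlen, &pos, '\'') ? 0 : -1;
}
```

In the patch itself, calling appendConnStrVal() on the PQExpBuffer would probably be the simpler route, assuming fe_utils/string_utils.h is available to the tool.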
+/*
+ * Store publication and subscription information.
+ */
+static LogicalRepInfo *
+store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo)
+{
+	LogicalRepInfo *dbinfo;
+	SimpleStringListCell *cell;
+	int			i = 0;
+
+	dbinfo = (LogicalRepInfo *) pg_malloc(num_dbs * sizeof(LogicalRepInfo));
+
+	for (cell = database_names.head; cell; cell = cell->next)
+	{
+		char	   *conninfo;
+
+		/* Publisher. */
+		conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
+		dbinfo[i].pubconninfo = conninfo;
+		dbinfo[i].dbname = cell->val;
+		dbinfo[i].made_replslot = false;
+		dbinfo[i].made_publication = false;
+		dbinfo[i].made_subscription = false;
+		/* other struct fields will be filled later. */
+
+		/* Subscriber. */
+		conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
+		dbinfo[i].subconninfo = conninfo;
+
+		i++;
+	}
+
+	return dbinfo;
+}
+
+static PGconn *
+connect_database(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	char	   *rconninfo;
+
+	/* logical replication mode */
+	rconninfo = psprintf("%s replication=database", conninfo);
+
+	conn = PQconnectdb(rconninfo);
+	pg_free(rconninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		pg_log_error("connection to database failed: %s", PQerrorMessage(conn));
+		PQfinish(conn);
+		return NULL;
+	}
+
+	/* secure search_path */
+	res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not clear search_path: %s", PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		return NULL;
+	}
+	PQclear(res);
+
+	return conn;
+}
+
+static void
+disconnect_database(PGconn *conn)
+{
+	Assert(conn != NULL);
+
+	PQfinish(conn);
+}
+
+/*
+ * Obtain the system identifier using the provided connection. It will be used
+ * to check whether a data directory is a clone of another one.
+ */
+static uint64
+get_sysid_from_conn(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	uint64		sysid;
+
+	pg_log_info("getting system identifier from publisher");
+
+	conn = connect_database(conninfo);
+	if (conn == NULL)
+		exit(1);
+
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "IDENTIFY_SYSTEM", PQresultErrorMessage(res));
+		PQclear(res);
+		disconnect_database(conn);
+		exit(1);
+	}
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+	{
+		pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 3);
+
+		PQclear(res);
+		disconnect_database(conn);
+		exit(1);
+	}
+
+	sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
+
+	pg_log_info("system identifier is %llu on publisher", (unsigned long long) sysid);
+
+	PQclear(res);
+	disconnect_database(conn);
+
+	return sysid;
+}
+
+/*
+ * Obtain the system identifier from the control file. It will be used to
+ * check whether a data directory is a clone of another one. This routine is
+ * used locally and avoids a replication connection.
+ */
+static uint64
+get_control_from_datadir(const char *datadir)
+{
+	ControlFileData *cf;
+	bool		crc_ok;
+	uint64		sysid;
+
+	pg_log_info("getting system identifier from subscriber");
+
+	cf = get_controlfile(datadir, &crc_ok);
+	if (!crc_ok)
+	{
+		pg_log_error("control file appears to be corrupt");
+		exit(1);
+	}
+
+	sysid = cf->system_identifier;
+
+	pg_log_info("system identifier is %llu on subscriber", (unsigned long long) sysid);
+
+	pfree(cf);
+
+	return sysid;
+}
+
+/*
+ * Modify the system identifier. Since a standby server preserves the system
+ * identifier, it makes sense to change it to avoid situations in which WAL
+ * files from one of the systems might be used in the other one.
+ */
+static void
+modify_sysid(const char *pg_resetwal_path, const char *datadir)
+{
+	ControlFileData *cf;
+	bool		crc_ok;
+	struct timeval tv;
+
+	char	   *cmd_str;
+	int			rc;
+
+	pg_log_info("modifying system identifier of subscriber");
+
+	cf = get_controlfile(datadir, &crc_ok);
+	if (!crc_ok)
+	{
+		pg_log_error("control file appears to be corrupt");
+		exit(1);
+	}
+
+	/*
+	 * Select a new system identifier.
+	 *
+	 * XXX this code was extracted from BootStrapXLOG().
+	 */
+	gettimeofday(&tv, NULL);
+	cf->system_identifier = ((uint64) tv.tv_sec) << 32;
+	cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
+	cf->system_identifier |= getpid() & 0xFFF;
+
+	if (!dry_run)
+		update_controlfile(datadir, cf, true);
+
+	pg_log_info("system identifier is %llu on subscriber", (unsigned long long) cf->system_identifier);
+
+	pg_log_info("running pg_resetwal on the subscriber");
+
+	cmd_str = psprintf("\"%s\" -D \"%s\"", pg_resetwal_path, datadir);
+
+	pg_log_debug("command is: %s", cmd_str);
+
+	if (!dry_run)
+	{
+		rc = system(cmd_str);
+		if (rc == 0)
+			pg_log_info("subscriber successfully changed the system identifier");
+		else
+		{
+			pg_log_error("subscriber failed to change system identifier: exit code: %d", rc);
+			exit(1);
+		}
+	}
+
+	pfree(cf);
+}
+
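The identifier chosen by modify_sysid() packs three values into 64 bits. The sketch below isolates that packing as a pure function (compose_system_identifier() is a hypothetical name) so the layout can be checked: since tv_usec is at most 999999, shifting it left by 12 stays below bit 32, and the PID contributes only the low 12 bits, so the three fields do not overlap. Like the BootStrapXLOG() code it is copied from, this makes a collision unlikely rather than impossible.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Pack a system identifier the way modify_sysid() does: seconds in the high
 * 32 bits, microseconds shifted left by 12, and the low 12 bits of the PID.
 */
uint64_t
compose_system_identifier(uint64_t tv_sec, uint64_t tv_usec, uint32_t pid)
{
	uint64_t	sysid;

	sysid = tv_sec << 32;
	sysid |= tv_usec << 12;
	sysid |= pid & 0xFFF;
	return sysid;
}
```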
+/*
+ * Is the source server ready for logical replication? If so, create the
+ * publications and replication slots in preparation for logical replication.
+ */
+static bool
+setup_publisher(LogicalRepInfo *dbinfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	PQExpBuffer str = createPQExpBuffer();
+
+	char	   *wal_level;
+	int			max_repslots;
+	int			cur_repslots;
+	int			max_walsenders;
+	int			cur_walsenders;
+
+	pg_log_info("checking settings on publisher");
+
+	/*
+	 * Logical replication requires a few parameters to be set on the
+	 * publisher. Since these parameters are not required for physical
+	 * replication, check them here to make sure the later steps won't fail.
+	 *
+	 * wal_level = logical
+	 * max_replication_slots >= current + number of dbs to be converted
+	 * max_wal_senders >= current + number of dbs to be converted
+	 */
+	conn = connect_database(dbinfo[0].pubconninfo);
+	if (conn == NULL)
+		exit(1);
+
+	res = PQexec(conn,
+				 "WITH wl AS (SELECT setting AS wallevel FROM pg_settings WHERE name = 'wal_level'),"
+				 "     total_mrs AS (SELECT setting AS tmrs FROM pg_settings WHERE name = 'max_replication_slots'),"
+				 "     cur_mrs AS (SELECT count(*) AS cmrs FROM pg_replication_slots),"
+				 "     total_mws AS (SELECT setting AS tmws FROM pg_settings WHERE name = 'max_wal_senders'),"
+				 "     cur_mws AS (SELECT count(*) AS cmws FROM pg_stat_activity WHERE backend_type = 'walsender') "
+				 "SELECT wallevel, tmrs, cmrs, tmws, cmws FROM wl, total_mrs, cur_mrs, total_mws, cur_mws");
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain publisher settings: %s", PQresultErrorMessage(res));
+		return false;
+	}
+
+	wal_level = pg_strdup(PQgetvalue(res, 0, 0));
+	max_repslots = atoi(PQgetvalue(res, 0, 1));
+	cur_repslots = atoi(PQgetvalue(res, 0, 2));
+	max_walsenders = atoi(PQgetvalue(res, 0, 3));
+	cur_walsenders = atoi(PQgetvalue(res, 0, 4));
+
+	PQclear(res);
+
+	pg_log_debug("publisher: wal_level: %s", wal_level);
+	pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
+	pg_log_debug("publisher: current replication slots: %d", cur_repslots);
+	pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
+	pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
+
+	/*
+	 * If the standby sets primary_slot_name, check whether this replication
+	 * slot is in use on the primary for WAL retention purposes. This
+	 * replication slot is useless after the transformation; hence, it will be
+	 * removed at the end of this process.
+	 */
+	if (primary_slot_name)
+	{
+		appendPQExpBuffer(str,
+						  "SELECT 1 FROM pg_replication_slots WHERE active AND slot_name = '%s'", primary_slot_name);
+
+		pg_log_debug("command is: %s", str->data);
+
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain replication slot information: %s", PQresultErrorMessage(res));
+			return false;
+		}
+
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("could not obtain replication slot information: got %d rows, expected %d row",
+						 PQntuples(res), 1);
+			pg_free(primary_slot_name); /* it is not being used. */
+			primary_slot_name = NULL;
+			return false;
+		}
+		else
+		{
+			pg_log_info("primary has replication slot \"%s\"", primary_slot_name);
+		}
+
+		PQclear(res);
+	}
+
+	disconnect_database(conn);
+
+	if (strcmp(wal_level, "logical") != 0)
+	{
+		pg_log_error("publisher requires wal_level >= logical");
+		return false;
+	}
+
+	if (max_repslots - cur_repslots < num_dbs)
+	{
+		pg_log_error("publisher requires %d replication slots, but only %d remain", num_dbs, max_repslots - cur_repslots);
+		pg_log_error_hint("Consider increasing max_replication_slots to at least %d.", cur_repslots + num_dbs);
+		return false;
+	}
+
+	if (max_walsenders - cur_walsenders < num_dbs)
+	{
+		pg_log_error("publisher requires %d wal sender processes, but only %d remain", num_dbs, max_walsenders - cur_walsenders);
+		pg_log_error_hint("Consider increasing max_wal_senders to at least %d.", cur_walsenders + num_dbs);
+		return false;
+	}
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		char		pubname[NAMEDATALEN];
+		char		replslotname[NAMEDATALEN];
+
+		conn = connect_database(dbinfo[i].pubconninfo);
+		if (conn == NULL)
+			exit(1);
+
+		res = PQexec(conn,
+					 "SELECT oid FROM pg_catalog.pg_database WHERE datname = current_database()");
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain database OID: %s", PQresultErrorMessage(res));
+			return false;
+		}
+
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("could not obtain database OID: got %d rows, expected %d rows",
+						 PQntuples(res), 1);
+			return false;
+		}
+
+		/* Remember database OID. */
+		dbinfo[i].oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+
+		PQclear(res);
+
+		/*
+		 * Build the publication name. The name must not exceed NAMEDATALEN -
+		 * 1. The current naming scheme uses a maximum of 25 characters (14 +
+		 * 10 + '\0').
+		 */
+		snprintf(pubname, sizeof(pubname), "pg_subscriber_%u", dbinfo[i].oid);
+		dbinfo[i].pubname = pg_strdup(pubname);
+
+		/*
+		 * Create publication on publisher. This step should be executed
+		 * *before* promoting the subscriber to avoid any transactions between
+		 * consistent LSN and the new publication rows (such transactions
+		 * wouldn't see the new publication rows resulting in an error).
+		 */
+		create_publication(conn, &dbinfo[i]);
+
+		/*
+		 * Build the replication slot name. The name must not exceed
+		 * NAMEDATALEN - 1. The current naming scheme uses a maximum of 36
+		 * characters (14 + 10 + 1 + 10 + '\0'). The process ID is included
+		 * to reduce the probability of collision. By default, the
+		 * subscription name is used as the replication slot name.
+		 */
+		snprintf(replslotname, sizeof(replslotname),
+				 "pg_subscriber_%u_%d",
+				 dbinfo[i].oid,
+				 (int) getpid());
+		dbinfo[i].subname = pg_strdup(replslotname);
+
+		/* Create replication slot on publisher. */
+		if (create_logical_replication_slot(conn, &dbinfo[i], replslotname) != NULL || dry_run)
+			pg_log_info("create replication slot \"%s\" on publisher", replslotname);
+		else
+			return false;
+
+		disconnect_database(conn);
+	}
+
+	return true;
+}
+
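The name-length comments in setup_publisher() can be checked mechanically. This sketch assumes a 32-bit Oid, the default NAMEDATALEN of 64, and a PID that is a non-negative int; it formats worst-case names with the same format strings the patch uses.

```c
#include <assert.h>
#include <stdio.h>

#define NAMEDATALEN 64			/* PostgreSQL's default; assumed here */

/* Length (without '\0') of the publication name built for a database OID. */
int
publication_name_len(unsigned int oid)
{
	char		name[NAMEDATALEN];

	return snprintf(name, sizeof(name), "pg_subscriber_%u", oid);
}

/* Length (without '\0') of the replication slot name built from OID and PID. */
int
replslot_name_len(unsigned int oid, int pid)
{
	char		name[NAMEDATALEN];

	return snprintf(name, sizeof(name), "pg_subscriber_%u_%d", oid, pid);
}
```

With the largest 32-bit OID and the largest positive int as PID, the names are 24 and 35 characters long, i.e. 25 and 36 bytes including the terminator, both well under NAMEDATALEN.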
+/*
+ * Is the target server ready for logical replication?
+ */
+static bool
+setup_subscriber(LogicalRepInfo *dbinfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+
+	int			max_lrworkers;
+	int			max_repslots;
+	int			max_wprocs;
+
+	pg_log_info("checking settings on subscriber");
+
+	/*
+	 * Logical replication requires a few parameters to be set on the
+	 * subscriber. Since these parameters are not required for physical
+	 * replication, check them here to make sure the later steps won't fail.
+	 *
+	 * max_replication_slots >= number of dbs to be converted
+	 * max_logical_replication_workers >= number of dbs to be converted
+	 * max_worker_processes >= 1 + number of dbs to be converted
+	 */
+	conn = connect_database(dbinfo[0].subconninfo);
+	if (conn == NULL)
+		exit(1);
+
+	res = PQexec(conn,
+				 "SELECT setting FROM pg_settings WHERE name IN ('max_logical_replication_workers', 'max_replication_slots', 'max_worker_processes', 'primary_slot_name') ORDER BY name");
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain subscriber settings: %s", PQresultErrorMessage(res));
+		return false;
+	}
+
+	max_lrworkers = atoi(PQgetvalue(res, 0, 0));
+	max_repslots = atoi(PQgetvalue(res, 1, 0));
+	max_wprocs = atoi(PQgetvalue(res, 2, 0));
+	if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
+		primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
+
+	pg_log_debug("subscriber: max_logical_replication_workers: %d", max_lrworkers);
+	pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
+	pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
+	pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
+
+	PQclear(res);
+
+	disconnect_database(conn);
+
+	if (max_repslots < num_dbs)
+	{
+		pg_log_error("subscriber requires %d replication slots, but only %d remain", num_dbs, max_repslots);
+		pg_log_error_hint("Consider increasing max_replication_slots to at least %d.", num_dbs);
+		return false;
+	}
+
+	if (max_lrworkers < num_dbs)
+	{
+		pg_log_error("subscriber requires %d logical replication workers, but only %d remain", num_dbs, max_lrworkers);
+		pg_log_error_hint("Consider increasing max_logical_replication_workers to at least %d.", num_dbs);
+		return false;
+	}
+
+	if (max_wprocs < num_dbs + 1)
+	{
+		pg_log_error("subscriber requires %d worker processes, but only %d remain", num_dbs + 1, max_wprocs);
+		pg_log_error_hint("Consider increasing max_worker_processes to at least %d.", num_dbs + 1);
+		return false;
+	}
+
+	return true;
+}
+
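setup_subscriber() reads the four settings by row position, so it relies on `ORDER BY name` returning them in a fixed order. The sketch below sorts the names in byte order (what the C collation would do) to confirm the positions the code assumes. That other collations produce the same order for these four names is an assumption; looking each value up by name instead of by position would remove the dependency altogether.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* qsort comparator for an array of C strings (byte order, like strcmp). */
static int
compare_names(const void *a, const void *b)
{
	return strcmp(*(const char *const *) a, *(const char *const *) b);
}

/* Sort setting names the way ORDER BY name would under the C collation. */
void
sort_setting_names(const char **names, size_t n)
{
	qsort(names, n, sizeof(names[0]), compare_names);
}
```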
+/*
+ * Create a logical replication slot and return a consistent LSN. The returned
+ * LSN might be used to catch the subscriber up to the required point.
+ *
+ * CreateReplicationSlot() is not used because it does not provide the one-row
+ * result set that contains the consistent LSN.
+ */
+static char *
+create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo,
+								char *slot_name)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res = NULL;
+	char	   *lsn = NULL;
+	bool		transient_replslot = false;
+
+	Assert(conn != NULL);
+
+	/*
+	 * If no slot name is provided, it is a transient replication slot used
+	 * only for catch-up purposes.
+	 */
+	if (slot_name[0] == '\0')
+	{
+		snprintf(slot_name, NAMEDATALEN, "pg_subscriber_%d_startpoint",
+				 (int) getpid());
+		transient_replslot = true;
+	}
+
+	pg_log_info("creating the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
+	appendPQExpBufferStr(str, " LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname,
+						 PQresultErrorMessage(res));
+			return lsn;
+		}
+	}
+
+	/* for cleanup purposes */
+	if (transient_replslot)
+		made_transient_replslot = true;
+	else
+		dbinfo->made_replslot = true;
+
+	if (!dry_run)
+	{
+		lsn = pg_strdup(PQgetvalue(res, 0, 1));
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+
+	return lsn;
+}
+
+static void
+drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("dropping the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP_REPLICATION_SLOT \"%s\"", slot_name);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname,
+						 PQerrorMessage(conn));
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Reports a suitable message if pg_ctl fails.
+ */
+static void
+pg_ctl_status(const char *pg_ctl_cmd, int rc, int action)
+{
+	if (rc != 0)
+	{
+		if (WIFEXITED(rc))
+		{
+			pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
+		}
+		else if (WIFSIGNALED(rc))
+		{
+#if defined(WIN32)
+			pg_log_error("pg_ctl was terminated by exception 0x%X", WTERMSIG(rc));
+			pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
+#else
+			pg_log_error("pg_ctl was terminated by signal %d: %s",
+						 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
+#endif
+		}
+		else
+		{
+			pg_log_error("pg_ctl exited with unrecognized status %d", rc);
+		}
+
+		pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
+		exit(1);
+	}
+
+	if (action)
+		pg_log_info("postmaster was started");
+	else
+		pg_log_info("postmaster was stopped");
+}
+
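pg_ctl_status() relies on the POSIX wait-status macros to tell a normal exit from termination by a signal. A POSIX-only sketch applying the same classification to a status obtained from waitpid() follows; on POSIX systems system() returns the same kind of status. describe_wait_status() is a hypothetical helper, and the Windows exception branch is not covered.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L	/* for fork() and waitpid() */
#endif
#include <assert.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical helper mirroring the branches of pg_ctl_status(): describe a
 * wait status as returned by waitpid() (or system() on POSIX). Returns 0 if
 * the child exited normally with status 0, else -1.
 */
int
describe_wait_status(int status, char *out, size_t outlen)
{
	if (WIFEXITED(status))
	{
		snprintf(out, outlen, "exited with code %d", WEXITSTATUS(status));
		return WEXITSTATUS(status) == 0 ? 0 : -1;
	}
	if (WIFSIGNALED(status))
	{
		snprintf(out, outlen, "terminated by signal %d", WTERMSIG(status));
		return -1;
	}
	snprintf(out, outlen, "unrecognized status %d", status);
	return -1;
}
```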
+/*
+ * Returns after the server finishes the recovery process.
+ *
+ * If the recovery_timeout option is set, stop the server and terminate
+ * abnormally when recovery has not finished within that many seconds. By
+ * default, it waits forever.
+ */
+static void
+wait_for_end_recovery(const char *conninfo)
+{
+	PGconn	   *conn;
+	PGresult   *res;
+	int			status = POSTMASTER_STILL_STARTING;
+	int			timer = 0;
+
+	char	   *pg_ctl_cmd;
+	int			rc;
+
+	pg_log_info("waiting for the postmaster to reach the consistent state");
+
+	conn = connect_database(conninfo);
+	if (conn == NULL)
+		exit(1);
+
+	for (;;)
+	{
+		bool		in_recovery;
+
+		res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not obtain recovery progress: %s", PQresultErrorMessage(res));
+			exit(1);
+		}
+
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("unexpected result from pg_is_in_recovery function");
+			exit(1);
+		}
+
+		in_recovery = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
+
+		PQclear(res);
+
+		/*
+		 * Has the recovery process finished? In dry run mode, there is no
+		 * recovery to wait for, so bail out as if it had ended.
+		 */
+		if (!in_recovery || dry_run)
+		{
+			status = POSTMASTER_READY;
+			break;
+		}
+
+		/*
+		 * Bail out after recovery_timeout seconds if this option is set.
+		 */
+		if (recovery_timeout > 0 && timer >= recovery_timeout)
+		{
+			pg_log_error("recovery timed out");
+
+			pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+			rc = system(pg_ctl_cmd);
+			pg_ctl_status(pg_ctl_cmd, rc, 0);
+
+			exit(1);
+		}
+
+		/* Keep waiting. */
+		pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
+
+		timer += WAIT_INTERVAL;
+	}
+
+	disconnect_database(conn);
+
+	if (status == POSTMASTER_STILL_STARTING)
+	{
+		pg_log_error("server did not end recovery");
+		exit(1);
+	}
+
+	pg_log_info("postmaster reached the consistent state");
+}
+
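The loop in wait_for_end_recovery() can be read as a generic poll-with-timeout. This sketch pulls it into a hypothetical wait_with_timeout() helper, with the pg_is_in_recovery() query replaced by an injected predicate and pg_usleep() by an injected sleep function, so the timeout arithmetic can be exercised without a server. Because the timer only counts sleep time and advances in whole intervals, the real wait may exceed recovery_timeout by up to one WAIT_INTERVAL plus query time.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical helper modeled on the loop in wait_for_end_recovery(): poll
 * done() every interval seconds (interval must be positive) and give up once
 * timeout seconds have been spent sleeping; timeout <= 0 means wait forever.
 * Returns 0 when done() succeeds and -1 on timeout.
 */
int
wait_with_timeout(bool (*done) (void *), void *arg, int timeout,
				  int interval, void (*sleep_fn) (int))
{
	int			timer = 0;

	for (;;)
	{
		if (done(arg))
			return 0;
		if (timeout > 0 && timer >= timeout)
			return -1;
		sleep_fn(interval);
		timer += interval;
	}
}

/* Example predicate state: ready after a given number of failed polls. */
struct poll_state
{
	int			calls;			/* number of polls so far */
	int			ready_after;	/* failed polls before success; < 0: never */
};

static bool
ready_after_n_polls(void *arg)
{
	struct poll_state *st = arg;

	st->calls++;
	return st->ready_after >= 0 && st->calls > st->ready_after;
}

/* Stand-in for pg_usleep(); does not actually wait. */
static void
no_sleep(int seconds)
{
	(void) seconds;
}
```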
+/*
+ * Create a publication that includes all tables in the database.
+ */
+static void
+create_publication(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	/* Check if the publication needs to be created. */
+	appendPQExpBuffer(str,
+					  "SELECT puballtables FROM pg_catalog.pg_publication WHERE pubname = '%s'",
+					  dbinfo->pubname);
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain publication information: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	if (PQntuples(res) == 1)
+	{
+		/*
+		 * If the publication name already exists and puballtables is true,
+		 * reuse it: a previous run of pg_subscriber must have created this
+		 * publication. There is nothing else to do here.
+		 */
+		if (strcmp(PQgetvalue(res, 0, 0), "t") == 0)
+		{
+			pg_log_info("publication \"%s\" already exists", dbinfo->pubname);
+			PQclear(res);
+			destroyPQExpBuffer(str);
+			return;
+		}
+		else
+		{
+			/*
+			 * Unfortunately, if it reaches this code path, it will always
+			 * fail (unless the existing publication is renamed). That's bad,
+			 * but it is very unlikely that the user created a publication
+			 * named with the pg_subscriber_ prefix followed by the exact
+			 * database OID and with puballtables set to false.
+			 */
+			pg_log_error("publication \"%s\" does not replicate changes for all tables",
+						 dbinfo->pubname);
+			pg_log_error_hint("Consider renaming this publication.");
+			PQclear(res);
+			PQfinish(conn);
+			exit(1);
+		}
+	}
+
+	PQclear(res);
+	resetPQExpBuffer(str);
+
+	pg_log_info("creating publication \"%s\" on database \"%s\"", dbinfo->pubname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", dbinfo->pubname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not create publication \"%s\" on database \"%s\": %s",
+						 dbinfo->pubname, dbinfo->dbname, PQerrorMessage(conn));
+			PQfinish(conn);
+			exit(1);
+		}
+	}
+
+	/* for cleanup purposes */
+	dbinfo->made_publication = true;
+
+	if (!dry_run)
+		PQclear(res);
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Remove publication if it couldn't finish all steps.
+ */
+static void
+drop_publication(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("dropping publication \"%s\" on database \"%s\"", dbinfo->pubname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP PUBLICATION %s", dbinfo->pubname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pg_log_error("could not drop publication \"%s\" on database \"%s\": %s", dbinfo->pubname, dbinfo->dbname, PQerrorMessage(conn));
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Create a subscription with some predefined options.
+ *
+ * A replication slot was already created in a previous step. Let's use it. By
+ * default, the subscription name is used as replication slot name. It is
+ * not required to copy data. The subscription will be created but it will not
+ * be enabled now. That's because the replication progress must be set and the
+ * replication origin name (one of the function arguments) contains the
+ * subscription OID in its name. Once the subscription is created,
+ * set_replication_progress() can obtain the chosen origin name and set up its
+ * initial location.
+ */
+static void
+create_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("creating subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str,
+					  "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s "
+					  "WITH (create_slot = false, copy_data = false, enabled = false)",
+					  dbinfo->subname, dbinfo->pubconninfo, dbinfo->pubname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not create subscription \"%s\" on database \"%s\": %s",
+						 dbinfo->subname, dbinfo->dbname, PQerrorMessage(conn));
+			PQfinish(conn);
+			exit(1);
+		}
+	}
+
+	/* for cleanup purposes */
+	dbinfo->made_subscription = true;
+
+	if (!dry_run)
+		PQclear(res);
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Remove subscription if it couldn't finish all steps.
+ */
+static void
+drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("dropping subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "DROP SUBSCRIPTION %s", dbinfo->subname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pg_log_error("could not drop subscription \"%s\" on database \"%s\": %s", dbinfo->subname, dbinfo->dbname, PQerrorMessage(conn));
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+/*
+ * Sets the replication progress to the consistent LSN.
+ *
+ * The subscriber caught up to the consistent LSN provided by the temporary
+ * replication slot. The goal is to set up the initial location for logical
+ * replication, which is the exact LSN at which the subscriber was promoted.
+ * Once the subscription is enabled, it will start streaming from that
+ * location onwards.
+ * In dry run mode, the subscription OID and LSN are set to invalid values for
+ * printing purposes.
+ */
+static void
+set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+	Oid			suboid;
+	char		originname[NAMEDATALEN];
+	char		lsnstr[17 + 1]; /* MAXPG_LSNLEN = 17 */
+
+	Assert(conn != NULL);
+
+	appendPQExpBuffer(str,
+					  "SELECT oid FROM pg_catalog.pg_subscription WHERE subname = '%s'", dbinfo->subname);
+
+	res = PQexec(conn, str->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain subscription OID: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	if (PQntuples(res) != 1 && !dry_run)
+	{
+		pg_log_error("could not obtain subscription OID: got %d rows, expected %d rows",
+					 PQntuples(res), 1);
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	if (dry_run)
+	{
+		suboid = InvalidOid;
+		snprintf(lsnstr, sizeof(lsnstr), "%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
+	}
+	else
+	{
+		suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+		snprintf(lsnstr, sizeof(lsnstr), "%s", lsn);
+	}
+
+	/*
+	 * The origin name is defined as pg_%u. %u is the subscription OID. See
+	 * ApplyWorkerMain().
+	 */
+	snprintf(originname, sizeof(originname), "pg_%u", suboid);
+
+	PQclear(res);
+
+	pg_log_info("setting the replication progress (node name \"%s\", LSN %s) on database \"%s\"",
+				originname, lsnstr, dbinfo->dbname);
+
+	resetPQExpBuffer(str);
+	appendPQExpBuffer(str,
+					  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')", originname, lsnstr);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not set replication progress for the subscription \"%s\": %s",
+						 dbinfo->subname, PQresultErrorMessage(res));
+			PQfinish(conn);
+			exit(1);
+		}
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
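set_replication_progress() sizes lsnstr for the textual form of an LSN: the high and low 32-bit halves in hexadecimal separated by a slash, at most "FFFFFFFF/FFFFFFFF" (17 characters). A sketch of converting between that text (for example the consistent_point column returned by CREATE_REPLICATION_SLOT) and a 64-bit value follows, using the same "%X/%X" format the patch pairs with LSN_FORMAT_ARGS(). The function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define MAX_LSN_TEXT_LEN 17		/* strlen("FFFFFFFF/FFFFFFFF") */

/*
 * Parse the "%X/%X" text form of an LSN into a 64-bit value. Returns false
 * if the text is malformed or has trailing garbage.
 */
bool
lsn_from_text(const char *text, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;
	int			consumed = 0;

	if (sscanf(text, "%X/%X%n", &hi, &lo, &consumed) != 2 ||
		text[consumed] != '\0')
		return false;
	*lsn = ((uint64_t) hi << 32) | lo;
	return true;
}

/* Format a 64-bit LSN as its high/low 32-bit halves in uppercase hex. */
void
lsn_to_text(uint64_t lsn, char *out, size_t outlen)
{
	snprintf(out, outlen, "%X/%X", (unsigned int) (lsn >> 32),
			 (unsigned int) lsn);
}
```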
+/*
+ * Enables the subscription.
+ *
+ * The subscription was created in a previous step but it was disabled. After
+ * adjusting the initial location, enabling the subscription is the last step
+ * of this setup.
+ */
+static void
+enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer str = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	pg_log_info("enabling subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname);
+
+	appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", dbinfo->subname);
+
+	pg_log_debug("command is: %s", str->data);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not enable subscription \"%s\": %s", dbinfo->subname,
+						 PQerrorMessage(conn));
+			PQfinish(conn);
+			exit(1);
+		}
+
+		PQclear(res);
+	}
+
+	destroyPQExpBuffer(str);
+}
+
+int
+main(int argc, char **argv)
+{
+	static struct option long_options[] =
+	{
+		{"help", no_argument, NULL, '?'},
+		{"version", no_argument, NULL, 'V'},
+		{"pgdata", required_argument, NULL, 'D'},
+		{"publisher-conninfo", required_argument, NULL, 'P'},
+		{"subscriber-conninfo", required_argument, NULL, 'S'},
+		{"database", required_argument, NULL, 'd'},
+		{"dry-run", no_argument, NULL, 'n'},
+		{"recovery-timeout", required_argument, NULL, 't'},
+		{"retain", no_argument, NULL, 'r'},
+		{"verbose", no_argument, NULL, 'v'},
+		{NULL, 0, NULL, 0}
+	};
+
+	int			c;
+	int			option_index;
+	int			rc;
+
+	char	   *pg_ctl_cmd;
+
+	char	   *base_dir;
+	char	   *server_start_log;
+
+	char		timebuf[128];
+	struct timeval time;
+	time_t		tt;
+	int			len;
+
+	char	   *pub_base_conninfo = NULL;
+	char	   *sub_base_conninfo = NULL;
+	char	   *dbname_conninfo = NULL;
+
+	uint64		pub_sysid;
+	uint64		sub_sysid;
+	struct stat statbuf;
+
+	PGconn	   *conn;
+	char	   *consistent_lsn;
+
+	PQExpBuffer recoveryconfcontents = NULL;
+
+	char		pidfile[MAXPGPATH];
+
+	int			i;
+
+	pg_logging_init(argv[0]);
+	pg_logging_set_level(PG_LOG_WARNING);
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_subscriber"));
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0
+				 || strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_subscriber (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	atexit(cleanup_objects_atexit);
+
+	/*
+	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
+	 * it either.
+	 */
+#ifndef WIN32
+	if (geteuid() == 0)
+	{
+		pg_log_error("cannot be executed by \"root\"");
+		pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
+						  progname);
+		exit(1);
+	}
+#endif
+
+	while ((c = getopt_long(argc, argv, "D:P:S:d:nrt:v",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+			case 'D':
+				subscriber_dir = pg_strdup(optarg);
+				break;
+			case 'P':
+				pub_conninfo_str = pg_strdup(optarg);
+				break;
+			case 'S':
+				sub_conninfo_str = pg_strdup(optarg);
+				break;
+			case 'd':
+				/* Ignore duplicated database names. */
+				if (!simple_string_list_member(&database_names, optarg))
+				{
+					simple_string_list_append(&database_names, optarg);
+					num_dbs++;
+				}
+				break;
+			case 'n':
+				dry_run = true;
+				break;
+			case 'r':
+				retain = true;
+				break;
+			case 't':
+				recovery_timeout = atoi(optarg);
+				break;
+			case 'v':
+				pg_logging_increase_verbosity();
+				break;
+			default:
+				/* getopt_long already emitted a complaint */
+				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+				exit(1);
+		}
+	}
+
+	/*
+	 * Any non-option arguments?
+	 */
+	if (optind < argc)
+	{
+		pg_log_error("too many command-line arguments (first is \"%s\")",
+					 argv[optind]);
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+
+	/*
+	 * Required arguments
+	 */
+	if (subscriber_dir == NULL)
+	{
+		pg_log_error("no subscriber data directory specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+
+	/*
+	 * Parse connection string. Build a base connection string that might be
+	 * reused by multiple databases.
+	 */
+	if (pub_conninfo_str == NULL)
+	{
+		/*
+		 * TODO use primary_conninfo (if available) from subscriber and
+		 * extract publisher connection string. Assume that there are
+		 * identical entries for physical and logical replication. If there is
+		 * not, we would fail anyway.
+		 */
+		pg_log_error("no publisher connection string specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+	pub_base_conninfo = get_base_conninfo(pub_conninfo_str, dbname_conninfo,
+										  "publisher");
+	if (pub_base_conninfo == NULL)
+		exit(1);
+
+	if (sub_conninfo_str == NULL)
+	{
+		pg_log_error("no subscriber connection string specified");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
+	}
+	sub_base_conninfo = get_base_conninfo(sub_conninfo_str, NULL, "subscriber");
+	if (sub_base_conninfo == NULL)
+		exit(1);
+
+	if (database_names.head == NULL)
+	{
+		pg_log_info("no database was specified");
+
+		/*
+		 * If --database option is not provided, try to obtain the dbname from
+		 * the publisher conninfo. If dbname parameter is not available, error
+		 * out.
+		 */
+		if (dbname_conninfo)
+		{
+			simple_string_list_append(&database_names, dbname_conninfo);
+			num_dbs++;
+
+			pg_log_info("database \"%s\" was extracted from the publisher connection string",
+						dbname_conninfo);
+		}
+		else
+		{
+			pg_log_error("no database name specified");
+			pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+			exit(1);
+		}
+	}
+
+	/*
+	 * Get the absolute path of pg_ctl and pg_resetwal on the subscriber.
+	 */
+	if (!get_exec_path(argv[0]))
+		exit(1);
+
+	/* rudimentary check for a data directory. */
+	if (!check_data_directory(subscriber_dir))
+		exit(1);
+
+	/* Store database information for publisher and subscriber. */
+	dbinfo = store_pub_sub_info(pub_base_conninfo, sub_base_conninfo);
+
+	/*
+	 * Check if the subscriber data directory has the same system identifier
+	 * as the publisher data directory.
+	 */
+	pub_sysid = get_sysid_from_conn(dbinfo[0].pubconninfo);
+	sub_sysid = get_control_from_datadir(subscriber_dir);
+	if (pub_sysid != sub_sysid)
+	{
+		pg_log_error("subscriber data directory is not a copy of the source database cluster");
+		exit(1);
+	}
+
+	/*
+	 * Create the output directory to store any data generated by this tool.
+	 */
+	base_dir = (char *) pg_malloc0(MAXPGPATH);
+	len = snprintf(base_dir, MAXPGPATH, "%s/%s", subscriber_dir, PGS_OUTPUT_DIR);
+	if (len >= MAXPGPATH)
+	{
+		pg_log_error("directory path for subscriber is too long");
+		exit(1);
+	}
+
+	if (mkdir(base_dir, pg_dir_create_mode) < 0 && errno != EEXIST)
+	{
+		pg_log_error("could not create directory \"%s\": %m", base_dir);
+		exit(1);
+	}
+
+	/* subscriber PID file. */
+	snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
+
+	/*
+	 * The standby server must be running. That's because some checks will be
+	 * done (is it ready for a logical replication setup?). After that, stop
+	 * the subscriber in preparation to modify some recovery parameters that
+	 * require a restart.
+	 */
+	if (stat(pidfile, &statbuf) == 0)
+	{
+		/*
+		 * Check if the standby server is ready for logical replication.
+		 */
+		if (!setup_subscriber(dbinfo))
+			exit(1);
+
+		/*
+		 * Check if the primary server is ready for logical replication and
+		 * create the required objects for each database on publisher. This
+		 * step is here mainly because if we stop the standby we cannot verify
+		 * if the primary slot is in use. We could use an extra connection for
+		 * it but it doesn't seem worth it.
+		 */
+		if (!setup_publisher(dbinfo))
+			exit(1);
+
+		pg_log_info("standby is up and running");
+		pg_log_info("stopping the server to start the transformation steps");
+
+		pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+		rc = system(pg_ctl_cmd);
+		pg_ctl_status(pg_ctl_cmd, rc, 0);
+	}
+	else
+	{
+		pg_log_error("standby is not running");
+		pg_log_error_hint("Start the standby and try again.");
+		exit(1);
+	}
+
+	/*
+	 * Create a logical replication slot to get a consistent LSN.
+	 *
+	 * This consistent LSN will be used later to advance the recently created
+	 * replication slots. We cannot use the last created replication slot
+	 * because the consistent LSN should be obtained *after* the base backup
+	 * finishes (and the base backup should include the logical replication
+	 * slots).
+	 *
+	 * XXX we should probably use the last created replication slot to get a
+	 * consistent LSN but it should be changed after adding pg_basebackup
+	 * support.
+	 *
+	 * A temporary replication slot is not used here to avoid keeping a
+	 * replication connection open (depending on when the base backup was taken,
+	 * the connection would have to stay open for a few hours).
+	 */
+	conn = connect_database(dbinfo[0].pubconninfo);
+	if (conn == NULL)
+		exit(1);
+	consistent_lsn = create_logical_replication_slot(conn, &dbinfo[0],
+													 temp_replslot);
+
+	/*
+	 * Write recovery parameters.
+	 *
+	 * Although the recovery parameters will be written to the subscriber, use a
+	 * publisher connection for the following recovery functions. The
+	 * connection is only used to check the current server version (physical
+	 * replica, same server version). The subscriber is not running yet. In
+	 * dry run mode, the recovery parameters *won't* be written. An invalid
+	 * LSN is used for printing purposes.
+	 */
+	recoveryconfcontents = GenerateRecoveryConfig(conn, NULL);
+	appendPQExpBufferStr(recoveryconfcontents, "recovery_target_inclusive = true\n");
+	appendPQExpBufferStr(recoveryconfcontents, "recovery_target_action = promote\n");
+
+	if (dry_run)
+	{
+		appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
+		appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%X/%X'\n",
+						  LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
+	}
+	else
+	{
+		appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
+						  consistent_lsn);
+		WriteRecoveryConfig(conn, subscriber_dir, recoveryconfcontents);
+	}
+	disconnect_database(conn);
+
+	pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
+
+	/*
+	 * Start subscriber and wait until accepting connections.
+	 */
+	pg_log_info("starting the subscriber");
+
+	/* append timestamp with ISO 8601 format. */
+	gettimeofday(&time, NULL);
+	tt = (time_t) time.tv_sec;
+	strftime(timebuf, sizeof(timebuf), "%Y%m%dT%H%M%S", localtime(&tt));
+	snprintf(timebuf + strlen(timebuf), sizeof(timebuf) - strlen(timebuf),
+			 ".%03d", (int) (time.tv_usec / 1000));
+
+	server_start_log = (char *) pg_malloc0(MAXPGPATH);
+	len = snprintf(server_start_log, MAXPGPATH, "%s/%s/server_start_%s.log", subscriber_dir, PGS_OUTPUT_DIR, timebuf);
+	if (len >= MAXPGPATH)
+	{
+		pg_log_error("log file path is too long");
+		exit(1);
+	}
+
+	pg_ctl_cmd = psprintf("\"%s\" start -D \"%s\" -s -l \"%s\"", pg_ctl_path, subscriber_dir, server_start_log);
+	rc = system(pg_ctl_cmd);
+	pg_ctl_status(pg_ctl_cmd, rc, 1);
+
+	/*
+	 * Wait for the subscriber to be promoted.
+	 */
+	wait_for_end_recovery(dbinfo[0].subconninfo);
+
+	/*
+	 * Create a subscription for each database.
+	 */
+	for (i = 0; i < num_dbs; i++)
+	{
+		/* Connect to subscriber. */
+		conn = connect_database(dbinfo[i].subconninfo);
+		if (conn == NULL)
+			exit(1);
+
+		create_subscription(conn, &dbinfo[i]);
+
+		/* Set the replication progress to the correct LSN. */
+		set_replication_progress(conn, &dbinfo[i], consistent_lsn);
+
+		/* Enable subscription. */
+		enable_subscription(conn, &dbinfo[i]);
+
+		disconnect_database(conn);
+	}
+
+	/*
+	 * The transient replication slot is no longer required. Drop it.
+	 *
+	 * If the physical replication slot exists, drop it.
+	 *
+	 * XXX we do not fail here. Instead, we emit a warning so the user can drop
+	 * the replication slot later.
+	 */
+	conn = connect_database(dbinfo[0].pubconninfo);
+	if (conn == NULL)
+	{
+		if (primary_slot_name != NULL)
+			pg_log_warning("could not drop replication slot \"%s\" on primary", primary_slot_name);
+		pg_log_warning("could not drop transient replication slot \"%s\" on publisher", temp_replslot);
+		pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+	}
+	else
+	{
+		drop_replication_slot(conn, &dbinfo[0], temp_replslot);
+		if (primary_slot_name != NULL)
+			drop_replication_slot(conn, &dbinfo[0], primary_slot_name);
+		disconnect_database(conn);
+	}
+
+	/*
+	 * Stop the subscriber.
+	 */
+	pg_log_info("stopping the subscriber");
+
+	pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir);
+	rc = system(pg_ctl_cmd);
+	pg_ctl_status(pg_ctl_cmd, rc, 0);
+
+	/*
+	 * Change system identifier.
+	 */
+	modify_sysid(pg_resetwal_path, subscriber_dir);
+
+	/*
+	 * The log file is kept if the retain option is specified or if this tool
+	 * does not run successfully. Otherwise, the log file is removed.
+	 */
+	if (!retain)
+		unlink(server_start_log);
+
+	success = true;
+
+	pg_log_info("Done!");
+
+	return 0;
+}
diff --git a/src/bin/pg_basebackup/t/040_pg_subscriber.pl b/src/bin/pg_basebackup/t/040_pg_subscriber.pl
new file mode 100644
index 0000000000..4ebff76b2d
--- /dev/null
+++ b/src/bin/pg_basebackup/t/040_pg_subscriber.pl
@@ -0,0 +1,44 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+#
+# Test checking options of pg_subscriber.
+#
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+program_help_ok('pg_subscriber');
+program_version_ok('pg_subscriber');
+program_options_handling_ok('pg_subscriber');
+
+my $datadir = PostgreSQL::Test::Utils::tempdir;
+
+command_fails(['pg_subscriber'],
+	'no subscriber data directory specified');
+command_fails(
+	[
+		'pg_subscriber',
+		'--pgdata', $datadir
+	],
+	'no publisher connection string specified');
+command_fails(
+	[
+		'pg_subscriber',
+		'--dry-run',
+		'--pgdata', $datadir,
+		'--publisher-conninfo', 'dbname=postgres'
+	],
+	'no subscriber connection string specified');
+command_fails(
+	[
+		'pg_subscriber',
+		'--verbose',
+		'--pgdata', $datadir,
+		'--publisher-conninfo', 'dbname=postgres',
+		'--subscriber-conninfo', 'dbname=postgres'
+	],
+	'no database name specified');
+
+done_testing();
diff --git a/src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl b/src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl
new file mode 100644
index 0000000000..fbcd0fc82b
--- /dev/null
+++ b/src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+#
+# Test using a standby server as the subscriber.
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_p;
+my $node_f;
+my $node_s;
+my $result;
+
+# Set up node P as primary
+$node_p = PostgreSQL::Test::Cluster->new('node_p');
+$node_p->init(allows_streaming => 'logical');
+$node_p->start;
+
+# Set up node F as about-to-fail node
+# The extra option forces it to initialize a new cluster instead of copying a
+# previously initdb'ed cluster.
+$node_f = PostgreSQL::Test::Cluster->new('node_f');
+$node_f->init(allows_streaming => 'logical', extra => [ '--no-instructions' ]);
+$node_f->start;
+
+# On node P
+# - create databases
+# - create test tables
+# - insert a row
+$node_p->safe_psql(
+	'postgres', q(
+	CREATE DATABASE pg1;
+	CREATE DATABASE pg2;
+));
+$node_p->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')");
+$node_p->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+
+# Set up node S as standby linking to node P
+$node_p->backup('backup_1');
+$node_s = PostgreSQL::Test::Cluster->new('node_s');
+$node_s->init_from_backup($node_p, 'backup_1', has_streaming => 1);
+$node_s->append_conf('postgresql.conf', 'log_min_messages = debug2');
+$node_s->set_standby_mode();
+$node_s->start;
+
+# Insert another row on node P and wait for node S to catch up
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('second row')");
+$node_p->wait_for_replay_catchup($node_s);
+
+# Run pg_subscriber on about-to-fail node F
+command_fails(
+	[
+		'pg_subscriber', '--verbose',
+		'--pgdata', $node_f->data_dir,
+		'--publisher-conninfo', $node_p->connstr('pg1'),
+		'--subscriber-conninfo', $node_f->connstr('pg1'),
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'subscriber data directory is not a copy of the source database cluster');
+
+# dry run mode on node S
+command_ok(
+	[
+		'pg_subscriber', '--verbose', '--dry-run',
+		'--pgdata', $node_s->data_dir,
+		'--publisher-conninfo', $node_p->connstr('pg1'),
+		'--subscriber-conninfo', $node_s->connstr('pg1'),
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'run pg_subscriber --dry-run on node S');
+
+# Set PID to undefined because the subscriber was stopped behind the scenes.
+# Start subscriber
+$node_s->{_pid} = undef;
+$node_s->start;
+# Check if node S is still a standby
+is($node_s->safe_psql('postgres', 'SELECT pg_is_in_recovery()'),
+	't', 'standby is in recovery');
+
+# Run pg_subscriber on node S
+command_ok(
+	[
+		'pg_subscriber', '--verbose',
+		'--pgdata', $node_s->data_dir,
+		'--publisher-conninfo', $node_p->connstr('pg1'),
+		'--subscriber-conninfo', $node_s->connstr('pg1'),
+		'--database', 'pg1',
+		'--database', 'pg2'
+	],
+	'run pg_subscriber on node S');
+
+# Insert rows on P
+$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('third row')");
+$node_p->safe_psql('pg2', "INSERT INTO tbl2 VALUES('row 1')");
+
+# Set PID to undefined because the subscriber was stopped behind the scenes.
+# Start subscriber
+$node_s->{_pid} = undef;
+$node_s->start;
+
+# Get subscription names
+$result = $node_s->safe_psql(
+	'postgres', qq(
+	SELECT subname FROM pg_subscription WHERE subname ~ '^pg_subscriber_'
+));
+my @subnames = split("\n", $result);
+
+# Wait for subscriber to catch up
+$node_s->wait_for_subscription_sync($node_p, $subnames[0]);
+$node_s->wait_for_subscription_sync($node_p, $subnames[1]);
+
+# Check result on database pg1
+$result = $node_s->safe_psql('pg1', 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row),
+	'logical replication works on database pg1');
+
+# Check result on database pg2
+$result = $node_s->safe_psql('pg2', 'SELECT * FROM tbl2');
+is( $result, qq(row 1),
+	'logical replication works on database pg2');
+
+# Different system identifier?
+my $sysid_p = $node_p->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()');
+my $sysid_s = $node_s->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()');
+isnt($sysid_p, $sysid_s, 'system identifier was changed');
+
+# clean up
+$node_p->teardown_node;
+$node_s->teardown_node;
+
+done_testing();
-- 
2.30.2
