Here is an updated patch to fix some build failures.  No feature changes.

On 14.12.21 23:12, Peter Eisentraut wrote:
On 31.10.21 11:08, Peter Eisentraut wrote:
I want to reactivate $subject.  I took Petr Jelinek's patch from [0], rebased it, and added a bit of testing.  It basically works, but as mentioned in [0], there are various issues to work out.

The idea is that the standby runs a background worker to periodically fetch replication slot information from the primary.  On failover, a logical subscriber would then ideally find up-to-date replication slots on the new publisher and could just continue normally.
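
For instance (slot names hypothetical), once the sync worker has run, the synchronized slots should show up on the standby just like locally created ones:

    -- on the standby, after synchronization
    SELECT slot_name, plugin, confirmed_flush_lsn
      FROM pg_catalog.pg_replication_slots
     WHERE slot_type = 'logical';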

So, again, this isn't anywhere near ready, but there is already a lot here to gather feedback about how it works, how it should work, how to configure it, and how it fits into an overall replication and HA architecture.

Here is an updated patch.  The main changes are that I added two configuration parameters.  The first, synchronize_slot_names, is set on the physical standby to specify which slots to sync from the primary. By default, it is empty.  (This also fixes the recovery test failures that I had to disable in the previous patch version.)  The second, standby_slot_names, is set on the primary.  It holds back logical replication until the listed physical standbys have caught up.  That way, when failover is necessary, the promoted standby is not behind the logical replication consumers.
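
To make the intended setup concrete, a minimal configuration could look roughly like this (all names here are just illustrative):

    # on the primary: hold back logical decoding until this physical
    # slot has confirmed the WAL
    standby_slot_names = 'standby1'

    # on the physical standby: keep these logical slots synchronized
    # from the primary ('*' would mean all of them)
    primary_conninfo = 'host=primary ...'
    primary_slot_name = 'standby1'
    synchronize_slot_names = 'sub1, sub2'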

In principle, this works now, I think.  I haven't made much progress in creating more test cases for this; that's something that needs more attention.

It's worth pondering what the configuration language for standby_slot_names should be.  Right now, it's just a list of slots that all need to be caught up.  More complicated setups are conceivable.  Maybe you have standbys S1 and S2 that are potential failover targets for logical replication consumers L1 and L2, and also standbys S3 and S4 that are potential failover targets for logical replication consumers L3 and L4.  Viewed like that, this could be a per-replication-slot setting rather than a single GUC.  The setting might also have some relationship with synchronous_standby_names.  For example, if you have synchronous_standby_names set, then that's a pretty good indication that you also want some or all of those standbys in standby_slot_names.  (But note that one setting lists slot names while the other lists application names.)  So there are a variety of possibilities.
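
To make that concrete: what the patch understands today is just the flat form on the primary, e.g.

    standby_slot_names = 's1, s2, s3, s4'

whereas a grouped setup like the one above would presumably want to express "L1 and L2 only wait for s1 and s2" somehow, for example as a per-slot property instead of a single GUC; I'm not proposing a syntax for that here.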
From ec00dc6ab8bafefc00e9b1c78ac9348b643b8a87 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Mon, 3 Jan 2022 14:43:36 +0100
Subject: [PATCH v3] Synchronize logical replication slots from primary to
 standby

Discussion: 
https://www.postgresql.org/message-id/flat/514f6f2f-6833-4539-39f1-96cd1e011f23%40enterprisedb.com
---
 doc/src/sgml/config.sgml                      |  34 ++
 src/backend/commands/subscriptioncmds.c       |   4 +-
 src/backend/postmaster/bgworker.c             |   3 +
 .../libpqwalreceiver/libpqwalreceiver.c       |  94 ++++
 src/backend/replication/logical/Makefile      |   1 +
 src/backend/replication/logical/launcher.c    | 202 ++++++---
 .../replication/logical/reorderbuffer.c       |  85 ++++
 src/backend/replication/logical/slotsync.c    | 412 ++++++++++++++++++
 src/backend/replication/logical/tablesync.c   |  13 +-
 src/backend/replication/repl_gram.y           |  32 +-
 src/backend/replication/repl_scanner.l        |   1 +
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           | 196 ++++++++-
 src/backend/utils/activity/wait_event.c       |   3 +
 src/backend/utils/misc/guc.c                  |  23 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/include/commands/subscriptioncmds.h       |   3 +
 src/include/nodes/nodes.h                     |   1 +
 src/include/nodes/replnodes.h                 |   9 +
 src/include/replication/logicalworker.h       |   9 +
 src/include/replication/slot.h                |   4 +-
 src/include/replication/walreceiver.h         |  16 +
 src/include/replication/worker_internal.h     |   8 +-
 src/include/utils/wait_event.h                |   1 +
 src/test/recovery/t/030_slot_sync.pl          |  58 +++
 25 files changed, 1146 insertions(+), 70 deletions(-)
 create mode 100644 src/backend/replication/logical/slotsync.c
 create mode 100644 src/test/recovery/t/030_slot_sync.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index afbb6c35e3..2b2a21a251 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4406,6 +4406,23 @@ <title>Primary Server</title>
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-standby-slot-names" xreflabel="standby_slot_names">
+      <term><varname>standby_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>standby_slot_names</varname> configuration 
parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        List of physical replication slots that logical replication waits for.
+        If a logical replication connection is meant to switch to a physical
+        standby after the standby is promoted, the physical replication slot
+        for the standby should be listed here.  This ensures that logical
+        replication is not ahead of the physical standby.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
@@ -4794,6 +4811,23 @@ <title>Standby Servers</title>
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-synchronize-slot-names" xreflabel="synchronize_slot_names">
+      <term><varname>synchronize_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>synchronize_slot_names</varname> configuration 
parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies a list of logical replication slots that a physical standby
+        should synchronize from the primary server.  This is necessary to be
+        able to retarget those logical replication connections to this standby
+        if it gets promoted.  Specify <literal>*</literal> to synchronize all
+        logical replication slots.  The default is empty.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
diff --git a/src/backend/commands/subscriptioncmds.c 
b/src/backend/commands/subscriptioncmds.c
index 2b658080fe..7cdea20207 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -737,7 +737,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
                                RemoveSubscriptionRel(sub->oid, relid);
 
-                               logicalrep_worker_stop(sub->oid, relid);
+                               logicalrep_worker_stop(MyDatabaseId, sub->oid, 
relid);
 
                                /*
                                 * For READY state, we would have already 
dropped the
@@ -1239,7 +1239,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool 
isTopLevel)
        {
                LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-               logicalrep_worker_stop(w->subid, w->relid);
+               logicalrep_worker_stop(w->dbid, w->subid, w->relid);
        }
        list_free(subworkers);
 
diff --git a/src/backend/postmaster/bgworker.c 
b/src/backend/postmaster/bgworker.c
index c05f500639..818b8a35e9 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -128,6 +128,9 @@ static const struct
        },
        {
                "ApplyWorkerMain", ApplyWorkerMain
+       },
+       {
+               "ReplSlotSyncMain", ReplSlotSyncMain
        }
 };
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c 
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index c08e599eef..c71fb0c1ec 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -33,6 +33,7 @@
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/tuplestore.h"
+#include "utils/varlena.h"
 
 PG_MODULE_MAGIC;
 
@@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
                                                                        char 
**sender_host, int *sender_port);
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
                                                                          
TimeLineID *primary_tli);
+static List *libpqrcv_list_slots(WalReceiverConn *conn, const char 
*slot_names);
 static int     libpqrcv_server_version(WalReceiverConn *conn);
 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
                                                                                
         TimeLineID tli, char **filename,
@@ -89,6 +91,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
        libpqrcv_get_conninfo,
        libpqrcv_get_senderinfo,
        libpqrcv_identify_system,
+       libpqrcv_list_slots,
        libpqrcv_server_version,
        libpqrcv_readtimelinehistoryfile,
        libpqrcv_startstreaming,
@@ -397,6 +400,97 @@ libpqrcv_server_version(WalReceiverConn *conn)
        return PQserverVersion(conn->streamConn);
 }
 
+/*
+ * Get list of slots from primary.
+ */
+static List *
+libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names)
+{
+       PGresult   *res;
+       List       *slotlist = NIL;
+       int                     ntuples;
+       StringInfoData s;
+       WalRecvReplicationSlotData *slot_data;
+
+       initStringInfo(&s);
+       appendStringInfoString(&s, "LIST_SLOTS");
+
+       if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0)
+       {
+               char       *rawname;
+               List       *namelist;
+               ListCell   *lc;
+
+               appendStringInfoChar(&s, ' ');
+               rawname = pstrdup(slot_names);
+               SplitIdentifierString(rawname, ',', &namelist);
+               foreach (lc, namelist)
+               {
+                       if (lc != list_head(namelist))
+                               appendStringInfoChar(&s, ',');
+                       appendStringInfo(&s, "%s",
+                                                        
quote_identifier(lfirst(lc)));
+               }
+       }
+
+       res = libpqrcv_PQexec(conn->streamConn, s.data);
+       pfree(s.data);
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+               PQclear(res);
+               ereport(ERROR,
+                               (errmsg("could not receive list of slots from the primary server: %s",
+                                               pchomp(PQerrorMessage(conn->streamConn)))));
+       }
+       if (PQnfields(res) < 10)
+       {
+               int                     nfields = PQnfields(res);
+
+               PQclear(res);
+               ereport(ERROR,
+                               (errmsg("invalid response from primary server"),
+                                errdetail("Could not get list of slots: got %d 
fields, expected %d or more fields.",
+                                                  nfields, 10)));
+       }
+
+       ntuples = PQntuples(res);
+       for (int i = 0; i < ntuples; i++)
+       {
+               char       *slot_type;
+
+               slot_data = palloc0(sizeof(WalRecvReplicationSlotData));
+               namestrcpy(&slot_data->name, PQgetvalue(res, i, 0));
+               if (!PQgetisnull(res, i, 1))
+                       namestrcpy(&slot_data->plugin, PQgetvalue(res, i, 1));
+               slot_type = PQgetvalue(res, i, 2);
+               if (!PQgetisnull(res, i, 3))
+                       slot_data->database = atooid(PQgetvalue(res, i, 3));
+               if (strcmp(slot_type, "physical") == 0)
+               {
+                       if (OidIsValid(slot_data->database))
+                               elog(ERROR, "unexpected physical replication 
slot with database set");
+               }
+               if (pg_strtoint32(PQgetvalue(res, i, 5)) == 1)
+                       slot_data->persistency = RS_TEMPORARY;
+               else
+                       slot_data->persistency = RS_PERSISTENT;
+               if (!PQgetisnull(res, i, 6))
+                       slot_data->xmin = atooid(PQgetvalue(res, i, 6));
+               if (!PQgetisnull(res, i, 7))
+                       slot_data->catalog_xmin = atooid(PQgetvalue(res, i, 7));
+               if (!PQgetisnull(res, i, 8))
+                       slot_data->restart_lsn = strtou64(PQgetvalue(res, i, 
8), NULL, 10);
+               if (!PQgetisnull(res, i, 9))
+                       slot_data->confirmed_flush = strtou64(PQgetvalue(res, 
i, 9), NULL, 10);
+
+               slotlist = lappend(slotlist, slot_data);
+       }
+
+       PQclear(res);
+
+       return slotlist;
+}
+
 /*
  * Start streaming WAL data from given streaming options.
  *
diff --git a/src/backend/replication/logical/Makefile 
b/src/backend/replication/logical/Makefile
index c4e2fdeb71..bc3f23b5a2 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -24,6 +24,7 @@ OBJS = \
        proto.o \
        relation.o \
        reorderbuffer.o \
+       slotsync.o \
        snapbuild.o \
        tablesync.o \
        worker.o
diff --git a/src/backend/replication/logical/launcher.c 
b/src/backend/replication/logical/launcher.c
index 3fb4caa803..207ef9bc8b 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -22,6 +22,7 @@
 #include "access/htup_details.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
@@ -212,7 +213,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
  * subscription id and relid.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid dbid, Oid subid, Oid relid, bool only_running)
 {
        int                     i;
        LogicalRepWorker *res = NULL;
@@ -224,8 +225,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool 
only_running)
        {
                LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-               if (w->in_use && w->subid == subid && w->relid == relid &&
-                       (!only_running || w->proc))
+               if (w->in_use && w->dbid == dbid && w->subid == subid &&
+                       w->relid == relid && (!only_running || w->proc))
                {
                        res = w;
                        break;
@@ -275,9 +276,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char 
*subname, Oid userid,
        int                     nsyncworkers;
        TimestampTz now;
 
-       ereport(DEBUG1,
-                       (errmsg_internal("starting logical replication worker 
for subscription \"%s\"",
-                                                        subname)));
+       if (OidIsValid(subid))
+               ereport(DEBUG1,
+                               (errmsg_internal("starting logical replication 
worker for subscription \"%s\"",
+                                                                subname)));
+       else
+               ereport(DEBUG1,
+                               (errmsg_internal("starting replication slot 
synchronization worker")));
 
        /* Report this after the initial starting message for consistency. */
        if (max_replication_slots == 0)
@@ -314,7 +319,9 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char 
*subname, Oid userid,
         * reason we do this is because if some worker failed to start up and 
its
         * parent has crashed while waiting, the in_use state was never cleared.
         */
-       if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+       if (worker == NULL ||
+               (OidIsValid(relid) &&
+                nsyncworkers >= max_sync_workers_per_subscription))
        {
                bool            did_cleanup = false;
 
@@ -348,7 +355,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char 
*subname, Oid userid,
         * silently as we might get here because of an otherwise harmless race
         * condition.
         */
-       if (nsyncworkers >= max_sync_workers_per_subscription)
+       if (OidIsValid(relid) && nsyncworkers >= 
max_sync_workers_per_subscription)
        {
                LWLockRelease(LogicalRepWorkerLock);
                return;
@@ -395,15 +402,22 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char 
*subname, Oid userid,
        memset(&bgw, 0, sizeof(bgw));
        bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                BGWORKER_BACKEND_DATABASE_CONNECTION;
-       bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+       bgw.bgw_start_time = BgWorkerStart_ConsistentState;
        snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
-       snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+       if (OidIsValid(subid))
+               snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+       else
+               snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncMain");
        if (OidIsValid(relid))
                snprintf(bgw.bgw_name, BGW_MAXLEN,
                                 "logical replication worker for subscription 
%u sync %u", subid, relid);
-       else
+       else if (OidIsValid(subid))
                snprintf(bgw.bgw_name, BGW_MAXLEN,
                                 "logical replication worker for subscription 
%u", subid);
+       else
+               snprintf(bgw.bgw_name, BGW_MAXLEN,
+                                "replication slot synchronization worker");
+
        snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
 
        bgw.bgw_restart_time = BGW_NEVER_RESTART;
@@ -434,14 +448,14 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char 
*subname, Oid userid,
  * it detaches from the slot.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid)
 {
        LogicalRepWorker *worker;
        uint16          generation;
 
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-       worker = logicalrep_worker_find(subid, relid, false);
+       worker = logicalrep_worker_find(dbid, subid, relid, false);
 
        /* No worker, nothing to do. */
        if (!worker)
@@ -531,13 +545,13 @@ logicalrep_worker_stop(Oid subid, Oid relid)
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid)
 {
        LogicalRepWorker *worker;
 
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-       worker = logicalrep_worker_find(subid, relid, true);
+       worker = logicalrep_worker_find(dbid, subid, relid, true);
 
        if (worker)
                logicalrep_worker_wakeup_ptr(worker);
@@ -714,7 +728,7 @@ ApplyLauncherRegister(void)
        memset(&bgw, 0, sizeof(bgw));
        bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                BGWORKER_BACKEND_DATABASE_CONNECTION;
-       bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+       bgw.bgw_start_time = BgWorkerStart_ConsistentState;
        snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
        snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
        snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -795,6 +809,116 @@ ApplyLauncherWakeup(void)
                kill(LogicalRepCtx->launcher_pid, SIGUSR1);
 }
 
+static void
+ApplyLauncherStartSlotSync(TimestampTz *last_start_time, long *wait_time)
+{
+       WalReceiverConn *wrconn;
+       TimestampTz now;
+       char       *err;
+       List       *slots;
+       ListCell   *lc;
+       MemoryContext tmpctx;
+       MemoryContext oldctx;
+
+       if (strcmp(synchronize_slot_names, "") == 0)
+               return;
+
+       wrconn = walrcv_connect(PrimaryConnInfo, false,
+                                                       "Logical Replication 
Launcher", &err);
+       if (!wrconn)
+               ereport(ERROR,
+                               (errmsg("could not connect to the primary 
server: %s", err)));
+
+       /* Use temporary context for the slot list and worker info. */
+       tmpctx = AllocSetContextCreate(TopMemoryContext,
+                                                                  "Logical 
Replication Launcher slot sync ctx",
+                                                                  
ALLOCSET_DEFAULT_SIZES);
+       oldctx = MemoryContextSwitchTo(tmpctx);
+
+       slots = walrcv_list_slots(wrconn, synchronize_slot_names);
+
+       now = GetCurrentTimestamp();
+
+       foreach(lc, slots)
+       {
+               WalRecvReplicationSlotData *slot_data = lfirst(lc);
+               LogicalRepWorker *w;
+
+               if (!OidIsValid(slot_data->database))
+                       continue;
+
+               LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+               w = logicalrep_worker_find(slot_data->database, InvalidOid,
+                                                                  InvalidOid, 
false);
+               LWLockRelease(LogicalRepWorkerLock);
+
+               if (w == NULL)
+               {
+                       *last_start_time = now;
+                       *wait_time = wal_retrieve_retry_interval;
+
+                       logicalrep_worker_launch(slot_data->database, 
InvalidOid, NULL,
+                                                                        
BOOTSTRAP_SUPERUSERID, InvalidOid);
+               }
+       }
+
+       /* Switch back to original memory context. */
+       MemoryContextSwitchTo(oldctx);
+       /* Clean the temporary memory. */
+       MemoryContextDelete(tmpctx);
+
+       walrcv_disconnect(wrconn);
+}
+
+static void
+ApplyLauncherStartSubs(TimestampTz *last_start_time, long *wait_time)
+{
+       TimestampTz now;
+       List       *sublist;
+       ListCell   *lc;
+       MemoryContext subctx;
+       MemoryContext oldctx;
+
+       now = GetCurrentTimestamp();
+
+       /* Use temporary context for the database list and worker info. */
+       subctx = AllocSetContextCreate(TopMemoryContext,
+                                                                  "Logical 
Replication Launcher sublist",
+                                                                  
ALLOCSET_DEFAULT_SIZES);
+       oldctx = MemoryContextSwitchTo(subctx);
+
+       /* search for subscriptions to start or stop. */
+       sublist = get_subscription_list();
+
+       /* Start the missing workers for enabled subscriptions. */
+       foreach(lc, sublist)
+       {
+               Subscription *sub = (Subscription *) lfirst(lc);
+               LogicalRepWorker *w;
+
+               if (!sub->enabled)
+                       continue;
+
+               LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+               w = logicalrep_worker_find(sub->dbid, sub->oid, InvalidOid, 
false);
+               LWLockRelease(LogicalRepWorkerLock);
+
+               if (w == NULL)
+               {
+                       *last_start_time = now;
+                       *wait_time = wal_retrieve_retry_interval;
+
+                       logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+                                                                        
sub->owner, InvalidOid);
+               }
+       }
+
+       /* Switch back to original memory context. */
+       MemoryContextSwitchTo(oldctx);
+       /* Clean the temporary memory. */
+       MemoryContextDelete(subctx);
+}
+
 /*
  * Main loop for the apply launcher process.
  */
@@ -822,14 +946,12 @@ ApplyLauncherMain(Datum main_arg)
         */
        BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+       load_file("libpqwalreceiver", false);
+
        /* Enter main loop */
        for (;;)
        {
                int                     rc;
-               List       *sublist;
-               ListCell   *lc;
-               MemoryContext subctx;
-               MemoryContext oldctx;
                TimestampTz now;
                long            wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
@@ -841,42 +963,10 @@ ApplyLauncherMain(Datum main_arg)
                if (TimestampDifferenceExceeds(last_start_time, now,
                                                                           
wal_retrieve_retry_interval))
                {
-                       /* Use temporary context for the database list and 
worker info. */
-                       subctx = AllocSetContextCreate(TopMemoryContext,
-                                                                               
   "Logical Replication Launcher sublist",
-                                                                               
   ALLOCSET_DEFAULT_SIZES);
-                       oldctx = MemoryContextSwitchTo(subctx);
-
-                       /* search for subscriptions to start or stop. */
-                       sublist = get_subscription_list();
-
-                       /* Start the missing workers for enabled subscriptions. 
*/
-                       foreach(lc, sublist)
-                       {
-                               Subscription *sub = (Subscription *) lfirst(lc);
-                               LogicalRepWorker *w;
-
-                               if (!sub->enabled)
-                                       continue;
-
-                               LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-                               w = logicalrep_worker_find(sub->oid, 
InvalidOid, false);
-                               LWLockRelease(LogicalRepWorkerLock);
-
-                               if (w == NULL)
-                               {
-                                       last_start_time = now;
-                                       wait_time = wal_retrieve_retry_interval;
-
-                                       logicalrep_worker_launch(sub->dbid, 
sub->oid, sub->name,
-                                                                               
         sub->owner, InvalidOid);
-                               }
-                       }
-
-                       /* Switch back to original memory context. */
-                       MemoryContextSwitchTo(oldctx);
-                       /* Clean the temporary memory. */
-                       MemoryContextDelete(subctx);
+                       if (!RecoveryInProgress())
+                               ApplyLauncherStartSubs(&last_start_time, 
&wait_time);
+                       else
+                               ApplyLauncherStartSlotSync(&last_start_time, 
&wait_time);
                }
                else
                {
diff --git a/src/backend/replication/logical/reorderbuffer.c 
b/src/backend/replication/logical/reorderbuffer.c
index 7aa5647a2c..f0b3b9ad87 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -95,11 +95,13 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "replication/logical.h"
+#include "replication/logicalworker.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"     /* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/ipc.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
@@ -107,6 +109,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenodemap.h"
+#include "utils/varlena.h"
 
 
 /* entry for a hash table we use to map from xid to our transaction state */
@@ -2006,6 +2009,85 @@ ReorderBufferResetTXN(ReorderBuffer *rb, 
ReorderBufferTXN *txn,
        }
 }
 
+static void
+wait_for_standby_confirmation(XLogRecPtr commit_lsn)
+{
+       char       *rawname;
+       List       *namelist;
+       ListCell   *lc;
+       XLogRecPtr      flush_pos = InvalidXLogRecPtr;
+
+       if (strcmp(standby_slot_names, "") == 0)
+               return;
+
+       rawname = pstrdup(standby_slot_names);
+       SplitIdentifierString(rawname, ',', &namelist);
+
+       while (true)
+       {
+               int                     wait_slots_remaining;
+               XLogRecPtr      oldest_flush_pos = InvalidXLogRecPtr;
+               int                     rc;
+
+               wait_slots_remaining = list_length(namelist);
+
+               LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+               for (int i = 0; i < max_replication_slots; i++)
+               {
+                       ReplicationSlot *s = 
&ReplicationSlotCtl->replication_slots[i];
+                       bool            inlist;
+
+                       if (!s->in_use)
+                               continue;
+
+                       inlist = false;
+                       foreach (lc, namelist)
+                       {
+                               char *name = lfirst(lc);
+                               if (strcmp(name, NameStr(s->data.name)) == 0)
+                               {
+                                       inlist = true;
+                                       break;
+                               }
+                       }
+                       if (!inlist)
+                               continue;
+
+                       SpinLockAcquire(&s->mutex);
+
+                       if (s->data.database == InvalidOid)
+                               /* Physical slots advance restart_lsn on flush 
and ignore confirmed_flush_lsn */
+                               flush_pos = s->data.restart_lsn;
+                       else
+                               /* For logical slots we must wait for commit 
and flush */
+                               flush_pos = s->data.confirmed_flush;
+
+                       SpinLockRelease(&s->mutex);
+
+                       /* We want to find out the min(flush pos) over all 
named slots */
+                       if (oldest_flush_pos == InvalidXLogRecPtr
+                               || oldest_flush_pos > flush_pos)
+                               oldest_flush_pos = flush_pos;
+
+                       if (flush_pos >= commit_lsn && wait_slots_remaining > 0)
+                               wait_slots_remaining--;
+               }
+               LWLockRelease(ReplicationSlotControlLock);
+
+               if (wait_slots_remaining == 0)
+                       return;
+
+               rc = WaitLatch(MyLatch,
+                                          WL_LATCH_SET | WL_TIMEOUT | 
WL_POSTMASTER_DEATH,
+                                          1000L, PG_WAIT_EXTENSION);
+
+               if (rc & WL_POSTMASTER_DEATH)
+                       proc_exit(1);
+
+               CHECK_FOR_INTERRUPTS();
+       }
+}
+
 /*
  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
  *
@@ -2434,6 +2516,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, 
ReorderBufferTXN *txn,
                         * Call either PREPARE (for two-phase transactions) or 
COMMIT (for
                         * regular ones).
                         */
+
+                       wait_for_standby_confirmation(commit_lsn);
+
                        if (rbtxn_prepared(txn))
                                rb->prepare(rb, txn, commit_lsn);
                        else
diff --git a/src/backend/replication/logical/slotsync.c 
b/src/backend/replication/logical/slotsync.c
new file mode 100644
index 0000000000..654ac154ea
--- /dev/null
+++ b/src/backend/replication/logical/slotsync.c
@@ -0,0 +1,412 @@
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ *        PostgreSQL worker for synchronizing replication slots from the primary to a standby
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/logical/slotsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "commands/dbcommands.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "replication/logicalworker.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/varlena.h"
+
+char      *synchronize_slot_names;
+char      *standby_slot_names;
+
+/*
+ * Wait for the remote slot to pass the locally reserved position.
+ */
+static void
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name,
+                                                         XLogRecPtr min_lsn)
+{
+       WalRcvExecResult *res;
+       TupleTableSlot *slot;
+       Oid                     slotRow[1] = {LSNOID};
+       StringInfoData cmd;
+       bool            isnull;
+       XLogRecPtr      restart_lsn;
+
+       for (;;)
+       {
+               int                     rc;
+
+               CHECK_FOR_INTERRUPTS();
+
+               initStringInfo(&cmd);
+               appendStringInfo(&cmd,
+                                                "SELECT restart_lsn"
+                                                "  FROM 
pg_catalog.pg_replication_slots"
+                                                " WHERE slot_name = %s",
+                                                quote_literal_cstr(slot_name));
+               res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
+
+               if (res->status != WALRCV_OK_TUPLES)
+                       ereport(ERROR,
+                                       (errmsg("could not fetch slot info for 
slot \"%s\" from primary: %s",
+                                                       slot_name, res->err)));
+
+               slot = MakeSingleTupleTableSlot(res->tupledesc, 
&TTSOpsMinimalTuple);
+               if (!tuplestore_gettupleslot(res->tuplestore, true, false, 
slot))
+                       ereport(ERROR,
+                                       (errmsg("slot \"%s\" disappeared from the primary",
+                                                       slot_name)));
+
+               restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+               Assert(!isnull);
+
+               ExecClearTuple(slot);
+               walrcv_clear_result(res);
+
+               if (restart_lsn >= min_lsn)
+                       break;
+
+               rc = WaitLatch(MyLatch,
+                                          WL_LATCH_SET | WL_TIMEOUT | 
WL_POSTMASTER_DEATH,
+                                          wal_retrieve_retry_interval,
+                                          WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+               ResetLatch(MyLatch);
+
+               /* emergency bailout if postmaster has died */
+               if (rc & WL_POSTMASTER_DEATH)
+                       proc_exit(1);
+       }
+}
+
+/*
+ * Synchronize single slot to given position.
+ *
+ * This creates a new slot if there is no existing one.
+ */
+static void
+synchronize_one_slot(WalReceiverConn *wrconn, char *slot_name, char *database,
+                                        char *plugin_name, XLogRecPtr 
target_lsn)
+{
+       bool            found = false;
+       XLogRecPtr      endlsn;
+
+       /* Search for the named slot and mark it active if we find it. */
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (int i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (!s->in_use)
+                       continue;
+
+               if (strcmp(NameStr(s->data.name), slot_name) == 0)
+               {
+                       found = true;
+                       break;
+               }
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       StartTransactionCommand();
+
+       /* Already existing slot, acquire */
+       if (found)
+       {
+               ReplicationSlotAcquire(slot_name, true);
+
+               if (target_lsn < MyReplicationSlot->data.confirmed_flush)
+               {
+                       elog(DEBUG1,
+                                "not synchronizing slot %s; synchronization 
would move it backward",
+                                slot_name);
+
+                       ReplicationSlotRelease();
+                       CommitTransactionCommand();
+                       return;
+               }
+       }
+       /* Otherwise create the slot first. */
+       else
+       {
+               TransactionId xmin_horizon = InvalidTransactionId;
+               ReplicationSlot *slot;
+
+               ReplicationSlotCreate(slot_name, true, RS_EPHEMERAL, false);
+               slot = MyReplicationSlot;
+
+               SpinLockAcquire(&slot->mutex);
+               slot->data.database = get_database_oid(database, false);
+               namestrcpy(&slot->data.plugin, plugin_name);
+               SpinLockRelease(&slot->mutex);
+
+               ReplicationSlotReserveWal();
+
+               LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+               xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+               slot->effective_catalog_xmin = xmin_horizon;
+               slot->data.catalog_xmin = xmin_horizon;
+               ReplicationSlotsComputeRequiredXmin(true);
+               LWLockRelease(ProcArrayLock);
+
+               if (target_lsn < MyReplicationSlot->data.restart_lsn)
+               {
+                       ereport(LOG,
+                                       errmsg("waiting for remote slot \"%s\" 
LSN (%X/%X) to pass local slot LSN (%X/%X)",
+                                                  slot_name,
+                                                  LSN_FORMAT_ARGS(target_lsn), 
LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn)));
+
+                       wait_for_primary_slot_catchup(wrconn, slot_name,
+                                                                               
  MyReplicationSlot->data.restart_lsn);
+               }
+
+               ReplicationSlotPersist();
+       }
+
+       endlsn = pg_logical_replication_slot_advance(target_lsn);
+
+       elog(DEBUG3, "synchronized slot %s to lsn (%X/%X)",
+                slot_name, LSN_FORMAT_ARGS(endlsn));
+
+       ReplicationSlotRelease();
+       CommitTransactionCommand();
+}
+
+static void
+synchronize_slots(void)
+{
+       WalRcvExecResult *res;
+       WalReceiverConn *wrconn = NULL;
+       TupleTableSlot *slot;
+       Oid                     slotRow[3] = {TEXTOID, TEXTOID, LSNOID};
+       StringInfoData s;
+       char       *database;
+       char       *err;
+       MemoryContext oldctx = CurrentMemoryContext;
+
+       if (!WalRcv)
+               return;
+
+       /* syscache access needs a transaction env. */
+       StartTransactionCommand();
+       /* make dbname live outside TX context */
+       MemoryContextSwitchTo(oldctx);
+
+       database = get_database_name(MyDatabaseId);
+       initStringInfo(&s);
+       appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database);
+       wrconn = walrcv_connect(s.data, true, "slot_sync", &err);
+
+       if (wrconn == NULL)
+               ereport(ERROR,
+                               (errmsg("could not connect to the primary 
server: %s", err)));
+
+       resetStringInfo(&s);
+       appendStringInfo(&s,
+                                        "SELECT slot_name, plugin, 
confirmed_flush_lsn"
+                                        "  FROM 
pg_catalog.pg_replication_slots"
+                                        " WHERE database = %s",
+                                        quote_literal_cstr(database));
+       if (strcmp(synchronize_slot_names, "") != 0 && 
strcmp(synchronize_slot_names, "*") != 0)
+       {
+               char       *rawname;
+               List       *namelist;
+               ListCell   *lc;
+
+               rawname = pstrdup(synchronize_slot_names);
+               SplitIdentifierString(rawname, ',', &namelist);
+
+               appendStringInfoString(&s, " AND slot_name IN (");
+               foreach (lc, namelist)
+               {
+                       if (lc != list_head(namelist))
+                               appendStringInfoChar(&s, ',');
+                       appendStringInfo(&s, "%s",
+                                                        
quote_literal_cstr(lfirst(lc)));
+               }
+               appendStringInfoChar(&s, ')');
+       }
+
+       res = walrcv_exec(wrconn, s.data, 3, slotRow);
+       pfree(s.data);
+
+       if (res->status != WALRCV_OK_TUPLES)
+               ereport(ERROR,
+                               (errmsg("could not fetch slot info from 
primary: %s",
+                                               res->err)));
+
+       CommitTransactionCommand();
+       /* CommitTransactionCommand switches to TopMemoryContext */
+       MemoryContextSwitchTo(oldctx);
+
+       slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+       while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+       {
+               char       *slot_name;
+               char       *plugin_name;
+               XLogRecPtr      confirmed_flush_lsn;
+               bool            isnull;
+
+               slot_name = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+               Assert(!isnull);
+
+               plugin_name = TextDatumGetCString(slot_getattr(slot, 2, 
&isnull));
+               Assert(!isnull);
+
+               confirmed_flush_lsn = DatumGetLSN(slot_getattr(slot, 3, 
&isnull));
+               Assert(!isnull);
+
+               synchronize_one_slot(wrconn, slot_name, database, plugin_name,
+                                                        confirmed_flush_lsn);
+
+               ExecClearTuple(slot);
+       }
+
+       walrcv_clear_result(res);
+       pfree(database);
+
+       walrcv_disconnect(wrconn);
+}
+
+/*
+ * The main loop of our worker process.
+ */
+void
+ReplSlotSyncMain(Datum main_arg)
+{
+       int                     worker_slot = DatumGetInt32(main_arg);
+
+       /* Attach to slot */
+       logicalrep_worker_attach(worker_slot);
+
+       /* Establish signal handlers. */
+       BackgroundWorkerUnblockSignals();
+
+       /* Load the libpq-specific functions */
+       load_file("libpqwalreceiver", false);
+
+       /* Connect to our database. */
+       BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
+                                                                               
          MyLogicalRepWorker->userid,
+                                                                               
          0);
+
+       StartTransactionCommand();
+       ereport(LOG,
+                       (errmsg("replication slot synchronization worker for 
database \"%s\" has started",
+                                       
get_database_name(MyLogicalRepWorker->dbid))));
+       CommitTransactionCommand();
+
+       /* Main wait loop. */
+       for (;;)
+       {
+               int                     rc;
+
+               CHECK_FOR_INTERRUPTS();
+
+               if (!RecoveryInProgress())
+                       return;
+
+               if (strcmp(synchronize_slot_names, "") == 0)
+                       return;
+
+               synchronize_slots();
+
+               rc = WaitLatch(MyLatch,
+                                          WL_LATCH_SET | WL_TIMEOUT | 
WL_POSTMASTER_DEATH,
+                                          wal_retrieve_retry_interval,
+                                          WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+               ResetLatch(MyLatch);
+
+               /* emergency bailout if postmaster has died */
+               if (rc & WL_POSTMASTER_DEATH)
+                       proc_exit(1);
+       }
+}
+
+/*
+ * Routines for handling the GUC variable(s)
+ */
+
+bool
+check_synchronize_slot_names(char **newval, void **extra, GucSource source)
+{
+       /* Special handling for "*" which means all. */
+       if (strcmp(*newval, "*") == 0)
+       {
+               return true;
+       }
+       else
+       {
+               char       *rawname;
+               List       *namelist;
+               ListCell   *lc;
+
+               /* Need a modifiable copy of string */
+               rawname = pstrdup(*newval);
+
+               /* Parse string into list of identifiers */
+               if (!SplitIdentifierString(rawname, ',', &namelist))
+               {
+                       /* syntax error in name list */
+                       GUC_check_errdetail("List syntax is invalid.");
+                       pfree(rawname);
+                       list_free(namelist);
+                       return false;
+               }
+
+               foreach(lc, namelist)
+               {
+                       char       *curname = (char *) lfirst(lc);
+
+                       ReplicationSlotValidateName(curname, ERROR);
+               }
+
+               pfree(rawname);
+               list_free(namelist);
+       }
+
+       return true;
+}
+
+
+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+       char       *rawname;
+       List       *namelist;
+       ListCell   *lc;
+
+       /* Need a modifiable copy of string */
+       rawname = pstrdup(*newval);
+
+       /* Parse string into list of identifiers */
+       if (!SplitIdentifierString(rawname, ',', &namelist))
+       {
+               /* syntax error in name list */
+               GUC_check_errdetail("List syntax is invalid.");
+               pfree(rawname);
+               list_free(namelist);
+               return false;
+       }
+
+       foreach(lc, namelist)
+       {
+               char       *curname = (char *) lfirst(lc);
+
+               ReplicationSlotValidateName(curname, ERROR);
+       }
+
+       pfree(rawname);
+       list_free(namelist);
+
+       return true;
+}
diff --git a/src/backend/replication/logical/tablesync.c 
b/src/backend/replication/logical/tablesync.c
index f07983a43c..0e0593f716 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "commands/subscriptioncmds.h"
 #include "miscadmin.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
@@ -151,7 +152,8 @@ finish_sync_worker(void)
        CommitTransactionCommand();
 
        /* Find the main apply worker and signal it. */
-       logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+       logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+                                                        
MyLogicalRepWorker->subid, InvalidOid);
 
        /* Stop gracefully */
        proc_exit(0);
@@ -191,7 +193,8 @@ wait_for_relation_state_change(Oid relid, char 
expected_state)
 
                /* Check if the sync worker is still running and bail if not. */
                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-               worker = logicalrep_worker_find(MyLogicalRepWorker->subid, 
relid,
+               worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+                                                                               
MyLogicalRepWorker->subid, relid,
                                                                                
false);
                LWLockRelease(LogicalRepWorkerLock);
                if (!worker)
@@ -238,7 +241,8 @@ wait_for_worker_state_change(char expected_state)
                 * waiting.
                 */
                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-               worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+               worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+                                                                               
MyLogicalRepWorker->subid,
                                                                                
InvalidOid, false);
                if (worker && worker->proc)
                        logicalrep_worker_wakeup_ptr(worker);
@@ -484,7 +488,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                         */
                        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-                       syncworker = 
logicalrep_worker_find(MyLogicalRepWorker->subid,
+                       syncworker = 
logicalrep_worker_find(MyLogicalRepWorker->dbid,
+                                                                               
                MyLogicalRepWorker->subid,
                                                                                
                rstate->relid, false);
 
                        if (syncworker)
diff --git a/src/backend/replication/repl_gram.y 
b/src/backend/replication/repl_gram.y
index dcb1108579..902841efe6 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -91,11 +91,12 @@ static SQLCmd *make_sqlcmd(void);
 %token K_USE_SNAPSHOT
 %token K_MANIFEST
 %token K_MANIFEST_CHECKSUMS
+%token K_LIST_SLOTS
 
 %type <node>   command
 %type <node>   base_backup start_replication start_logical_replication
                                create_replication_slot drop_replication_slot 
identify_system
-                               read_replication_slot timeline_history show 
sql_cmd
+                               read_replication_slot timeline_history show 
sql_cmd list_slots
 %type <list>   base_backup_legacy_opt_list generic_option_list
 %type <defelt> base_backup_legacy_opt generic_option
 %type <uintval>        opt_timeline
@@ -106,6 +107,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <boolval>        opt_temporary
 %type <list>   create_slot_options create_slot_legacy_opt_list
 %type <defelt> create_slot_legacy_opt
+%type <list>   slot_name_list slot_name_list_opt
 
 %%
 
@@ -129,6 +131,7 @@ command:
                        | read_replication_slot
                        | timeline_history
                        | show
+                       | list_slots
                        | sql_cmd
                        ;
 
@@ -142,6 +145,33 @@ identify_system:
                                }
                        ;
 
+slot_name_list:
+                       IDENT
+                               {
+                                       $$ = list_make1($1);
+                               }
+                       | slot_name_list ',' IDENT
+                               {
+                                       $$ = lappend($1, $3);
+                               }
+
+slot_name_list_opt:
+                       slot_name_list                  { $$ = $1; }
+                       | /* EMPTY */                   { $$ = NIL; }
+               ;
+
+/*
+ * LIST_SLOTS
+ */
+list_slots:
+                       K_LIST_SLOTS slot_name_list_opt
+                               {
+                                       ListSlotsCmd *cmd = 
makeNode(ListSlotsCmd);
+                                       cmd->slot_names = $2;
+                                       $$ = (Node *) cmd;
+                               }
+                       ;
+
 /*
  * READ_REPLICATION_SLOT %s
  */
diff --git a/src/backend/replication/repl_scanner.l 
b/src/backend/replication/repl_scanner.l
index 1b599c255e..9ee638355d 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier            {ident_start}{ident_cont}*
 BASE_BACKUP                    { return K_BASE_BACKUP; }
 FAST                   { return K_FAST; }
 IDENTIFY_SYSTEM                { return K_IDENTIFY_SYSTEM; }
+LIST_SLOTS             { return K_LIST_SLOTS; }
 READ_REPLICATION_SLOT  { return K_READ_REPLICATION_SLOT; }
 SHOW           { return K_SHOW; }
 LABEL                  { return K_LABEL; }
diff --git a/src/backend/replication/slotfuncs.c 
b/src/backend/replication/slotfuncs.c
index d11daeb1fc..e93fea55ad 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -484,7 +484,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
  * mode, no changes are generated anyway.
  */
-static XLogRecPtr
+XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
        LogicalDecodingContext *ctx;
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index 84915ed95b..2fce290ed6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -456,6 +456,194 @@ IdentifySystem(void)
        end_tup_output(tstate);
 }
 
+static int
+pg_qsort_namecmp(const void *a, const void *b)
+{
+       return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN);
+}
+
+/*
+ * Handle the LIST_SLOTS command.
+ */
+static void
+ListSlots(ListSlotsCmd *cmd)
+{
+       DestReceiver *dest;
+       TupOutputState *tstate;
+       TupleDesc       tupdesc;
+       NameData   *slot_names;
+       int                     numslot_names;
+
+       numslot_names = list_length(cmd->slot_names);
+       if (numslot_names)
+       {
+               ListCell   *lc;
+               int                     i = 0;
+
+               slot_names = palloc(numslot_names * sizeof(NameData));
+               foreach(lc, cmd->slot_names)
+               {
+                       char       *slot_name = lfirst(lc);
+
+                       ReplicationSlotValidateName(slot_name, ERROR);
+                       namestrcpy(&slot_names[i++], slot_name);
+               }
+
+               qsort(slot_names, numslot_names, sizeof(NameData), 
pg_qsort_namecmp);
+       }
+
+       dest = CreateDestReceiver(DestRemoteSimple);
+
+       /* need a tuple descriptor representing ten columns */
+       tupdesc = CreateTemplateTupleDesc(10);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "plugin",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "slot_type",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "datoid",
+                                                         INT8OID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "database",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 6, "temporary",
+                                                         INT4OID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 7, "xmin",
+                                                         INT8OID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 8, "catalog_xmin",
+                                                         INT8OID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 9, "restart_lsn",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 10, "confirmed_flush",
+                                                         TEXTOID, -1, 0);
+
+       /* prepare for projection of tuples */
+       tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (int slotno = 0; slotno < max_replication_slots; slotno++)
+       {
+               ReplicationSlot *slot = 
&ReplicationSlotCtl->replication_slots[slotno];
+               char            restart_lsn_str[MAXFNAMELEN];
+               char            confirmed_flush_lsn_str[MAXFNAMELEN];
+               Datum           values[10];
+               bool            nulls[10];
+
+               ReplicationSlotPersistency persistency;
+               TransactionId xmin;
+               TransactionId catalog_xmin;
+               XLogRecPtr      restart_lsn;
+               XLogRecPtr      confirmed_flush_lsn;
+               Oid                     datoid;
+               NameData        slot_name;
+               NameData        plugin;
+               int                     i;
+               int64           tmpbigint;
+
+               if (!slot->in_use)
+                       continue;
+
+               SpinLockAcquire(&slot->mutex);
+
+               xmin = slot->data.xmin;
+               catalog_xmin = slot->data.catalog_xmin;
+               datoid = slot->data.database;
+               restart_lsn = slot->data.restart_lsn;
+               confirmed_flush_lsn = slot->data.confirmed_flush;
+               namestrcpy(&slot_name, NameStr(slot->data.name));
+               namestrcpy(&plugin, NameStr(slot->data.plugin));
+               persistency = slot->data.persistency;
+
+               SpinLockRelease(&slot->mutex);
+
+               if (numslot_names &&
+                       !bsearch((void *) &slot_name, (void *) slot_names,
+                                        numslot_names, sizeof(NameData), pg_qsort_namecmp))
+                       continue;
+
+               memset(nulls, 0, sizeof(nulls));
+
+               i = 0;
+               values[i++] = CStringGetTextDatum(NameStr(slot_name));
+
+               if (datoid == InvalidOid)
+                       nulls[i++] = true;
+               else
+                       values[i++] = CStringGetTextDatum(NameStr(plugin));
+
+               if (datoid == InvalidOid)
+                       values[i++] = CStringGetTextDatum("physical");
+               else
+                       values[i++] = CStringGetTextDatum("logical");
+
+               if (datoid == InvalidOid)
+                       nulls[i++] = true;
+               else
+               {
+                       tmpbigint = datoid;
+                       values[i++] = Int64GetDatum(tmpbigint);
+               }
+
+               if (datoid == InvalidOid)
+                       nulls[i++] = true;
+               else
+               {
+                       MemoryContext cur = CurrentMemoryContext;
+
+                       /* syscache access needs a transaction env. */
+                       StartTransactionCommand();
+                       /* make dbname live outside TX context */
+                       MemoryContextSwitchTo(cur);
+                       values[i++] = CStringGetTextDatum(get_database_name(datoid));
+                       CommitTransactionCommand();
+                       /* CommitTransactionCommand switches to TopMemoryContext */
+                       MemoryContextSwitchTo(cur);
+               }
+
+               values[i++] = Int32GetDatum(persistency == RS_TEMPORARY ? 1 : 0);
+
+               if (xmin != InvalidTransactionId)
+               {
+                       tmpbigint = xmin;
+                       values[i++] = Int64GetDatum(tmpbigint);
+               }
+               else
+                       nulls[i++] = true;
+
+               if (catalog_xmin != InvalidTransactionId)
+               {
+                       tmpbigint = catalog_xmin;
+                       values[i++] = Int64GetDatum(tmpbigint);
+               }
+               else
+                       nulls[i++] = true;
+
+               if (restart_lsn != InvalidXLogRecPtr)
+               {
+                       snprintf(restart_lsn_str, sizeof(restart_lsn_str), "%X/%X",
+                                        LSN_FORMAT_ARGS(restart_lsn));
+                       values[i++] = CStringGetTextDatum(restart_lsn_str);
+               }
+               else
+                       nulls[i++] = true;
+
+               if (confirmed_flush_lsn != InvalidXLogRecPtr)
+               {
+                       snprintf(confirmed_flush_lsn_str, sizeof(confirmed_flush_lsn_str),
+                                        "%X/%X", LSN_FORMAT_ARGS(confirmed_flush_lsn));
+                       values[i++] = CStringGetTextDatum(confirmed_flush_lsn_str);
+               }
+               else
+                       nulls[i++] = true;
+
+               /* send it to dest */
+               do_tup_output(tstate, values, nulls);
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       end_tup_output(tstate);
+}
+
 /* Handle READ_REPLICATION_SLOT command */
 static void
 ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
@@ -554,7 +742,6 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
        end_tup_output(tstate);
 }
 
-
 /*
  * Handle TIMELINE_HISTORY command.
  */
@@ -1749,6 +1936,13 @@ exec_replication_command(const char *cmd_string)
                        EndReplicationCommand(cmdtag);
                        break;
 
+               case T_ListSlotsCmd:
+                       cmdtag = "LIST_SLOTS";
+                       set_ps_display(cmdtag);
+                       ListSlots((ListSlotsCmd *) cmd_node);
+                       EndReplicationCommand(cmdtag);
+                       break;
+
                case T_StartReplicationCmd:
                        {
                               StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index 4d53f040e8..6922353c94 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -230,6 +230,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
                case WAIT_EVENT_LOGICAL_LAUNCHER_MAIN:
                        event_name = "LogicalLauncherMain";
                        break;
+               case WAIT_EVENT_REPL_SLOT_SYNC_MAIN:
+                       event_name = "ReplSlotSyncMain";
+                       break;
                case WAIT_EVENT_PGSTAT_MAIN:
                        event_name = "PgStatMain";
                        break;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index f9504d3aec..0234f5ab87 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -75,6 +75,7 @@
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
@@ -4636,6 +4637,28 @@ static struct config_string ConfigureNamesString[] =
                check_backtrace_functions, assign_backtrace_functions, NULL
        },
 
+       {
+               {"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY,
+                       gettext_noop("Sets the names of replication slots to synchronize from the primary to the standby."),
+                       gettext_noop("Value of \"*\" means all."),
+                       GUC_LIST_INPUT | GUC_LIST_QUOTE
+               },
+               &synchronize_slot_names,
+               "",
+               check_synchronize_slot_names, NULL, NULL
+       },
+
+       {
+               {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+                       gettext_noop("List of physical replication slots that must confirm changes before they are sent to logical replication consumers."),
+                       NULL,
+                       GUC_LIST_INPUT | GUC_LIST_QUOTE
+               },
+               &standby_slot_names,
+               "",
+               check_standby_slot_names, NULL, NULL
+       },
+
        /* End-of-list marker */
        {
                {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index a1acd46b61..e8b5f76125 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -315,6 +315,7 @@
                                # and comma-separated list of application_name
                                # from standby(s); '*' = all
 #vacuum_defer_cleanup_age = 0  # number of xacts by which cleanup is delayed
+#standby_slot_names = ''       # physical standby slot names that logical replication waits for
 
 # - Standby Servers -
 
@@ -343,6 +344,7 @@
 #wal_retrieve_retry_interval = 5s      # time to wait before retrying to
                                        # retrieve WAL after a failed attempt
 #recovery_min_apply_delay = 0          # minimum delay for applying changes during recovery
+#synchronize_slot_names = ''           # logical replication slots to sync to standby
 
 # - Subscribers -
 
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index aec7e478ab..1cc19e0c99 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -17,6 +17,7 @@
 
 #include "catalog/objectaddress.h"
 #include "parser/parse_node.h"
+#include "replication/walreceiver.h"
 
 extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                                                                                bool isTopLevel);
@@ -26,4 +27,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
 extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
 
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
+
 #endif                                                 /* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 7c657c1241..920a510c4c 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -501,6 +501,7 @@ typedef enum NodeTag
        T_StartReplicationCmd,
        T_TimeLineHistoryCmd,
        T_SQLCmd,
+       T_ListSlotsCmd,
 
        /*
         * TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index a746fafc12..3f81409e50 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd
        NodeTag         type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *             LIST_SLOTS command
+ * ----------------------
+ */
+typedef struct ListSlotsCmd
+{
+       NodeTag         type;
+       List       *slot_names;
+} ListSlotsCmd;
 
 /* ----------------------
  *             BASE_BACKUP command
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 2ad61a001a..f5b5ef07e8 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -12,8 +12,17 @@
 #ifndef LOGICALWORKER_H
 #define LOGICALWORKER_H
 
+#include "utils/guc.h"
+
+extern char *synchronize_slot_names;
+extern char *standby_slot_names;
+
 extern void ApplyWorkerMain(Datum main_arg);
+extern void ReplSlotSyncMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern bool check_synchronize_slot_names(char **newval, void **extra, GucSource source);
+extern bool check_standby_slot_names(char **newval, void **extra, GucSource source);
+
 #endif                                                 /* LOGICALWORKER_H */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 53d773ccff..a29e517707 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -216,7 +216,6 @@ extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
-extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
@@ -224,4 +223,7 @@ extern void CheckPointReplicationSlots(void);
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
 
+extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto);
+
+
 #endif                                                 /* SLOT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0b607ed777..dbeb447e7a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -18,6 +18,7 @@
 #include "pgtime.h"
 #include "port/atomics.h"
 #include "replication/logicalproto.h"
+#include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/condition_variable.h"
 #include "storage/latch.h"
@@ -187,6 +188,13 @@ typedef struct
        }                       proto;
 } WalRcvStreamOptions;
 
+/*
+ * Slot information received from the remote.
+ *
+ * Currently this is the same as ReplicationSlotPersistentData.
+ */
+#define WalRecvReplicationSlotData ReplicationSlotPersistentData
+
 struct WalReceiverConn;
 typedef struct WalReceiverConn WalReceiverConn;
 
@@ -274,6 +282,11 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
                                                                                        TimeLineID *primary_tli);
 
+/*
+ * Run LIST_SLOTS on the remote and return the slot information as a list.
+ */
+typedef List *(*walrcv_list_slots_fn) (WalReceiverConn *conn, const char *slots);
+
 /*
  * walrcv_server_version_fn
  *
@@ -387,6 +400,7 @@ typedef struct WalReceiverFunctionsType
        walrcv_get_conninfo_fn walrcv_get_conninfo;
        walrcv_get_senderinfo_fn walrcv_get_senderinfo;
        walrcv_identify_system_fn walrcv_identify_system;
+       walrcv_list_slots_fn walrcv_list_slots;
        walrcv_server_version_fn walrcv_server_version;
        walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
        walrcv_startstreaming_fn walrcv_startstreaming;
@@ -411,6 +425,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
        WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli) \
        WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_list_slots(conn, slots) \
+       WalReceiverFunctions->walrcv_list_slots(conn, slots)
 #define walrcv_server_version(conn) \
        WalReceiverFunctions->walrcv_server_version(conn)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 9d29849d80..5de7f80e79 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -58,7 +58,7 @@ typedef struct LogicalRepWorker
         * exits.  Under this, separate buffiles would be created for each
         * transaction which will be deleted after the transaction is finished.
         */
-       FileSet    *stream_fileset;
+       struct FileSet *stream_fileset;
 
        /* Stats. */
        XLogRecPtr      last_lsn;
@@ -81,13 +81,13 @@ extern LogicalRepWorker *MyLogicalRepWorker;
 extern bool in_remote_transaction;
 
 extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+extern LogicalRepWorker *logicalrep_worker_find(Oid dbid, Oid subid, Oid relid,
                                                                                                bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
                                                                         Oid userid, Oid relid);
-extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int     logicalrep_sync_worker_count(Oid subid);
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 8785a8e12c..3274eade53 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -42,6 +42,7 @@ typedef enum
        WAIT_EVENT_CHECKPOINTER_MAIN,
        WAIT_EVENT_LOGICAL_APPLY_MAIN,
        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN,
+       WAIT_EVENT_REPL_SLOT_SYNC_MAIN,
        WAIT_EVENT_PGSTAT_MAIN,
        WAIT_EVENT_RECOVERY_WAL_STREAM,
        WAIT_EVENT_SYSLOGGER_MAIN,
diff --git a/src/test/recovery/t/030_slot_sync.pl b/src/test/recovery/t/030_slot_sync.pl
new file mode 100644
index 0000000000..c87e7dc016
--- /dev/null
+++ b/src/test/recovery/t/030_slot_sync.pl
@@ -0,0 +1,58 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 2;
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 'logical');
+$node_primary->append_conf('postgresql.conf', "standby_slot_names = 'pslot1'");
+$node_primary->start;
+$node_primary->psql('postgres', q{SELECT pg_create_physical_replication_slot('pslot1');});
+
+$node_primary->backup('backup');
+
+my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby');
+$node_phys_standby->init_from_backup($node_primary, 'backup', has_streaming => 1);
+$node_phys_standby->append_conf('postgresql.conf', "synchronize_slot_names = '*'");
+$node_phys_standby->append_conf('postgresql.conf', "primary_slot_name = 'pslot1'");
+$node_phys_standby->start;
+
+$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)");
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_subscriber->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+
+$node_primary->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub1 CONNECTION '" . ($node_primary->connstr . ' dbname=postgres') . "' PUBLICATION pub1");
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result = $node_primary->safe_psql('postgres',
+       "SELECT slot_name, plugin, database FROM pg_replication_slots WHERE slot_type = 'logical'");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on primary');
+
+# FIXME: standby needs restart to pick up new slots
+$node_phys_standby->restart;
+sleep 3;
+
+$result = $node_phys_standby->safe_psql('postgres',
+       "SELECT slot_name, plugin, database FROM pg_replication_slots");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on standby');
+
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (4), (5), (6)");
+$node_primary->wait_for_catchup('sub1');
-- 
2.34.1
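
For anyone who wants to try this out, here is a minimal sketch of how the pieces are meant to be wired together.  The slot name 'pslot1' and the host name are just placeholders taken from the new TAP test, and the LIST_SLOTS call at the end is only an illustration of exercising the new walsender command by hand over a replication connection:

    # primary, postgresql.conf: hold back logical replication until the
    # physical standby using slot 'pslot1' has confirmed the changes
    standby_slot_names = 'pslot1'

    # physical standby, postgresql.conf: sync all logical slots from the primary
    primary_slot_name = 'pslot1'
    synchronize_slot_names = '*'

    # inspect what the standby would fetch; LIST_SLOTS returns the ten columns
    # built in ListSlots(): slot_name, plugin, slot_type, datoid, database,
    # temporary, xmin, catalog_xmin, restart_lsn, confirmed_flush
    psql "host=primary dbname=postgres replication=database" -c 'LIST_SLOTS;'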
