From f895ec3417a9ee4c2f58e31fa8fc33c48dbf534e Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 26 Jun 2018 21:51:50 +0900
Subject: [PATCH v3] Copy physical and logical replication slot.

---
 doc/src/sgml/func.sgml                      |  41 ++++
 src/backend/replication/logical/logical.c   |  13 +-
 src/backend/replication/slotfuncs.c         | 308 +++++++++++++++++++++++-----
 src/backend/replication/walsender.c         |   1 +
 src/include/catalog/pg_proc.dat             |  35 ++++
 src/include/replication/logical.h           |   1 +
 src/test/recovery/t/006_logical_decoding.pl |  13 +-
 7 files changed, 360 insertions(+), 52 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index edc9be9..5a5369a 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -19252,6 +19252,47 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
       <row>
        <entry>
         <indexterm>
+         <primary>pg_copy_physical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_copy_physical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <optional>, <parameter>temporary</parameter> <type>bool</type></optional>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+       </entry>
+       <entry>
+        Copy a existing <parameter>src_slot_name</parameter> physical slot
+        to <parameter>dst_slot_name</parameter>. The copied physical slot starts
+        to reserve WAL from the same LSN as the source slot if the source slot
+        already reserves WAL. <parameter>temporary</parameter> is optional. If
+        <parameter>temporary</parameter> is omitted, the same value of the
+        source slot is used.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_copy_logical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_copy_logical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <optional>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type></optional></optional>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+       </entry>
+       <entry>
+        Copy a existing <parameter>src_slot_name</parameter> logical (decoding) slot
+        to <parameter>dst_slot_name</parameter> while changing the output plugin
+        and persistence. The copied logical slot starts from the same LSN as the
+        source logical slot. Both <parameter>plugin</parameter> and
+        <parameter>temporary</parameter> are optional. If <parameter>plugin</parameter>
+        or <parameter>temporary</parameter> are omitted, the same values of
+        the source logical slot are used.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
          <primary>pg_logical_slot_get_changes</primary>
         </indexterm>
         <literal><function>pg_logical_slot_get_changes(<parameter>slot_name</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 61588d6..7e7d54a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -223,6 +223,7 @@ LogicalDecodingContext *
 CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
+						  XLogRecPtr start_lsn,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
@@ -266,7 +267,15 @@ CreateInitDecodingContext(char *plugin,
 	StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
 	SpinLockRelease(&slot->mutex);
 
-	ReplicationSlotReserveWal();
+	/* Find start location to read WAL if not specified */
+	if (XLogRecPtrIsInvalid(start_lsn))
+		ReplicationSlotReserveWal();
+	else
+	{
+		SpinLockAcquire(&slot->mutex);
+		slot->data.restart_lsn = start_lsn;
+		SpinLockRelease(&slot->mutex);
+	}
 
 	/* ----
 	 * This is a bit tricky: We need to determine a safe xmin horizon to start
@@ -311,7 +320,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
 
-	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
+	ctx = StartupDecodingContext(NIL, start_lsn, xmin_horizon,
 								 need_full_snapshot, true,
 								 read_page, prepare_write, do_write,
 								 update_progress);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2806e10..baab35e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -36,86 +36,181 @@ check_permissions(void)
 }
 
 /*
- * SQL function for creating a new physical (streaming replication)
- * replication slot.
+ * Helper function for creating a new physical replication slot with
+ * given arguments. Return a restart_lsn of new replication slot or
+ * InvalidXLogRecPtr if WAL reservation is not required.
  */
-Datum
-pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
+static XLogRecPtr
+create_physical_replication_slot(char *name, bool immediately_reserve,
+								 bool temporary, XLogRecPtr start_lsn)
 {
-	Name		name = PG_GETARG_NAME(0);
-	bool		immediately_reserve = PG_GETARG_BOOL(1);
-	bool		temporary = PG_GETARG_BOOL(2);
-	Datum		values[2];
-	bool		nulls[2];
-	TupleDesc	tupdesc;
-	HeapTuple	tuple;
-	Datum		result;
+	XLogRecPtr	result = InvalidXLogRecPtr;
 
 	Assert(!MyReplicationSlot);
 
-	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
-
 	check_permissions();
 
 	CheckSlotRequirements();
 
 	/* acquire replication slot, this will check for conflicting names */
-	ReplicationSlotCreate(NameStr(*name), false,
+	ReplicationSlotCreate(name, false,
 						  temporary ? RS_TEMPORARY : RS_PERSISTENT);
 
-	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
-	nulls[0] = false;
-
 	if (immediately_reserve)
 	{
 		/* Reserve WAL as the user asked for it */
-		ReplicationSlotReserveWal();
+		if (XLogRecPtrIsInvalid(start_lsn))
+			ReplicationSlotReserveWal();
+		else
+			MyReplicationSlot->data.restart_lsn = start_lsn;
 
 		/* Write this slot to disk */
 		ReplicationSlotMarkDirty();
 		ReplicationSlotSave();
 
-		values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
-		nulls[1] = false;
+		result = MyReplicationSlot->data.restart_lsn;
 	}
+
+	ReplicationSlotRelease();
+
+	return result;
+}
+
+/*
+ * SQL function for creating a new physical (streaming replication)
+ * replication slot.
+ */
+Datum
+pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		name = PG_GETARG_NAME(0);
+	bool		immediately_reserve = PG_GETARG_BOOL(1);
+	bool		temporary = PG_GETARG_BOOL(2);
+	Datum		values[2];
+	bool		nulls[2];
+	XLogRecPtr	result_lsn;
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		result;
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	result_lsn = create_physical_replication_slot(NameStr(*name),
+												  immediately_reserve,
+												  temporary,
+												  InvalidXLogRecPtr);
+
+	values[0] = NameGetDatum(name);
+	nulls[0] = false;
+
+	if (XLogRecPtrIsInvalid(result_lsn))
+		nulls[1] = true;
 	else
 	{
-		nulls[1] = true;
+		values[1] = LSNGetDatum(result_lsn);
+		nulls[1] = false;
 	}
 
 	tuple = heap_form_tuple(tupdesc, values, nulls);
 	result = HeapTupleGetDatum(tuple);
 
-	ReplicationSlotRelease();
-
 	PG_RETURN_DATUM(result);
 }
 
-
 /*
- * SQL function for creating a new logical replication slot.
+ * Copy physical replication slot (3 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
  */
 Datum
-pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+pg_copy_physical_replication_slot_no_temp(PG_FUNCTION_ARGS)
 {
-	Name		name = PG_GETARG_NAME(0);
-	Name		plugin = PG_GETARG_NAME(1);
-	bool		temporary = PG_GETARG_BOOL(2);
-
-	LogicalDecodingContext *ctx = NULL;
+	return pg_copy_physical_replication_slot(fcinfo);
+}
 
-	TupleDesc	tupdesc;
-	HeapTuple	tuple;
-	Datum		result;
+/*
+ * SQL function for copying a physical replication slot.
+ */
+Datum
+pg_copy_physical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		src_name = PG_GETARG_NAME(0);
+	Name		dst_name = PG_GETARG_NAME(1);
+	bool		temporary; /* optional argument */
+	bool		immediately_reserve;
+	XLogRecPtr	start_lsn;
+	XLogRecPtr	result_lsn;
 	Datum		values[2];
 	bool		nulls[2];
-
-	Assert(!MyReplicationSlot);
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
 
 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
 		elog(ERROR, "return type must be a row type");
 
+	/* Acquire the source slot so we own it */
+	ReplicationSlotAcquire(NameStr(*src_name), true);
+
+	/* Check type of replication slot */
+	if (SlotIsLogical(MyReplicationSlot))
+	{
+		ReplicationSlotRelease();
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 (errmsg("cannot copy logical replication slot to physical replication slot"))));
+	}
+
+	/* Save some values of the source slot before releasing */
+	start_lsn = MyReplicationSlot->data.restart_lsn;
+	temporary = (MyReplicationSlot->data.persistency == RS_TEMPORARY);
+
+	/* Release it */
+	ReplicationSlotRelease();
+
+	/* check the optional argument */
+	if (PG_NARGS() >= 3)
+		temporary = PG_GETARG_BOOL(2);
+
+	/* Reserve WAL at creation if the source slots already reserves */
+	immediately_reserve = !XLogRecPtrIsInvalid(start_lsn);
+
+	result_lsn = create_physical_replication_slot(NameStr(*dst_name),
+												  immediately_reserve,
+												  temporary,
+												  start_lsn);
+
+	values[0] = NameGetDatum(dst_name);
+	nulls[0] = false;
+
+	if (XLogRecPtrIsInvalid(result_lsn))
+		nulls[1] = true;
+	else
+	{
+		values[1] = LSNGetDatum(result_lsn);
+		nulls[1] = false;
+	}
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+
+	PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * Helper function for creating a new logical replication slot with
+ * given arguments. Return a confirmed_lsn of new replication slot.
+ */
+static XLogRecPtr
+create_logical_replication_slot(char *name, char *plugin,
+								bool temporary, XLogRecPtr start_lsn)
+{
+	LogicalDecodingContext *ctx = NULL;
+	XLogRecPtr	result;
+
+	Assert(!MyReplicationSlot);
+
 	check_permissions();
 
 	CheckLogicalDecodingRequirements();
@@ -128,39 +223,154 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 * slots can be created as temporary from beginning as they get dropped on
 	 * error as well.
 	 */
-	ReplicationSlotCreate(NameStr(*name), true,
+	ReplicationSlotCreate(name, true,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
 	 */
-	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+	ctx = CreateInitDecodingContext(plugin, NIL,
 									false,	/* do not build snapshot */
+									start_lsn,
 									logical_read_local_xlog_page, NULL, NULL,
 									NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
 
-	values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
-	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
-
 	/* don't need the decoding context anymore */
 	FreeDecodingContext(ctx);
 
-	memset(nulls, 0, sizeof(nulls));
-
-	tuple = heap_form_tuple(tupdesc, values, nulls);
-	result = HeapTupleGetDatum(tuple);
-
 	/* ok, slot is now fully created, mark it as persistent if needed */
 	if (!temporary)
 		ReplicationSlotPersist();
+
+	result = MyReplicationSlot->data.confirmed_flush;
+
 	ReplicationSlotRelease();
 
-	PG_RETURN_DATUM(result);
+	return result;
+}
+
+/*
+ * SQL function for creating a new logical replication slot.
+ */
+Datum
+pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		name = PG_GETARG_NAME(0);
+	Name		plugin = PG_GETARG_NAME(1);
+	bool		temporary = PG_GETARG_BOOL(2);
+	XLogRecPtr	confirmed_flush;
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		values[2];
+	bool		nulls[2];
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	confirmed_flush = create_logical_replication_slot(NameStr(*name),
+													  NameStr(*plugin),
+													  temporary,
+													  InvalidXLogRecPtr);
+
+	memset(nulls, 0, sizeof(nulls));
+
+	values[0] = CStringGetTextDatum(NameStr(*name));
+	values[1] = LSNGetDatum(confirmed_flush);
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+
+	PG_RETURN_POINTER(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * Copy logical replication slot (2 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
+ */
+Datum
+pg_copy_logical_replication_slot_no_plugin_temp(PG_FUNCTION_ARGS)
+{
+	return pg_copy_logical_replication_slot(fcinfo);
 }
 
+/*
+ * Copy logical replication slot (3 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
+ */
+Datum
+pg_copy_logical_replication_slot_no_plugin(PG_FUNCTION_ARGS)
+{
+	return pg_copy_logical_replication_slot(fcinfo);
+}
+
+/*
+ * SQL function for copying a logical replication slot.
+ */
+Datum
+pg_copy_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		src_name = PG_GETARG_NAME(0);
+	Name		dst_name = PG_GETARG_NAME(1);
+	char		*plugin;	/* optional argument */
+	bool		temporary;	/* optional argument */
+	XLogRecPtr	confirmed_flush;
+	XLogRecPtr	start_lsn;
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		values[2];
+	bool		nulls[2];
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	/* Acquire the source slot so we own it */
+	ReplicationSlotAcquire(NameStr(*src_name), true);
+
+	/* Check type of replication slot */
+	if (!SlotIsLogical(MyReplicationSlot))
+	{
+		ReplicationSlotRelease();
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 (errmsg("cannot copy physical replication slot to logical replication slot"))));
+	}
+
+	/* Save some values of the source slot before releasing */
+	start_lsn = MyReplicationSlot->data.restart_lsn;
+	plugin = pstrdup(NameStr(MyReplicationSlot->data.plugin));
+	temporary = (MyReplicationSlot->data.persistency == RS_TEMPORARY);
+
+	/* Release it */
+	ReplicationSlotRelease();
+
+	/* Check the optional arguments */
+	if (PG_NARGS() >= 3)
+		plugin = NameStr(*(PG_GETARG_NAME(2)));
+	if (PG_NARGS() >= 4)
+		temporary = PG_GETARG_BOOL(3);
+
+	confirmed_flush = create_logical_replication_slot(NameStr(*dst_name),
+													  plugin,
+													  temporary,
+													  start_lsn);
+
+	memset(nulls, 0, sizeof(nulls));
+
+	values[0] = CStringGetTextDatum(NameStr(*dst_name));
+	values[1] = LSNGetDatum(confirmed_flush);
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+
+	PG_RETURN_POINTER(HeapTupleGetDatum(tuple));
+}
 
 /*
  * SQL function for dropping a replication slot.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca..c49f2d2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -918,6 +918,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		}
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
+										InvalidXLogRecPtr,
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData,
 										WalSndUpdateProgress);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 40d54ed..499b0da 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9787,6 +9787,20 @@
   proargmodes => '{i,i,i,o,o}',
   proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}',
   prosrc => 'pg_create_physical_replication_slot' },
+{ oid => '4008', descr => 'copy a physical replication slot',
+  proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
+  proallargtypes => '{name,name,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
+  prosrc => 'pg_copy_physical_replication_slot' },
+{ oid => '4009', descr => 'copy a physical replication slot',
+  proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+  proallargtypes => '{name,name,name,pg_lsn}',
+  proargmodes => '{i,i,o,o}',
+  proargnames => '{slot_name,dst_name,slot_name,lsn}',
+  prosrc => 'pg_copy_physical_replication_slot_no_temp' },
 { oid => '3780', descr => 'drop a replication slot',
   proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u',
   prorettype => 'void', proargtypes => 'name',
@@ -9807,6 +9821,27 @@
   proargmodes => '{i,i,i,o,o}',
   proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
+{ oid => '4005', descr => 'copy up a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name name bool',
+  proallargtypes => '{name,name,name,bool,text,pg_lsn}',
+  proargmodes => '{i,i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,plugin,temporary,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot' },
+{ oid => '4006', descr => 'copy up a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name name',
+  proallargtypes => '{name,name,name,text,pg_lsn}',
+  proargmodes => '{i,i,i,o,o}',
+  proargnames => '{src_slot_name,plugin,dst_slot_name,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_no_plugin' },
+{ oid => '4007', descr => 'copy up a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+  proallargtypes => '{name,name,text,pg_lsn}',
+  proargmodes => '{i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_no_plugin_temp' },
 { oid => '3782', descr => 'get changes from replication slot',
   proname => 'pg_logical_slot_get_changes', procost => '1000',
   prorows => '1000', provariadic => 'text', proisstrict => 'f',
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c25ac1f..d1643e5 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void);
 extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
+						  XLogRecPtr start_lsn,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index e3a5fe9..8dc91e2 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 17;
 use Config;
 
 # Initialize master node
@@ -27,6 +27,11 @@ $node_master->safe_psql('postgres',
 	qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]
 );
 
+# Copy logical slot
+$node_master->safe_psql('postgres',
+	qq[SELECT pg_copy_logical_replication_slot('test_slot', 'copy_slot', 'test_decoding', false);]
+);
+
 $node_master->safe_psql('postgres',
 	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
 );
@@ -37,6 +42,12 @@ my ($result) = $node_master->safe_psql('postgres',
 is(scalar(my @foobar = split /^/m, $result),
 	12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
 
+# Basic decoding works using the copied slot
+$result = $node_master->safe_psql('postgres',
+	qq[SELECT pg_logical_slot_get_changes('copy_slot', NULL, NULL);]);
+is(scalar(@foobar = split /^/m, $result),
+	12, 'Decoding produced 12 rows inc BEGIN/COMMIT using the copied slot');
+
 # If we immediately crash the server we might lose the progress we just made
 # and replay the same changes again. But a clean shutdown should never repeat
 # the same changes when we use the SQL decoding interface.
-- 
2.10.5

