From c41558f0bb3f9c4dc8749b0dca26c23f803a28c0 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 18 Feb 2019 18:45:56 +0900
Subject: [PATCH v11] Add copy function for replication slots

Add new user functions to copy both physical and logical replication
slots while changing properties.

We don't hold the source slot during copy. To prevent the WAL of
destination slot from removal, we copy the source slot data
first. Then install those in the new slot. The source slot could have
progressed while installing, but the installed valuses prevent global
horizons progressing further. Then copy the source slot again, and
do existance check and installing up-to-date values to the destination
slot again.
---
 contrib/test_decoding/expected/slot.out   | 234 +++++++++++++++++++
 contrib/test_decoding/sql/slot.sql        |  94 ++++++++
 doc/src/sgml/func.sgml                    |  41 ++++
 src/backend/replication/logical/logical.c |  16 +-
 src/backend/replication/slotfuncs.c       | 363 +++++++++++++++++++++++++++---
 src/backend/replication/walsender.c       |   1 +
 src/include/catalog/pg_proc.dat           |  35 +++
 src/include/replication/logical.h         |   1 +
 8 files changed, 746 insertions(+), 39 deletions(-)

diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 523621a..85cbbcd 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -150,3 +150,237 @@ SELECT pg_drop_replication_slot('regression_slot3');
  
 (1 row)
 
+--
+-- Test copy functions for logical replication slots
+--
+-- Create and copy logical slots
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput');
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name  |    plugin     | temporary |            slot_name            |    plugin     | temporary 
+------------+---------------+-----------+---------------------------------+---------------+-----------
+ orig_slot1 | test_decoding | f         | copied_slot1_change_plugin      | pgoutput      | f
+ orig_slot1 | test_decoding | f         | copied_slot1_change_plugin_temp | pgoutput      | t
+ orig_slot1 | test_decoding | f         | copied_slot1_no_change          | test_decoding | f
+(3 rows)
+
+-- Now we have maximum 4 replication slots. Check slots are properly
+-- released even when raise error during creating the target slot.
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+ERROR:  all replication slots are in use
+HINT:  Free one or increase max_replication_slots.
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_change_plugin');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+-- Test based on the temporary logical slot
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput');
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name  |    plugin     | temporary |            slot_name            |    plugin     | temporary 
+------------+---------------+-----------+---------------------------------+---------------+-----------
+ orig_slot2 | test_decoding | t         | copied_slot2_change_plugin      | pgoutput      | t
+ orig_slot2 | test_decoding | t         | copied_slot2_change_plugin_temp | pgoutput      | f
+ orig_slot2 | test_decoding | t         | copied_slot2_no_change          | test_decoding | t
+(3 rows)
+
+-- Cannot copy a logical slot to a physical slot
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+ERROR:  cannot copy a replication slot to the different type of replication slot
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+--
+-- Test copy functions for physical replication slots
+--
+-- Create and copy physical slots
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true);
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields.
+SELECT slot_name, slot_type, temporary FROM pg_replication_slots;
+       slot_name        | slot_type | temporary 
+------------------------+-----------+-----------
+ orig_slot1             | physical  | f
+ orig_slot2             | physical  | f
+ copied_slot1_no_change | physical  | f
+ copied_slot1_temp      | physical  | t
+(4 rows)
+
+-- Cannot copy a physical slot to a logical slot
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+ERROR:  cannot copy a replication slot to the different type of replication slot
+-- Cannot copy a physical slot that doesn't reserve WAL
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'falied'); -- error
+ERROR:  cannot copy a replication slot that doesn't reserve WAL
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('orig_slot2');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+-- Test based on the temporary physical slot
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false);
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.temporary, c.slot_name, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name  | temporary |       slot_name        | temporary 
+------------+-----------+------------------------+-----------
+ orig_slot2 | t         | copied_slot2_no_change | t
+ orig_slot2 | t         | copied_slot2_notemp    | f
+(2 rows)
+
+SELECT pg_drop_replication_slot('orig_slot2');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot2_no_change');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot2_notemp');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index c8d08f8..87b4307 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -76,3 +76,97 @@ SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3');
 SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN
 SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error
 SELECT pg_drop_replication_slot('regression_slot3');
+
+--
+-- Test copy functions for logical replication slots
+--
+
+-- Create and copy logical slots
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput');
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+-- Now we have maximum 4 replication slots. Check slots are properly
+-- released even when raise error during creating the target slot.
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+SELECT pg_drop_replication_slot('copied_slot1_change_plugin');
+
+-- Test based on the temporary logical slot
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true);
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput');
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+-- Cannot copy a logical slot to a physical slot
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp');
+
+--
+-- Test copy functions for physical replication slots
+--
+
+-- Create and copy physical slots
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true);
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false);
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true);
+
+-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields.
+SELECT slot_name, slot_type, temporary FROM pg_replication_slots;
+
+-- Cannot copy a physical slot to a logical slot
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+
+-- Cannot copy a physical slot that doesn't reserve WAL
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'falied'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+SELECT pg_drop_replication_slot('orig_slot2');
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+
+-- Test based on the temporary physical slot
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true);
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false);
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.temporary, c.slot_name, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+SELECT pg_drop_replication_slot('orig_slot2');
+SELECT pg_drop_replication_slot('copied_slot2_no_change');
+SELECT pg_drop_replication_slot('copied_slot2_notemp');
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 03859a7..52936d0 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -19516,6 +19516,47 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
       <row>
        <entry>
         <indexterm>
+         <primary>pg_copy_physical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_copy_physical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type></optional>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+       </entry>
+       <entry>
+        Copies an existing physical replication slot name <parameter>src_slot_name</parameter>
+        to a physical replication slot named <parameter>dst_slot_name</parameter>.
+        The copied physical slot starts to reserve WAL from the same <acronym>LSN</acronym> as the
+        source slot.
+        <parameter>temporary</parameter> is optional. If <parameter>temporary</parameter>
+        is omitted, the same value as the source slot is used.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_copy_logical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_copy_logical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type> <optional>, <parameter>plugin</parameter> <type>name</type></optional></optional>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+       </entry>
+       <entry>
+        Copies an existing logical replication slot name <parameter>src_slot_name</parameter>
+        to a logical replication slot named <parameter>dst_slot_name</parameter>
+        while changing the output plugin and persistence. The copied logical slot starts
+        from the same <acronym>LSN</acronym> as the source logical slot. Both
+        <parameter>temporary</parameter> and <parameter>plugin</parameter> are optional.
+        If <parameter>temporary</parameter> or <parameter>plugin</parameter> are omitted,
+        the same values as the source logical slot are used.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
          <primary>pg_logical_slot_get_changes</primary>
         </indexterm>
         <literal><function>pg_logical_slot_get_changes(<parameter>slot_name</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6e5bc12..30f8482 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -221,6 +221,10 @@ StartupDecodingContext(List *output_plugin_options,
  * as the decoding context because further memory contexts will be created
  * inside it.
  *
+ * If restart_lsn is a valid value, we start decoding from the given lsn
+ * without WAL reservation routine. So the caller must guarantee that WAL
+ * is available.
+ *
  * Returns an initialized decoding context after calling the output plugin's
  * startup function.
  */
@@ -228,6 +232,7 @@ LogicalDecodingContext *
 CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
+						  XLogRecPtr restart_lsn,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
@@ -271,7 +276,14 @@ CreateInitDecodingContext(char *plugin,
 	StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
 	SpinLockRelease(&slot->mutex);
 
-	ReplicationSlotReserveWal();
+	if (XLogRecPtrIsInvalid(restart_lsn))
+		ReplicationSlotReserveWal();
+	else
+	{
+		SpinLockAcquire(&slot->mutex);
+		slot->data.restart_lsn = restart_lsn;
+		SpinLockRelease(&slot->mutex);
+	}
 
 	/* ----
 	 * This is a bit tricky: We need to determine a safe xmin horizon to start
@@ -316,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
 
-	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
+	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false,
 								 read_page, prepare_write, do_write,
 								 update_progress);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 224dd92..560051d 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -17,6 +17,7 @@
 #include "miscadmin.h"
 
 #include "access/htup_details.h"
+#include "access/xlog_internal.h"
 #include "replication/decode.h"
 #include "replication/slot.h"
 #include "replication/logical.h"
@@ -36,6 +37,38 @@ check_permissions(void)
 }
 
 /*
+ * Helper function for creating a new physical replication slot with
+ * given arguments. Note that this function doesn't release the created
+ * slot.
+ *
+ * If restart_lsn is a valid value, we use it without WAL reservation
+ * routine. So the caller must guarantee that WAL is available.
+ */
+static void
+create_physical_replication_slot(char *name, bool immediately_reserve,
+								 bool temporary, XLogRecPtr restart_lsn)
+{
+	Assert(!MyReplicationSlot);
+
+	/* acquire replication slot, this will check for conflicting names */
+	ReplicationSlotCreate(name, false,
+						  temporary ? RS_TEMPORARY : RS_PERSISTENT);
+
+	if (immediately_reserve)
+	{
+		/* Reserve WAL as the user asked for it */
+		if (XLogRecPtrIsInvalid(restart_lsn))
+			ReplicationSlotReserveWal();
+		else
+			MyReplicationSlot->data.restart_lsn = restart_lsn;
+
+		/* Write this slot to disk */
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
+	}
+}
+
+/*
  * SQL function for creating a new physical (streaming replication)
  * replication slot.
  */
@@ -51,8 +84,6 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	HeapTuple	tuple;
 	Datum		result;
 
-	Assert(!MyReplicationSlot);
-
 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
 		elog(ERROR, "return type must be a row type");
 
@@ -60,22 +91,16 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckSlotRequirements();
 
-	/* acquire replication slot, this will check for conflicting names */
-	ReplicationSlotCreate(NameStr(*name), false,
-						  temporary ? RS_TEMPORARY : RS_PERSISTENT);
+	create_physical_replication_slot(NameStr(*name),
+									 immediately_reserve,
+									 temporary,
+									 InvalidXLogRecPtr);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
 
 	if (immediately_reserve)
 	{
-		/* Reserve WAL as the user asked for it */
-		ReplicationSlotReserveWal();
-
-		/* Write this slot to disk */
-		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
-
 		values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
 		nulls[1] = false;
 	}
@@ -94,32 +119,18 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 
 
 /*
- * SQL function for creating a new logical replication slot.
+ * Helper function for creating a new logical replication slot with
+ * given arguments. Note that this function doesn't release the created
+ * slot.
  */
-Datum
-pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+static void
+create_logical_replication_slot(char *name, char *plugin,
+								bool temporary, XLogRecPtr restart_lsn)
 {
-	Name		name = PG_GETARG_NAME(0);
-	Name		plugin = PG_GETARG_NAME(1);
-	bool		temporary = PG_GETARG_BOOL(2);
-
 	LogicalDecodingContext *ctx = NULL;
 
-	TupleDesc	tupdesc;
-	HeapTuple	tuple;
-	Datum		result;
-	Datum		values[2];
-	bool		nulls[2];
-
 	Assert(!MyReplicationSlot);
 
-	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
-
-	check_permissions();
-
-	CheckLogicalDecodingRequirements();
-
 	/*
 	 * Acquire a logical decoding slot, this will check for conflicting names.
 	 * Initially create persistent slot as ephemeral - that allows us to
@@ -128,25 +139,54 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 * slots can be created as temporary from beginning as they get dropped on
 	 * error as well.
 	 */
-	ReplicationSlotCreate(NameStr(*name), true,
+	ReplicationSlotCreate(name, true,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
 	 */
-	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+	ctx = CreateInitDecodingContext(plugin, NIL,
 									false,	/* do not build snapshot */
+									restart_lsn,
 									logical_read_local_xlog_page, NULL, NULL,
 									NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
 
-	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
-	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
-
 	/* don't need the decoding context anymore */
 	FreeDecodingContext(ctx);
+}
+
+/*
+ * SQL function for creating a new logical replication slot.
+ */
+Datum
+pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		name = PG_GETARG_NAME(0);
+	Name		plugin = PG_GETARG_NAME(1);
+	bool		temporary = PG_GETARG_BOOL(2);
+	Datum		result;
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		values[2];
+	bool		nulls[2];
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	check_permissions();
+
+	CheckLogicalDecodingRequirements();
+
+	create_logical_replication_slot(NameStr(*name),
+									NameStr(*plugin),
+									temporary,
+									InvalidXLogRecPtr);
+
+	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
 
 	memset(nulls, 0, sizeof(nulls));
 
@@ -558,3 +598,252 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 
 	PG_RETURN_DATUM(result);
 }
+
+/*
+ * Helper function of copying a replication slot.
+ */
+static Datum
+copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
+{
+	Name			src_name = PG_GETARG_NAME(0);
+	Name			dst_name = PG_GETARG_NAME(1);
+	ReplicationSlot *src = NULL;
+	XLogRecPtr		src_restart_lsn;
+	bool			src_islogical;
+	bool			temporary;
+	char			*plugin;
+	Datum			values[2];
+	bool			nulls[2];
+	Datum			result;
+	int				i;
+	TupleDesc		tupdesc;
+	HeapTuple		tuple;
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	check_permissions();
+
+	if (logical_slot)
+		CheckLogicalDecodingRequirements();
+	else
+		CheckSlotRequirements();
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	/*
+	 * To prevent the WAL of the destination slot from removal during copy,
+	 * we copy current data of the source slot first. Then install those in
+	 * the new slot. The source slot could have progressed while installing,
+	 * but the installed values prevent global horizons from progressing
+	 * further. Therefore a second copy is sufficiently up-to-date.
+	 */
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
+		{
+			SpinLockAcquire(&s->mutex);
+			src_islogical = SlotIsLogical(s);
+			src_restart_lsn = s->data.restart_lsn;
+			temporary = s->data.persistency == RS_TEMPORARY;
+			plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
+			SpinLockRelease(&s->mutex);
+
+			src = s;
+			break;
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+
+	if (src == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
+
+	/* Check type of replication slot */
+	if (src_islogical != logical_slot)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 (errmsg("cannot copy a replication slot to the different type of replication slot"))));
+
+	/* Copying non-reserved slot doesn't make sense */
+	if (XLogRecPtrIsInvalid(src_restart_lsn))
+	{
+		Assert(!logical_slot);
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 (errmsg("cannot copy a replication slot that doesn't reserve WAL"))));
+	}
+
+	/* Overwrite by optional arguments */
+	if (PG_NARGS() >= 3)
+		temporary = PG_GETARG_BOOL(2);
+	if (PG_NARGS() >= 4)
+	{
+		Assert(logical_slot);
+		plugin = NameStr(*(PG_GETARG_NAME(3)));
+	}
+
+	/* Create new slot and acquire it */
+	if (logical_slot)
+		create_logical_replication_slot(NameStr(*dst_name),
+										plugin,
+										temporary,
+										src_restart_lsn);
+	else
+		create_physical_replication_slot(NameStr(*dst_name),
+										 true,
+										 temporary,
+										 src_restart_lsn);
+
+	/*
+	 * Update the destination slot with the second copy of the source slot
+	 * to reserve WAL.
+	 */
+	{
+		TransactionId	copy_effective_xmin;
+		TransactionId	copy_effective_catalog_xmin;
+		TransactionId	copy_xmin;
+		TransactionId	copy_catalog_xmin;
+		XLogRecPtr		copy_restart_lsn;
+		bool			copy_islogical;
+		char			*copy_name;
+
+		/* Copy data of source slot again */
+		SpinLockAcquire(&src->mutex);
+		copy_effective_xmin = src->effective_xmin;
+		copy_effective_catalog_xmin = src->effective_catalog_xmin;
+
+		copy_xmin = src->data.xmin;
+		copy_catalog_xmin = src->data.catalog_xmin;
+		copy_restart_lsn = src->data.restart_lsn;
+
+		/* for existence check */
+		copy_name = pstrdup(NameStr(src->data.name));
+		copy_islogical = SlotIsLogical(src);
+		SpinLockRelease(&src->mutex);
+
+		/*
+		 * Check if the source slot still exists and is valid. We regards it as
+		 * invalid if the type of replication slot or name has been changed, or
+		 * the restart_lsn either is invalid or have gone backward. The restart_lsn
+		 * of second copy can go backward when the source slot is dropped and
+		 * copied form another old slot during installation. Since erroring out will
+		 * release and drop the destination slot we don't need to release it here.
+		 */
+		if (copy_restart_lsn < src_restart_lsn ||
+			src_islogical != copy_islogical ||
+			strcmp(copy_name, NameStr(*src_name)) != 0)
+			ereport(ERROR,
+					(errmsg("could not copy logical replication slot \"%s\"",
+							NameStr(*src_name)),
+					 errdetail("The source replication slot has been dropped during copy")));
+
+		/* Install copied values again */
+		SpinLockAcquire(&MyReplicationSlot->mutex);
+		MyReplicationSlot->effective_xmin = copy_effective_xmin;
+		MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
+
+		MyReplicationSlot->data.xmin = copy_xmin;
+		MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
+		MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+		SpinLockRelease(&MyReplicationSlot->mutex);
+
+		ReplicationSlotMarkDirty();
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+		ReplicationSlotSave();
+
+		/* Check if the restart_lsn is available */
+#ifdef USE_ASSERT_CHECKING
+		{
+			XLogSegNo	segno;
+
+			XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
+			Assert(XLogGetLastRemovedSegno() < segno);
+		}
+#endif
+	}
+
+	/* The destination slot is now fully created, mark it as persistent if needed */
+	if (logical_slot && !temporary)
+		ReplicationSlotPersist();
+
+	values[0] = NameGetDatum(dst_name);
+	nulls[0] = false;
+
+	if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
+	{
+		values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
+		nulls[1] = false;
+	}
+	else
+		nulls[1] = true;
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	result = HeapTupleGetDatum(tuple);
+
+	ReplicationSlotRelease();
+
+	PG_RETURN_DATUM(result);
+}
+
+/*
+ * Copy logical replication slot (2 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
+ */
+Datum
+pg_copy_logical_replication_slot_no_plugin_temp(PG_FUNCTION_ARGS)
+{
+	return copy_replication_slot(fcinfo, true);
+}
+
+/*
+ * Copy logical replication slot (3 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
+ */
+Datum
+pg_copy_logical_replication_slot_no_plugin(PG_FUNCTION_ARGS)
+{
+	return copy_replication_slot(fcinfo, true);
+}
+
+/*
+ * SQL function for copying a logical replication slot.
+ */
+Datum
+pg_copy_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+	return copy_replication_slot(fcinfo, true);
+}
+
+/*
+ * Copy physical replication slot (3 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
+ */
+Datum
+pg_copy_physical_replication_slot_no_temp(PG_FUNCTION_ARGS)
+{
+	return copy_replication_slot(fcinfo, false);
+}
+
+/*
+ * SQL function for copying a physical replication slot.
+ */
+Datum
+pg_copy_physical_replication_slot(PG_FUNCTION_ARGS)
+{
+	return copy_replication_slot(fcinfo, false);
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9b143f3..03c61f3 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -930,6 +930,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		}
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
+										InvalidXLogRecPtr,
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData,
 										WalSndUpdateProgress);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 2e9d6cf..3895d71 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9676,6 +9676,20 @@
   proargmodes => '{i,i,i,o,o}',
   proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}',
   prosrc => 'pg_create_physical_replication_slot' },
+{ oid => '4008', descr => 'copy a physical replication slot while changing temporality',
+  proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
+  proallargtypes => '{name,name,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
+  prosrc => 'pg_copy_physical_replication_slot' },
+{ oid => '4009', descr => 'copy a physical replication slot',
+  proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+  proallargtypes => '{name,name,name,pg_lsn}',
+  proargmodes => '{i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+  prosrc => 'pg_copy_physical_replication_slot_no_temp' },
 { oid => '3780', descr => 'drop a replication slot',
   proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u',
   prorettype => 'void', proargtypes => 'name',
@@ -9696,6 +9710,27 @@
   proargmodes => '{i,i,i,o,o}',
   proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
+{ oid => '4005', descr => 'copy a logical replication slot while changing temporality and plugin',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool name',
+  proallargtypes => '{name,name,bool,name,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,temporary,plugin,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot' },
+{ oid => '4006', descr => 'copy a logical replication slot while changing temporality',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
+  proallargtypes => '{name,name,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_no_plugin' },
+{ oid => '4007', descr => 'copy a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+  proallargtypes => '{name,name,name,pg_lsn}',
+  proargmodes => '{i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_no_plugin_temp' },
 { oid => '3782', descr => 'get changes from replication slot',
   proname => 'pg_logical_slot_get_changes', procost => '1000',
   prorows => '1000', provariadic => 'text', proisstrict => 'f',
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c8ffc4c..0a2a63a 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void);
 extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
+						  XLogRecPtr restart_lsn,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-- 
2.10.5

