From 2c2663fab675ceb80ea75ad82ddd5b1437d67711 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 28 Aug 2018 16:14:32 +0900
Subject: [PATCH v7] Copy function for logical and physical replication slots.

---
 contrib/test_decoding/expected/slot.out   | 234 ++++++++++++++++++
 contrib/test_decoding/sql/slot.sql        |  94 ++++++++
 doc/src/sgml/func.sgml                    |  41 ++++
 src/backend/replication/logical/logical.c |   5 +-
 src/backend/replication/slot.c            |  89 ++++---
 src/backend/replication/slotfuncs.c       | 380 ++++++++++++++++++++++++++----
 src/backend/replication/walsender.c       |   3 +-
 src/include/catalog/pg_proc.dat           |  35 +++
 src/include/replication/logical.h         |   1 +
 src/include/replication/slot.h            |   2 +-
 10 files changed, 800 insertions(+), 84 deletions(-)

diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 523621a..40b200e 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -150,3 +150,237 @@ SELECT pg_drop_replication_slot('regression_slot3');
  
 (1 row)
 
+--
+-- Test copy functions for logical replication slots
+--
+-- Create and copy logical slots
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', 'pgoutput');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', 'pgoutput', true);
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name  |    plugin     | temporary |            slot_name            |    plugin     | temporary 
+------------+---------------+-----------+---------------------------------+---------------+-----------
+ orig_slot1 | test_decoding | f         | copied_slot1_change_plugin      | pgoutput      | f
+ orig_slot1 | test_decoding | f         | copied_slot1_change_plugin_temp | pgoutput      | t
+ orig_slot1 | test_decoding | f         | copied_slot1_no_change          | test_decoding | f
+(3 rows)
+
+-- Now we have maximum 4 replication slots. Check slots are properly
+-- released even when raise error during creating the target slot.
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+ERROR:  all replication slots are in use
+HINT:  Free one or increase max_replication_slots.
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_change_plugin');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+-- Test based on the temporary logical slot
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', 'pgoutput');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', 'pgoutput', false);
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name  |    plugin     | temporary |            slot_name            |    plugin     | temporary 
+------------+---------------+-----------+---------------------------------+---------------+-----------
+ orig_slot2 | test_decoding | t         | copied_slot2_change_plugin      | pgoutput      | t
+ orig_slot2 | test_decoding | t         | copied_slot2_change_plugin_temp | pgoutput      | f
+ orig_slot2 | test_decoding | t         | copied_slot2_no_change          | test_decoding | t
+(3 rows)
+
+-- Cannot copy a logical slot to a physical slot
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+ERROR:  cannot copy a logical replication slot to a physical replication slot
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+--
+-- Test copy functions for physical replication slots
+--
+-- Create and copy physical slots
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true);
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields.
+SELECT slot_name, slot_type, temporary FROM pg_replication_slots;
+       slot_name        | slot_type | temporary 
+------------------------+-----------+-----------
+ orig_slot1             | physical  | f
+ orig_slot2             | physical  | f
+ copied_slot1_no_change | physical  | f
+ copied_slot1_temp      | physical  | t
+(4 rows)
+
+-- Cannot copy a physical slot to a logical slot
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+ERROR:  cannot copy a physical replication slot to a logical replication slot
+-- Cannot copy a physical slot that doesn't reserve WAL
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'falied'); -- error
+ERROR:  cannot copy a physical replication slot that doesn't reserve WAL
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('orig_slot2');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+-- Test based on the temporary physical slot
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false);
+ ?column? 
+----------
+ copy
+(1 row)
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.temporary, c.slot_name, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+ slot_name  | temporary |       slot_name        | temporary 
+------------+-----------+------------------------+-----------
+ orig_slot2 | t         | copied_slot2_no_change | t
+ orig_slot2 | t         | copied_slot2_notemp    | f
+(2 rows)
+
+SELECT pg_drop_replication_slot('orig_slot2');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot2_no_change');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('copied_slot2_notemp');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index c8d08f8..c14937c 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -76,3 +76,97 @@ SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3');
 SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN
 SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error
 SELECT pg_drop_replication_slot('regression_slot3');
+
+--
+-- Test copy functions for logical replication slots
+--
+
+-- Create and copy logical slots
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', 'pgoutput');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', 'pgoutput', true);
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+-- Now we have maximum 4 replication slots. Check slots are properly
+-- released even when raise error during creating the target slot.
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+SELECT pg_drop_replication_slot('copied_slot1_change_plugin');
+
+-- Test based on the temporary logical slot
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true);
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', 'pgoutput');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', 'pgoutput', false);
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+-- Cannot copy a logical slot to a physical slot
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp');
+
+--
+-- Test copy functions for physical replication slots
+--
+
+-- Create and copy physical slots
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true);
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false);
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true);
+
+-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields.
+SELECT slot_name, slot_type, temporary FROM pg_replication_slots;
+
+-- Cannot copy a physical slot to a logical slot
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+
+-- Cannot copy a physical slot that doesn't reserve WAL
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'falied'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+SELECT pg_drop_replication_slot('orig_slot2');
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+
+-- Test based on the temporary physical slot
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true);
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false);
+
+-- Check all copied slots status
+SELECT
+    o.slot_name, o.temporary, c.slot_name, c.temporary
+FROM
+    (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+    LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn
+WHERE
+    o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+SELECT pg_drop_replication_slot('orig_slot2');
+SELECT pg_drop_replication_slot('copied_slot2_no_change');
+SELECT pg_drop_replication_slot('copied_slot2_notemp');
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 09c77db..29c7347 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -19512,6 +19512,47 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
       <row>
        <entry>
         <indexterm>
+         <primary>pg_copy_physical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_copy_physical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <optional>, <parameter>temporary</parameter> <type>bool</type></optional>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+       </entry>
+       <entry>
+        Copies an existing physical replication slot name <parameter>src_slot_name</parameter>
+        to a physical replication slot named <parameter>dst_slot_name</parameter>.
+        The copied physical slot starts to reserve WAL from the same <acronym>LSN</acronym> as the
+        source slot.
+        <parameter>temporary</parameter> is optional. If <parameter>temporary</parameter>
+        is omitted, the same value as the source slot is used.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_copy_logical_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_copy_logical_replication_slot(<parameter>src_slot_name</parameter> <type>name</type>, <parameter>dst_slot_name</parameter> <optional>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type></optional></optional>)</function></literal>
+       </entry>
+       <entry>
+        (<parameter>slot_name</parameter> <type>name</type>, <parameter>lsn</parameter> <type>pg_lsn</type>)
+       </entry>
+       <entry>
+        Copies an existing logical replication slot name <parameter>src_slot_name</parameter>
+        to a logical replication slot named <parameter>dst_slot_name</parameter>
+        while changing the output plugin and persistence. The copied logical slot starts
+        from the same <acronym>LSN</acronym> as the source logical slot. Both <parameter>plugin</parameter> and
+        <parameter>temporary</parameter> are optional. If <parameter>plugin</parameter>
+        or <parameter>temporary</parameter> are omitted, the same values as
+        the source logical slot are used.
+       </entry>
+      </row>
+
+      <row>
+       <entry>
+        <indexterm>
          <primary>pg_logical_slot_get_changes</primary>
         </indexterm>
         <literal><function>pg_logical_slot_get_changes(<parameter>slot_name</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 9f99e4f..642b379 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -228,6 +228,7 @@ LogicalDecodingContext *
 CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
+						  XLogRecPtr restart_lsn,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
@@ -271,7 +272,7 @@ CreateInitDecodingContext(char *plugin,
 	StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
 	SpinLockRelease(&slot->mutex);
 
-	ReplicationSlotReserveWal();
+	ReplicationSlotReserveWal(restart_lsn);
 
 	/* ----
 	 * This is a bit tricky: We need to determine a safe xmin horizon to start
@@ -316,7 +317,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
 
-	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
+	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false,
 								 read_page, prepare_write, do_write,
 								 update_progress);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1f2e713..1015482 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -990,11 +990,13 @@ CheckSlotRequirements(void)
 /*
  * Reserve WAL for the currently active slot.
  *
- * Compute and set restart_lsn in a manner that's appropriate for the type of
- * the slot and concurrency safe.
+ * If an lsn to reserve is not requested, compute and set restart_lsn
+ * in a manner that's appropriate for the type of the slot and concurrency safe.
+ * If the reseved WAL is requested, set restart_lsn and check if the corresponding
+ * wal segment is available.
  */
 void
-ReplicationSlotReserveWal(void)
+ReplicationSlotReserveWal(XLogRecPtr requested_lsn)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 
@@ -1005,47 +1007,57 @@ ReplicationSlotReserveWal(void)
 	 * The replication slot mechanism is used to prevent removal of required
 	 * WAL. As there is no interlock between this routine and checkpoints, WAL
 	 * segments could concurrently be removed when a now stale return value of
-	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
-	 * this happens we'll just retry.
+	 * ReplicationSlotsComputeRequiredLSN() is used. If the lsn to reserve is
+	 * not requested, in the unlikely case that this happens we'll just retry.
 	 */
 	while (true)
 	{
 		XLogSegNo	segno;
 		XLogRecPtr	restart_lsn;
 
-		/*
-		 * For logical slots log a standby snapshot and start logical decoding
-		 * at exactly that position. That allows the slot to start up more
-		 * quickly.
-		 *
-		 * That's not needed (or indeed helpful) for physical slots as they'll
-		 * start replay at the last logged checkpoint anyway. Instead return
-		 * the location of the last redo LSN. While that slightly increases
-		 * the chance that we have to retry, it's where a base backup has to
-		 * start replay at.
-		 */
-		if (!RecoveryInProgress() && SlotIsLogical(slot))
+		if (!XLogRecPtrIsInvalid(requested_lsn))
 		{
-			XLogRecPtr	flushptr;
-
-			/* start at current insert position */
-			restart_lsn = GetXLogInsertRecPtr();
+			/* Set the requested lsn */
 			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
+			slot->data.restart_lsn = requested_lsn;
 			SpinLockRelease(&slot->mutex);
-
-			/* make sure we have enough information to start */
-			flushptr = LogStandbySnapshot();
-
-			/* and make sure it's fsynced to disk */
-			XLogFlush(flushptr);
 		}
 		else
 		{
-			restart_lsn = GetRedoRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
+			/*
+			 * For logical slots log a standby snapshot and start logical decoding
+			 * at exactly that position. That allows the slot to start up more
+			 * quickly.
+			 *
+			 * That's not needed (or indeed helpful) for physical slots as they'll
+			 * start replay at the last logged checkpoint anyway. Instead return
+			 * the location of the last redo LSN. While that slightly increases
+			 * the chance that we have to retry, it's where a base backup has to
+			 * start replay at.
+			 */
+			if (!RecoveryInProgress() && SlotIsLogical(slot))
+			{
+				XLogRecPtr	flushptr;
+
+				/* start at current insert position */
+				restart_lsn = GetXLogInsertRecPtr();
+				SpinLockAcquire(&slot->mutex);
+				slot->data.restart_lsn = restart_lsn;
+				SpinLockRelease(&slot->mutex);
+
+				/* make sure we have enough information to start */
+				flushptr = LogStandbySnapshot();
+
+				/* and make sure it's fsynced to disk */
+				XLogFlush(flushptr);
+			}
+			else
+			{
+				restart_lsn = GetRedoRecPtr();
+				SpinLockAcquire(&slot->mutex);
+				slot->data.restart_lsn = restart_lsn;
+				SpinLockRelease(&slot->mutex);
+			}
 		}
 
 		/* prevent WAL removal as fast as possible */
@@ -1061,6 +1073,19 @@ ReplicationSlotReserveWal(void)
 		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
 		if (XLogGetLastRemovedSegno() < segno)
 			break;
+
+		/*
+		 * The requested wal lsn is no longer available. We don't want to retry
+		 * it, so raise an error.
+		 */
+		if (!XLogRecPtrIsInvalid(requested_lsn))
+		{
+			char filename[MAXFNAMELEN];
+
+			XLogFileName(filename, ThisTimeLineID, segno, wal_segment_size);
+			ereport(ERROR,
+					(errmsg("could not reserve WAL segment %s", filename)));
+		}
 	}
 }
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8782bad..0683dab 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -17,10 +17,12 @@
 #include "miscadmin.h"
 
 #include "access/htup_details.h"
+#include "access/xlog_internal.h"
 #include "replication/decode.h"
 #include "replication/slot.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
+#include "storage/ipc.h"
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/pg_lsn.h"
@@ -36,6 +38,66 @@ check_permissions(void)
 }
 
 /*
+ * Error cleanup callback for copy replication slot functions. Release
+ * both MyReplicationSlot and the saved replication slot.
+ */
+static void
+copy_replication_slot_callback(int code, Datum arg)
+{
+	ReplicationSlot	*savedslot = (ReplicationSlot *) DatumGetPointer(arg);
+	bool	release_saved_slot = (savedslot && savedslot != MyReplicationSlot);
+
+	if (MyReplicationSlot)
+		ReplicationSlotRelease();
+
+	/* Release the saved slot if exist while preventing double releasing */
+	if (release_saved_slot)
+	{
+		Assert(MyReplicationSlot == NULL);
+		MyReplicationSlot = savedslot;
+		ReplicationSlotRelease();
+	}
+}
+
+/*
+ * Helper function for creating a new physical replication slot with
+ * given arguments. Return a restart_lsn of new replication slot or
+ * InvalidXLogRecPtr if WAL reservation is not required.
+ */
+static XLogRecPtr
+create_physical_replication_slot(char *name, bool immediately_reserve,
+								 bool temporary, XLogRecPtr restart_lsn)
+{
+	XLogRecPtr	result = InvalidXLogRecPtr;
+
+	Assert(!MyReplicationSlot);
+
+	check_permissions();
+
+	CheckSlotRequirements();
+
+	/* acquire replication slot, this will check for conflicting names */
+	ReplicationSlotCreate(name, false,
+						  temporary ? RS_TEMPORARY : RS_PERSISTENT);
+
+	if (immediately_reserve)
+	{
+		/* Reserve WAL as the user asked for it */
+		ReplicationSlotReserveWal(restart_lsn);
+
+		/* Write this slot to disk */
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
+
+		result = MyReplicationSlot->data.restart_lsn;
+	}
+
+	ReplicationSlotRelease();
+
+	return result;
+}
+
+/*
  * SQL function for creating a new physical (streaming replication)
  * replication slot.
  */
@@ -47,75 +109,162 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	bool		temporary = PG_GETARG_BOOL(2);
 	Datum		values[2];
 	bool		nulls[2];
+	XLogRecPtr	result_lsn;
 	TupleDesc	tupdesc;
 	HeapTuple	tuple;
 	Datum		result;
 
-	Assert(!MyReplicationSlot);
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	result_lsn = create_physical_replication_slot(NameStr(*name),
+												  immediately_reserve,
+												  temporary,
+												  InvalidXLogRecPtr);
+
+	values[0] = NameGetDatum(name);
+	nulls[0] = false;
+
+	if (XLogRecPtrIsInvalid(result_lsn))
+		nulls[1] = true;
+	else
+	{
+		values[1] = LSNGetDatum(result_lsn);
+		nulls[1] = false;
+	}
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	result = HeapTupleGetDatum(tuple);
+
+	PG_RETURN_DATUM(result);
+}
+
+/*
+ * Copy physical replication slot (3 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
+ */
+Datum
+pg_copy_physical_replication_slot_no_temp(PG_FUNCTION_ARGS)
+{
+	return pg_copy_physical_replication_slot(fcinfo);
+}
+
+/*
+ * SQL function for copying a physical replication slot.
+ */
+Datum
+pg_copy_physical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		src_name = PG_GETARG_NAME(0);
+	Name		dst_name = PG_GETARG_NAME(1);
+	bool		temporary; /* optional argument */
+	bool		immediately_reserve;
+	ReplicationSlot	*saveslot = NULL;
+	XLogRecPtr	restart_lsn;
+	XLogRecPtr	result_lsn;
+	Datum		values[2];
+	bool		nulls[2];
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
 
 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
 		elog(ERROR, "return type must be a row type");
 
-	check_permissions();
+	/* Acquire the source slot so we own it */
+	ReplicationSlotAcquire(NameStr(*src_name), true);
 
-	CheckSlotRequirements();
+	/* Check type of replication slot */
+	if (SlotIsLogical(MyReplicationSlot))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 (errmsg("cannot copy a logical replication slot to a physical replication slot"))));
 
-	/* acquire replication slot, this will check for conflicting names */
-	ReplicationSlotCreate(NameStr(*name), false,
-						  temporary ? RS_TEMPORARY : RS_PERSISTENT);
+	/* Copying non-reserved slot doesn't make sense */
+	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 (errmsg("cannot copy a physical replication slot that doesn't reserve WAL"))));
 
-	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
-	nulls[0] = false;
+	/* Save values of the source slot */
+	restart_lsn = MyReplicationSlot->data.restart_lsn;
+	temporary = (MyReplicationSlot->data.persistency == RS_TEMPORARY);
 
+	/* Reserve WAL at creation if the source slot already reserves */
+	immediately_reserve = !XLogRecPtrIsInvalid(restart_lsn);
+
+	/* check the optional argument */
+	if (PG_NARGS() >= 3)
+		temporary = PG_GETARG_BOOL(2);
+
+	/*
+	 * To prevent the restart_lsn WAL of the source slot from removal
+	 * during copying a new slot, we copy it while holding the source slot.
+	 * Since we are not allowed to create a new one while holding another
+	 * one, we temporarily save the acquired slot and restore it after
+	 * creation. Set callback function to ensure we release replication
+	 * slots if fail below.
+	 */
 	if (immediately_reserve)
+		saveslot = MyReplicationSlot;
+	else
+		ReplicationSlotRelease();
+
+	PG_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot));
 	{
-		/* Reserve WAL as the user asked for it */
-		ReplicationSlotReserveWal();
+		if (immediately_reserve)
+			MyReplicationSlot = NULL;
 
-		/* Write this slot to disk */
-		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
+		result_lsn = create_physical_replication_slot(NameStr(*dst_name),
+													  immediately_reserve,
+													  temporary,
+													  restart_lsn);
+		Assert(MyReplicationSlot == NULL);
 
-		values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
-		nulls[1] = false;
+		/*
+		 * Restore source slot, if saved. We must not change the saveslot
+		 * to cancel the callback function.
+		 */
+		if (saveslot)
+			MyReplicationSlot = saveslot;
 	}
+	PG_END_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot));
+
+	/* Release the source slot, if not yet */
+	if (immediately_reserve)
+		ReplicationSlotRelease();
+
+	values[0] = NameGetDatum(dst_name);
+	nulls[0] = false;
+
+	if (XLogRecPtrIsInvalid(result_lsn))
+		nulls[1] = true;
 	else
 	{
-		nulls[1] = true;
+		values[1] = LSNGetDatum(result_lsn);
+		nulls[1] = false;
 	}
 
 	tuple = heap_form_tuple(tupdesc, values, nulls);
-	result = HeapTupleGetDatum(tuple);
 
-	ReplicationSlotRelease();
-
-	PG_RETURN_DATUM(result);
+	PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
 }
 
-
 /*
- * SQL function for creating a new logical replication slot.
+ * Helper function for creating a new logical replication slot with
+ * given arguments. Return a confirmed_lsn of new replication slot.
  */
-Datum
-pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+static XLogRecPtr
+create_logical_replication_slot(char *name, char *plugin,
+								bool temporary, XLogRecPtr start_lsn)
 {
-	Name		name = PG_GETARG_NAME(0);
-	Name		plugin = PG_GETARG_NAME(1);
-	bool		temporary = PG_GETARG_BOOL(2);
-
 	LogicalDecodingContext *ctx = NULL;
-
-	TupleDesc	tupdesc;
-	HeapTuple	tuple;
-	Datum		result;
-	Datum		values[2];
-	bool		nulls[2];
+	XLogRecPtr	result;
 
 	Assert(!MyReplicationSlot);
 
-	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
-
 	check_permissions();
 
 	CheckLogicalDecodingRequirements();
@@ -128,39 +277,174 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 * slots can be created as temporary from beginning as they get dropped on
 	 * error as well.
 	 */
-	ReplicationSlotCreate(NameStr(*name), true,
+	ReplicationSlotCreate(name, true,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
 	/*
 	 * Create logical decoding context, to build the initial snapshot.
 	 */
-	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+	ctx = CreateInitDecodingContext(plugin, NIL,
 									false,	/* do not build snapshot */
+									start_lsn,
 									logical_read_local_xlog_page, NULL, NULL,
 									NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
 
-	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
-	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
-
 	/* don't need the decoding context anymore */
 	FreeDecodingContext(ctx);
 
-	memset(nulls, 0, sizeof(nulls));
-
-	tuple = heap_form_tuple(tupdesc, values, nulls);
-	result = HeapTupleGetDatum(tuple);
-
 	/* ok, slot is now fully created, mark it as persistent if needed */
 	if (!temporary)
 		ReplicationSlotPersist();
+
+	result = MyReplicationSlot->data.confirmed_flush;
+
 	ReplicationSlotRelease();
 
-	PG_RETURN_DATUM(result);
+	return result;
 }
 
+/*
+ * SQL function for creating a new logical replication slot.
+ */
+Datum
+pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		name = PG_GETARG_NAME(0);
+	Name		plugin = PG_GETARG_NAME(1);
+	bool		temporary = PG_GETARG_BOOL(2);
+	XLogRecPtr	confirmed_flush;
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		values[2];
+	bool		nulls[2];
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	confirmed_flush = create_logical_replication_slot(NameStr(*name),
+													  NameStr(*plugin),
+													  temporary,
+													  InvalidXLogRecPtr);
+
+	memset(nulls, 0, sizeof(nulls));
+
+	values[0] = NameGetDatum(name);
+	values[1] = LSNGetDatum(confirmed_flush);
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+
+	PG_RETURN_POINTER(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * Copy logical replication slot (2 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
+ */
+Datum
+pg_copy_logical_replication_slot_no_plugin_temp(PG_FUNCTION_ARGS)
+{
+	return pg_copy_logical_replication_slot(fcinfo);
+}
+
+/*
+ * Copy logical replication slot (3 arguments)
+ *
+ * note: this wrapper is necessary to pass the sanity check in opr_sanity,
+ * which checks that all built-in functions that share the implementing C
+ * function take the same number of arguments
+ */
+Datum
+pg_copy_logical_replication_slot_no_plugin(PG_FUNCTION_ARGS)
+{
+	return pg_copy_logical_replication_slot(fcinfo);
+}
+
+/*
+ * SQL function for copying a logical replication slot.
+ */
+Datum
+pg_copy_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		src_name = PG_GETARG_NAME(0);
+	Name		dst_name = PG_GETARG_NAME(1);
+	char		*plugin;	/* optional argument */
+	bool		temporary;	/* optional argument */
+	ReplicationSlot *saveslot = NULL;
+	XLogRecPtr	confirmed_flush;
+	XLogRecPtr	restart_lsn;
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		values[2];
+	bool		nulls[2];
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	/* Acquire the source slot so we own it */
+	ReplicationSlotAcquire(NameStr(*src_name), true);
+
+	/* Check type of replication slot */
+	if (SlotIsPhysical(MyReplicationSlot))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 (errmsg("cannot copy a physical replication slot to a logical replication slot"))));
+
+	/* Save values of the source slot */
+	restart_lsn = MyReplicationSlot->data.restart_lsn;
+	plugin = pstrdup(NameStr(MyReplicationSlot->data.plugin));
+	temporary = (MyReplicationSlot->data.persistency == RS_TEMPORARY);
+
+	/* Check the optional arguments */
+	if (PG_NARGS() >= 3)
+		plugin = NameStr(*(PG_GETARG_NAME(2)));
+	if (PG_NARGS() >= 4)
+		temporary = PG_GETARG_BOOL(3);
+
+	/*
+	 * To prevent the restart_lsn WAL of the source slot from removal
+	 * during copying a new slot, we copy it while holding the source slot.
+	 * Since we are not allowed to create a new one while holding another
+	 * one, we temporarily save the acquired slot and restore it after
+	 * creation. Set callback function to ensure we release replication
+	 * slots if fail below.
+	 */
+	saveslot = MyReplicationSlot;
+	PG_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot));
+	{
+		MyReplicationSlot = NULL;
+
+		confirmed_flush = create_logical_replication_slot(NameStr(*dst_name),
+														  plugin,
+														  temporary,
+														  restart_lsn);
+		Assert(MyReplicationSlot == NULL);
+
+		/*
+		 * Restore source slot. We must not change the saveslot to cancel the
+		 * callback function.
+		 */
+		MyReplicationSlot = saveslot;
+	}
+	PG_END_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot));
+
+	/* Release the source slot */
+	ReplicationSlotRelease();
+
+	memset(nulls, 0, sizeof(nulls));
+
+	values[0] = NameGetDatum(dst_name);
+	values[1] = LSNGetDatum(confirmed_flush);
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+
+	PG_RETURN_POINTER(HeapTupleGetDatum(tuple));
+}
 
 /*
  * SQL function for dropping a replication slot.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 46edb52..a9e1656 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -930,6 +930,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		}
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
+										InvalidXLogRecPtr,
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData,
 										WalSndUpdateProgress);
@@ -972,7 +973,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	}
 	else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
 	{
-		ReplicationSlotReserveWal();
+		ReplicationSlotReserveWal(InvalidXLogRecPtr);
 
 		ReplicationSlotMarkDirty();
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 034a41e..7240991 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9603,6 +9603,20 @@
   proargmodes => '{i,i,i,o,o}',
   proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}',
   prosrc => 'pg_create_physical_replication_slot' },
+{ oid => '4008', descr => 'copy a physical replication slot',
+  proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
+  proallargtypes => '{name,name,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}',
+  prosrc => 'pg_copy_physical_replication_slot' },
+{ oid => '4009', descr => 'copy a physical replication slot',
+  proname => 'pg_copy_physical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+  proallargtypes => '{name,name,name,pg_lsn}',
+  proargmodes => '{i,i,o,o}',
+  proargnames => '{slot_name,dst_name,slot_name,lsn}',
+  prosrc => 'pg_copy_physical_replication_slot_no_temp' },
 { oid => '3780', descr => 'drop a replication slot',
   proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u',
   prorettype => 'void', proargtypes => 'name',
@@ -9623,6 +9637,27 @@
   proargmodes => '{i,i,i,o,o}',
   proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
+{ oid => '4005', descr => 'copy a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name name bool',
+  proallargtypes => '{name,name,name,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,plugin,temporary,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot' },
+{ oid => '4006', descr => 'copy a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name name',
+  proallargtypes => '{name,name,name,name,pg_lsn}',
+  proargmodes => '{i,i,i,o,o}',
+  proargnames => '{src_slot_name,plugin,dst_slot_name,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_no_plugin' },
+{ oid => '4007', descr => 'copy a logical replication slot',
+  proname => 'pg_copy_logical_replication_slot', provolatile => 'v',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name',
+  proallargtypes => '{name,name,name,pg_lsn}',
+  proargmodes => '{i,i,o,o}',
+  proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}',
+  prosrc => 'pg_copy_logical_replication_slot_no_plugin_temp' },
 { oid => '3782', descr => 'get changes from replication slot',
   proname => 'pg_logical_slot_get_changes', procost => '1000',
   prorows => '1000', provariadic => 'text', proisstrict => 'f',
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c25ac1f..afc32ff 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void);
 extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
+						  XLogRecPtr restart_lsn,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7964ae2..d5c8953 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -193,7 +193,7 @@ extern void ReplicationSlotMarkDirty(void);
 
 /* misc stuff */
 extern bool ReplicationSlotValidateName(const char *name, int elevel);
-extern void ReplicationSlotReserveWal(void);
+extern void ReplicationSlotReserveWal(XLogRecPtr requested_lsn);
 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
-- 
2.10.5

