On Sat, Sep 20, 2014 at 7:09 AM, Amit Kapila <amit.kapil...@gmail.com> wrote:
> I have looked into refactoring related patch and would like
> to share my observations with you:
Thanks! Useful input is always welcome.

> 1.
> + * Run IDENTIFY_SYSTEM through a given connection and give back to caller
> This API also gets plugin if requested, so I think it is better
> to mention the same in function header as well.
True.

> 2.
> + /* Get LSN start position if necessary */
> Isn't it better if we try to get the LSN position only incase
> startpos!=NULL (as BaseBackup don't need this)
OK. Let's do that for consistency with the other fields.

> 3.
> I find existing comments okay, is there a need to change
> in this case?  Part of the new comment mentions
> "obtaining start LSN position", actually the start position is
> identified as part of next function call FindStreamingStart(),
> only incase it return InvalidXLogRecPtr, the value returned
> by IDENTIFY_SYSTEM will be used.
Hopefully I am following you correctly here: comment has been updated
to mention that the start LSN and TLI fetched from IDENTIFY_SYSTEM are
always fetched but used only if no valid position is found in output
folder of pg_receivexlog.

> 4.
> + /* Obtain a connection before doing anything */
> + conn = GetConnection();
> + if (!conn)
> + /* Error message already written in GetConnection() */
> Is there a reason for moving this function out of StreamLog(),
> there is no harm in moving it, but there doesn't seem to be any
> problem even if it is called inside StreamLog().
For pg_recvlogical, this move is done to reduce code redundancy, and
actually it makes sense to just grab one connection in the main() code
path before performing any replication commands. For pg_receivexlog,
the move is done because it makes the code more consistent with
pg_recvlogical, and also it is a preparatory work for the second patch
that introduces the create/drop slot. Even if 2nd patch is not
accepted I figured out that it would not hurt to simply grab one
connection in the main code path before doing any action.

> 5.
> Shouldn't there be Assert instead of if (conn == NULL), as
> RunIdentifySystem() is not expected to be called without connection.
Fine for me. That's indeed more consistent with the rest this way.

> 6.
> + /* Identify system, obtaining start LSN position at the same time */
> + if (!RunIdentifySystem(conn,
> NULL, NULL, &startpos, &plugin_name))
> + disconnect_and_exit(1);
> a.
> Generally IdentifySystem is called as first API, but I think you
> have changed its location after CreateReplicationSlot as you want
> to double check the value of plugin, not sure if that is necessary or
> important enough that we start calling it later.
Funny part here is that even the current code on master and
REL9_4_STABLE of pg_recvlogical.c is fetching a start position when
creating a replication slot that is not used as two different actions
cannot be used at the same time. That's a matter of removing this line
in code though:
startpos = ((uint64) hi) << 32 | lo;
As that's only cosmetic for 9.4, and this patch makes things more
correct I guess that's fine to do nothing and just try to get this
patch in.

> b.
> Above call is trying to get startpos, won't it overwrite the value
> passed by user (case 'I':) for startpos?
Yep, that's a bug. The value that user could give would have been
overwritten all the time. Current code does not even care about
checking startpos in IDENTIFY_SYSTEM so we're fine by just removing
this part.

Updated patches are attached.
-- 
Michael
From a7743619dc93870c815920872c3b5a6971c25714 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Mon, 1 Sep 2014 20:48:43 +0900
Subject: [PATCH 1/2] Refactoring of pg_basebackup utilities

Code duplication is reduced with the introduction of new APIs for each
individual replication command:
- IDENTIFY_SYSTEM
- CREATE_REPLICATION_SLOT
- DROP_REPLICATION_SLOT
A couple of variables used to identify a timeline ID are changed as well
to be more consistent with core code.
---
 src/bin/pg_basebackup/pg_basebackup.c  |  21 +----
 src/bin/pg_basebackup/pg_receivexlog.c |  52 +++--------
 src/bin/pg_basebackup/pg_recvlogical.c | 132 +++++++---------------------
 src/bin/pg_basebackup/streamutil.c     | 155 +++++++++++++++++++++++++++++++++
 src/bin/pg_basebackup/streamutil.h     |  10 +++
 5 files changed, 211 insertions(+), 159 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 8b9acea..0ebda9a 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1569,8 +1569,8 @@ BaseBackup(void)
 {
 	PGresult   *res;
 	char	   *sysidentifier;
-	uint32		latesttli;
-	uint32		starttli;
+	TimeLineID	latesttli;
+	TimeLineID	starttli;
 	char	   *basebkp;
 	char		escaped_label[MAXPGPATH];
 	char	   *maxrate_clause = NULL;
@@ -1624,23 +1624,8 @@ BaseBackup(void)
 	/*
 	 * Run IDENTIFY_SYSTEM so we can get the timeline
 	 */
-	res = PQexec(conn, "IDENTIFY_SYSTEM");
-	if (PQresultStatus(res) != PGRES_TUPLES_OK)
-	{
-		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+	if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
 		disconnect_and_exit(1);
-	}
-	if (PQntuples(res) != 1 || PQnfields(res) < 3)
-	{
-		fprintf(stderr,
-				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
-		disconnect_and_exit(1);
-	}
-	sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
-	latesttli = atoi(PQgetvalue(res, 0, 1));
-	PQclear(res);
 
 	/*
 	 * Start the actual backup
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index a8b9ad3..b779356 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -253,21 +253,8 @@ FindStreamingStart(uint32 *tli)
 static void
 StreamLog(void)
 {
-	PGresult   *res;
-	XLogRecPtr	startpos;
-	uint32		starttli;
-	XLogRecPtr	serverpos;
-	uint32		servertli;
-	uint32		hi,
-				lo;
-
-	/*
-	 * Connect in replication mode to the server
-	 */
-	conn = GetConnection();
-	if (!conn)
-		/* Error message already written in GetConnection() */
-		return;
+	XLogRecPtr	startpos, serverpos;
+	TimeLineID	starttli, servertli;
 
 	if (!CheckServerVersionForStreaming(conn))
 	{
@@ -280,33 +267,12 @@ StreamLog(void)
 	}
 
 	/*
-	 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
-	 * position.
+	 * Identify server, obtaining start LSN position and current timeline ID
+	 * at the same time, necessary if not valid data can be found in the
+	 * existing output directory.
 	 */
-	res = PQexec(conn, "IDENTIFY_SYSTEM");
-	if (PQresultStatus(res) != PGRES_TUPLES_OK)
-	{
-		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
-		disconnect_and_exit(1);
-	}
-	if (PQntuples(res) != 1 || PQnfields(res) < 3)
-	{
-		fprintf(stderr,
-				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
+	if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL))
 		disconnect_and_exit(1);
-	}
-	servertli = atoi(PQgetvalue(res, 0, 1));
-	if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
-	{
-		fprintf(stderr,
-				_("%s: could not parse transaction log location \"%s\"\n"),
-				progname, PQgetvalue(res, 0, 2));
-		disconnect_and_exit(1);
-	}
-	serverpos = ((uint64) hi) << 32 | lo;
-	PQclear(res);
 
 	/*
 	 * Figure out where to start streaming.
@@ -492,6 +458,12 @@ main(int argc, char **argv)
 	pqsignal(SIGINT, sigint_handler);
 #endif
 
+	/* Obtain a connection before doing anything */
+	conn = GetConnection();
+	if (!conn)
+		/* Error message already written in GetConnection() */
+		exit(1);
+
 	while (true)
 	{
 		StreamLog();
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index f3b0f34..5272d63 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -208,15 +208,6 @@ StreamLog(void)
 	query = createPQExpBuffer();
 
 	/*
-	 * Connect in replication mode to the server
-	 */
-	if (!conn)
-		conn = GetConnection();
-	if (!conn)
-		/* Error message already written in GetConnection() */
-		return;
-
-	/*
 	 * Start the replication
 	 */
 	if (verbose)
@@ -596,7 +587,6 @@ sighup_handler(int signum)
 int
 main(int argc, char **argv)
 {
-	PGresult   *res;
 	static struct option long_options[] = {
 /* general options */
 		{"file", required_argument, NULL, 'f'},
@@ -628,6 +618,7 @@ main(int argc, char **argv)
 	int			option_index;
 	uint32		hi,
 				lo;
+	char	   *plugin_name;
 
 	progname = get_progname(argv[0]);
 	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical"));
@@ -834,121 +825,60 @@ main(int argc, char **argv)
 #endif
 
 	/*
-	 * don't really need this but it actually helps to get more precise error
-	 * messages about authentication, required GUCs and such without starting
-	 * to loop around connection attempts lateron.
+	 * Obtain a connection to server. This is not really necessary but it
+	 * helps to get more precise error messages about authentification,
+	 * required GUC parameters and such.
 	 */
-	{
-		conn = GetConnection();
-		if (!conn)
-			/* Error message already written in GetConnection() */
-			exit(1);
-
-		/*
-		 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
-		 * position.
-		 */
-		res = PQexec(conn, "IDENTIFY_SYSTEM");
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
-		{
-			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-					progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
-			disconnect_and_exit(1);
-		}
-
-		if (PQntuples(res) != 1 || PQnfields(res) < 4)
-		{
-			fprintf(stderr,
-					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
-					progname, PQntuples(res), PQnfields(res), 1, 4);
-			disconnect_and_exit(1);
-		}
-		PQclear(res);
-	}
-
+	conn = GetConnection();
+	if (!conn)
+		/* Error message already written in GetConnection() */
+		exit(1);
 
-	/*
-	 * stop a replication slot
-	 */
+	/* Drop a replication slot */
 	if (do_drop_slot)
 	{
-		char		query[256];
-
 		if (verbose)
 			fprintf(stderr,
-					_("%s: freeing replication slot \"%s\"\n"),
+					_("%s: dropping replication slot \"%s\"\n"),
 					progname, replication_slot);
 
-		snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
-				 replication_slot);
-		res = PQexec(conn, query);
-		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		{
-			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-					progname, query, PQerrorMessage(conn));
-			disconnect_and_exit(1);
-		}
-
-		if (PQntuples(res) != 0 || PQnfields(res) != 0)
-		{
-			fprintf(stderr,
-					_("%s: could not stop logical replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
-					progname, PQntuples(res), PQnfields(res), 0, 0);
+		if (!DropReplicationSlot(conn, false))
 			disconnect_and_exit(1);
-		}
-
-		PQclear(res);
 		disconnect_and_exit(0);
 	}
 
-	/*
-	 * init a replication slot
-	 */
+	/* Create a replication slot */
 	if (do_create_slot)
 	{
-		char		query[256];
-
 		if (verbose)
 			fprintf(stderr,
-					_("%s: initializing replication slot \"%s\"\n"),
+					_("%s: creating replication slot \"%s\"\n"),
 					progname, replication_slot);
 
-		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
-				 replication_slot, plugin);
-
-		res = PQexec(conn, query);
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
-		{
-			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-					progname, query, PQerrorMessage(conn));
-			disconnect_and_exit(1);
-		}
-
-		if (PQntuples(res) != 1 || PQnfields(res) != 4)
-		{
-			fprintf(stderr,
-					_("%s: could not init logical replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
-					progname, PQntuples(res), PQnfields(res), 1, 4);
-			disconnect_and_exit(1);
-		}
-
-		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
-		{
-			fprintf(stderr,
-					_("%s: could not parse transaction log location \"%s\"\n"),
-					progname, PQgetvalue(res, 0, 1));
+		if (!CreateReplicationSlot(conn, plugin, false))
 			disconnect_and_exit(1);
-		}
-		startpos = ((uint64) hi) << 32 | lo;
-
-		replication_slot = strdup(PQgetvalue(res, 0, 0));
-		PQclear(res);
 	}
 
-
 	if (!do_start_slot)
 		disconnect_and_exit(0);
 
+	/* Identify system, obtaining start LSN position at the same time */
+	if (!RunIdentifySystem(conn, NULL, NULL, NULL, &plugin_name))
+		disconnect_and_exit(1);
+
+	/*
+	 * Check that there is a plugin name, a logical slot needs to have
+	 * one absolutely.
+	 */
+	if (!plugin_name)
+	{
+		fprintf(stderr,
+				_("%s: no plugin found for replication slot \"%s\"\n"),
+				progname, replication_slot);
+		disconnect_and_exit(1);
+	}
+
+	/* Stream loop */
 	while (true)
 	{
 		StreamLog();
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 1100260..de8d42c 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -227,6 +227,161 @@ GetConnection(void)
 	return tmpconn;
 }
 
+/*
+ * Run IDENTIFY_SYSTEM through a given connection and give back to caller
+ * some result information if requested:
+ * - Start LSN position
+ * - Current timeline ID
+ * - System identifier
+ * - Plugin name
+ */
+bool
+RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
+				  XLogRecPtr *startpos, char **plugin)
+{
+	PGresult   *res;
+	uint32		hi, lo;
+
+	/* Check connection existence */
+	Assert(conn != NULL);
+
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+		return false;
+	}
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+	{
+		fprintf(stderr,
+				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+				progname, PQntuples(res), PQnfields(res), 1, 3);
+		return false;
+	}
+
+	/* Get system identifier */
+	if (sysid != NULL)
+		*sysid = pg_strdup(PQgetvalue(res, 0, 0));
+
+	/* Get timeline ID to start streaming from */
+	if (starttli != NULL)
+		*starttli = atoi(PQgetvalue(res, 0, 1));
+
+	/* Get LSN start position if necessary */
+	if (startpos != NULL)
+	{
+		if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
+		{
+			fprintf(stderr,
+					_("%s: could not parse transaction log location \"%s\"\n"),
+					progname, PQgetvalue(res, 0, 2));
+			return false;
+		}
+		*startpos = ((uint64) hi) << 32 | lo;
+	}
+
+	/* Get decoder plugin name, only available in 9.4 and newer versions */
+	if  (plugin != NULL)
+		*plugin = PQnfields(res) > 3 && !PQgetisnull(res, 0, 3) ?
+			pg_strdup(PQgetvalue(res, 0, 3)) : (char *) NULL;
+
+	PQclear(res);
+	return true;
+}
+
+/*
+ * Create a replication slot for the given connection. This function
+ * returns true in case of success as well as the start position
+ * obtained after the slot creation.
+ */
+bool
+CreateReplicationSlot(PGconn *conn, const char *plugin,
+					  bool is_physical)
+{
+	char		query[256];
+	PGresult   *res;
+	uint32		hi, lo;
+
+	Assert((is_physical && plugin == NULL) ||
+		   (!is_physical && plugin != NULL));
+	Assert(replication_slot != NULL);
+
+	/* Build query */
+	if (is_physical)
+		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
+				 replication_slot);
+	else
+		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+				 replication_slot, plugin);
+
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+				progname, query, PQerrorMessage(conn));
+		return false;
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) != 4)
+	{
+		fprintf(stderr,
+				_("%s: could not init %s replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
+				progname, is_physical ? "physical" : "logical",
+				PQntuples(res), PQnfields(res), 1, 4);
+		return false;
+	}
+
+	/* Check LSN format obtained as consistent point */
+	if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+	{
+		fprintf(stderr,
+				_("%s: could not parse transaction log location \"%s\"\n"),
+				progname, PQgetvalue(res, 0, 1));
+		return false;
+	}
+
+	replication_slot = strdup(PQgetvalue(res, 0, 0));
+	PQclear(res);
+	return true;
+}
+
+/*
+ * Drop a replication slot for the given connection. This function
+ * returns true in case of success.
+ */
+bool
+DropReplicationSlot(PGconn *conn, bool is_physical)
+{
+	char        query[256];
+	PGresult   *res;
+
+	Assert(replication_slot != NULL);
+
+	/* Build query */
+	snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
+			 replication_slot);
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+				progname, query, PQerrorMessage(conn));
+		return false;
+	}
+
+	if (PQntuples(res) != 0 || PQnfields(res) != 0)
+	{
+		fprintf(stderr,
+				_("%s: could not stop %s replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
+				progname, is_physical ? "physical" : "logical",
+				PQntuples(res), PQnfields(res), 0, 0);
+		return false;
+	}
+
+	PQclear(res);
+	return true;
+}
+
 
 /*
  * Frontend version of GetCurrentTimestamp(), since we are not linked with
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 8c6691f..a4a1f77 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -14,6 +14,8 @@
 
 #include "libpq-fe.h"
 
+#include "access/xlogdefs.h"
+
 extern const char *progname;
 extern char *connection_string;
 extern char *dbhost;
@@ -28,6 +30,14 @@ extern PGconn *conn;
 
 extern PGconn *GetConnection(void);
 
+/* Replication commands */
+extern bool CreateReplicationSlot(PGconn *conn, const char *plugin,
+								  bool is_physical);
+extern bool DropReplicationSlot(PGconn *conn, bool is_physical);
+extern bool RunIdentifySystem(PGconn *conn, char **sysid,
+							  TimeLineID *starttli,
+							  XLogRecPtr *startpos,
+							  char **plugin);
 extern int64 feGetCurrentTimestamp(void);
 extern void feTimestampDifference(int64 start_time, int64 stop_time,
 					  long *secs, int *microsecs);
-- 
2.1.0

From 262e02643f050ec57fe86f631b9b99dfd1454a1e Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Mon, 1 Sep 2014 20:53:45 +0900
Subject: [PATCH 2/2] Support for replslot creation and drop in pg_receivexlog

Using the new actions --create and --drop that are similarly present
in pg_recvlogical, a user can respectively create and drop a replication
slot that can be used afterwards when fetching WALs.
---
 doc/src/sgml/ref/pg_receivexlog.sgml   | 29 +++++++++++++++++
 src/bin/pg_basebackup/pg_receivexlog.c | 59 +++++++++++++++++++++++++++++++++-
 2 files changed, 87 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index 5916b8f..51d93ea 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -72,6 +72,35 @@ PostgreSQL documentation
   <title>Options</title>
 
    <para>
+    <application>pg_receivexlog</application> can run in one of two following
+    modes, which control physical replication slot:
+
+    <variablelist>
+
+     <varlistentry>
+      <term><option>--create</option></term>
+      <listitem>
+       <para>
+        Create a new physical replication slot with the name specified in
+        <option>--slot</option>, then exit.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>--drop</option></term>
+      <listitem>
+       <para>
+        Drop the replication slot with the name specified in
+        <option>--slot</option>, then exit.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+
+   </para>
+
+   <para>
     The following command-line options control the location and format of the
     output.
 
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index b779356..b5ee820 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -38,6 +38,8 @@ static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static int	fsync_interval = 0; /* 0 = default */
 static volatile bool time_to_abort = false;
+static bool do_create_slot = false;
+static bool do_drop_slot = false;
 
 
 static void usage(void);
@@ -78,6 +80,9 @@ usage(void)
 	printf(_("  -w, --no-password      never prompt for password\n"));
 	printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
 	printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
+	printf(_("\nOptional actions:\n"));
+	printf(_("      --create           create a new replication slot (for the slot's name see --slot)\n"));
+	printf(_("      --drop             drop the replication slot (for the slot's name see --slot)\n"));
 	printf(_("\nReport bugs to <pgsql-b...@postgresql.org>.\n"));
 }
 
@@ -336,6 +341,9 @@ main(int argc, char **argv)
 		{"status-interval", required_argument, NULL, 's'},
 		{"slot", required_argument, NULL, 'S'},
 		{"verbose", no_argument, NULL, 'v'},
+/* action */
+		{"create", no_argument, NULL, 1},
+		{"drop", no_argument, NULL, 2},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -419,6 +427,13 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+/* action */
+			case 1:
+				do_create_slot = true;
+				break;
+			case 2:
+				do_drop_slot = true;
+				break;
 			default:
 
 				/*
@@ -443,10 +458,26 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (replication_slot == NULL && (do_drop_slot || do_create_slot))
+	{
+		fprintf(stderr, _("%s: replication slot needed with action --create or --drop\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (do_drop_slot && do_create_slot)
+	{
+		fprintf(stderr, _("%s: cannot use --create together with --drop\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 	/*
 	 * Required arguments
 	 */
-	if (basedir == NULL)
+	if (basedir == NULL && !do_create_slot && !do_drop_slot)
 	{
 		fprintf(stderr, _("%s: no target directory specified\n"), progname);
 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
@@ -464,6 +495,32 @@ main(int argc, char **argv)
 		/* Error message already written in GetConnection() */
 		exit(1);
 
+	/* Drop a replication slot */
+	if (do_drop_slot)
+	{
+		if (verbose)
+			fprintf(stderr,
+					_("%s: dropping replication slot \"%s\"\n"),
+					progname, replication_slot);
+
+		if (!DropReplicationSlot(conn, true))
+			disconnect_and_exit(1);
+		disconnect_and_exit(0);
+	}
+
+	/* Create a replication slot */
+	if (do_create_slot)
+	{
+		if (verbose)
+			fprintf(stderr,
+					_("%s: creating replication slot \"%s\"\n"),
+					progname, replication_slot);
+
+		if (!CreateReplicationSlot(conn, NULL, true))
+			disconnect_and_exit(1);
+		disconnect_and_exit(0);
+	}
+
 	while (true)
 	{
 		StreamLog();
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to