Hi,

I've split and edited patch 0001:
0001) Old WIP patch for pg_recvlogical tests. Useful for easier development.
0002) Copy editing that should be in 9.4
0003) Rebased patches of yours
0004) My changes to 0003 besides the rebase. This'll be squashed, but I
      thought it might be interesting for you.

I haven't tested my edits besides running 0001...

Greetings,

Andres Freund

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
>From 45582de61f670967f72835cd14d1c520c3d04fce Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 1 Oct 2014 12:57:15 +0200
Subject: [PATCH 1/4] WIP: pg_recvlogical tests

---
 src/bin/pg_basebackup/Makefile                |  3 ++
 src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 45 +++++++++++++++++++++++++++
 2 files changed, 48 insertions(+)
 create mode 100644 src/bin/pg_basebackup/t/030_pg_recvlogical.pl

diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index 90d19e7..3c0a14e 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -51,7 +51,10 @@ clean distclean maintainer-clean:
 	rm -rf tmp_check
 
 check: all
+# Needs test_decoding to run the pg_recvlogical tests
+	$(MAKE) -C $(top_builddir)/contrib/test_decoding DESTDIR='$(CURDIR)'/tmp_check/install install >>'$(CURDIR)'/tmp_check/log/install.log 2>&1
 	$(prove_check)
 
 installcheck:
+# XXX: We rely on test_decoding already being installed here
 	$(prove_installcheck)
diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
new file mode 100644
index 0000000..1741319
--- /dev/null
+++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
@@ -0,0 +1,45 @@
+use strict;
+use warnings;
+use TestLib;
+use Test::More tests => 9;
+
+# Option handling and replication slot create/drop checks for pg_recvlogical.
+program_help_ok('pg_recvlogical');
+program_version_ok('pg_recvlogical');
+program_options_handling_ok('pg_recvlogical');
+
+my $tempdir = tempdir;
+start_test_server $tempdir;
+
+# Allow local replication connections, then have the running server reload.
+open my $hba, '>>', "$tempdir/pgdata/pg_hba.conf" or die "open pg_hba.conf: $!";
+print $hba "local replication all trust\n";
+close $hba or die "close pg_hba.conf: $!";
+system_or_bail 'pg_ctl', '-s', '-D', "$tempdir/pgdata", 'reload';
+
+# Settings used by the slot tests; the server is restarted below to apply them.
+open my $conf, '>>', "$tempdir/pgdata/postgresql.conf" or die "open postgresql.conf: $!";
+print $conf "max_wal_senders = 1\n";
+print $conf "max_replication_slots = 2\n";
+print $conf "wal_level = logical\n";
+close $conf or die "close postgresql.conf: $!";
+
+restart_test_server;
+
+command_fails([ 'pg_recvlogical', '-d', 'postgres', '--slot', 'regress', '--plugin', 'barf', '--create'],
+	   'pg recvlogical cannot create slot with nonexistant plugin');
+command_ok([ 'pg_recvlogical', '-d', 'postgres', '--slot', 'regress', '--create'],
+	   'pg recvlogical can create a slot');
+command_fails([ 'pg_recvlogical', '-d', 'postgres', '--slot', 'regress', '--create'],
+	   'pg recvlogical cannot create slot with a already used name');
+# XXX: this repeats the previous check; max_replication_slots is 2 and only one
+# slot exists here, so the "all slots in use" case is not actually exercised.
+command_fails([ 'pg_recvlogical', '-d', 'postgres', '--slot', 'regress', '--create'],
+	   'pg recvlogical cannot create slot when all slots are in use');
+command_ok([ 'pg_recvlogical', '-d', 'postgres', '--slot', 'regress', '--drop'],
+	   'pg recvlogical can drop slot');
+command_fails([ 'pg_recvlogical', '-d', 'postgres', '--slot', 'regress', '--start'],
+	   'pg recvlogical cannot stream from nonexistant slot');
+
+# cannot easily test streaming actual changes because that goes on
+# forever.
-- 
1.8.3.251.g1462b67

>From b9f73cef1a9dc1aa96ad1807893e78c924f690f6 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 29 Sep 2014 15:35:40 +0200
Subject: [PATCH 2/4] Message and comment policing of pg_recvlogical.c.

Several comments still referred to 'initiating', 'freeing', 'stopping'
replication slots. These were terms used during different phases of
the development of logical decoding, but are no longer accurate.

Author: Michael Paquier

Backpatch to 9.4 where pg_recvlogical was introduced.
---
 src/bin/pg_basebackup/pg_recvlogical.c | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index f3b0f34..4144688 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -868,7 +868,7 @@ main(int argc, char **argv)
 
 
 	/*
-	 * stop a replication slot
+	 * drop a replication slot
 	 */
 	if (do_drop_slot)
 	{
@@ -876,7 +876,7 @@ main(int argc, char **argv)
 
 		if (verbose)
 			fprintf(stderr,
-					_("%s: freeing replication slot \"%s\"\n"),
+					_("%s: dropping replication slot \"%s\"\n"),
 					progname, replication_slot);
 
 		snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
@@ -892,8 +892,8 @@ main(int argc, char **argv)
 		if (PQntuples(res) != 0 || PQnfields(res) != 0)
 		{
 			fprintf(stderr,
-					_("%s: could not stop logical replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
-					progname, PQntuples(res), PQnfields(res), 0, 0);
+					_("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
+					progname, replication_slot, PQntuples(res), PQnfields(res), 0, 0);
 			disconnect_and_exit(1);
 		}
 
@@ -902,7 +902,7 @@ main(int argc, char **argv)
 	}
 
 	/*
-	 * init a replication slot
+	 * create a replication slot
 	 */
 	if (do_create_slot)
 	{
@@ -910,7 +910,7 @@ main(int argc, char **argv)
 
 		if (verbose)
 			fprintf(stderr,
-					_("%s: initializing replication slot \"%s\"\n"),
+					_("%s: creating replication slot \"%s\"\n"),
 					progname, replication_slot);
 
 		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
@@ -927,8 +927,8 @@ main(int argc, char **argv)
 		if (PQntuples(res) != 1 || PQnfields(res) != 4)
 		{
 			fprintf(stderr,
-					_("%s: could not init logical replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
-					progname, PQntuples(res), PQnfields(res), 1, 4);
+					_("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
+					progname, replication_slot, PQntuples(res), PQnfields(res), 1, 4);
 			disconnect_and_exit(1);
 		}
 
-- 
1.8.3.251.g1462b67

>From d667f7a63cd62733d88ec5b7228dfd5d7136b9d7 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Mon, 1 Sep 2014 20:48:43 +0900
Subject: [PATCH 3/4] Refactoring of pg_basebackup utilities

Code duplication is reduced with the introduction of new APIs for each
individual replication command:
- IDENTIFY_SYSTEM
- CREATE_REPLICATION_SLOT
- DROP_REPLICATION_SLOT
A couple of variables used to identify a timeline ID are changed as well
to be more consistent with core code.
---
 src/bin/pg_basebackup/pg_basebackup.c  |  21 +----
 src/bin/pg_basebackup/pg_receivexlog.c |  38 ++------
 src/bin/pg_basebackup/pg_recvlogical.c | 116 ++++++------------------
 src/bin/pg_basebackup/streamutil.c     | 159 +++++++++++++++++++++++++++++++++
 src/bin/pg_basebackup/streamutil.h     |  10 +++
 5 files changed, 207 insertions(+), 137 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 8b9acea..0ebda9a 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1569,8 +1569,8 @@ BaseBackup(void)
 {
 	PGresult   *res;
 	char	   *sysidentifier;
-	uint32		latesttli;
-	uint32		starttli;
+	TimeLineID	latesttli;
+	TimeLineID	starttli;
 	char	   *basebkp;
 	char		escaped_label[MAXPGPATH];
 	char	   *maxrate_clause = NULL;
@@ -1624,23 +1624,8 @@ BaseBackup(void)
 	/*
 	 * Run IDENTIFY_SYSTEM so we can get the timeline
 	 */
-	res = PQexec(conn, "IDENTIFY_SYSTEM");
-	if (PQresultStatus(res) != PGRES_TUPLES_OK)
-	{
-		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+	if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
 		disconnect_and_exit(1);
-	}
-	if (PQntuples(res) != 1 || PQnfields(res) < 3)
-	{
-		fprintf(stderr,
-				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
-		disconnect_and_exit(1);
-	}
-	sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
-	latesttli = atoi(PQgetvalue(res, 0, 1));
-	PQclear(res);
 
 	/*
 	 * Start the actual backup
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index a8b9ad3..171cf43 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -253,13 +253,8 @@ FindStreamingStart(uint32 *tli)
 static void
 StreamLog(void)
 {
-	PGresult   *res;
-	XLogRecPtr	startpos;
-	uint32		starttli;
-	XLogRecPtr	serverpos;
-	uint32		servertli;
-	uint32		hi,
-				lo;
+	XLogRecPtr	startpos, serverpos;
+	TimeLineID	starttli, servertli;
 
 	/*
 	 * Connect in replication mode to the server
@@ -280,33 +275,12 @@ StreamLog(void)
 	}
 
 	/*
-	 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
-	 * position.
+	 * Identify server, obtaining start LSN position and current timeline ID
+	 * at the same time, necessary if no valid data can be found in the
+	 * existing output directory.
 	 */
-	res = PQexec(conn, "IDENTIFY_SYSTEM");
-	if (PQresultStatus(res) != PGRES_TUPLES_OK)
-	{
-		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
-		disconnect_and_exit(1);
-	}
-	if (PQntuples(res) != 1 || PQnfields(res) < 3)
-	{
-		fprintf(stderr,
-				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
+	if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL))
 		disconnect_and_exit(1);
-	}
-	servertli = atoi(PQgetvalue(res, 0, 1));
-	if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
-	{
-		fprintf(stderr,
-				_("%s: could not parse transaction log location \"%s\"\n"),
-				progname, PQgetvalue(res, 0, 2));
-		disconnect_and_exit(1);
-	}
-	serverpos = ((uint64) hi) << 32 | lo;
-	PQclear(res);
 
 	/*
 	 * Figure out where to start streaming.
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 4144688..23ae225 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -596,7 +596,6 @@ sighup_handler(int signum)
 int
 main(int argc, char **argv)
 {
-	PGresult   *res;
 	static struct option long_options[] = {
 /* general options */
 		{"file", required_argument, NULL, 'f'},
@@ -628,6 +627,7 @@ main(int argc, char **argv)
 	int			option_index;
 	uint32		hi,
 				lo;
+	char	   *db_name;
 
 	progname = get_progname(argv[0]);
 	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical"));
@@ -834,121 +834,63 @@ main(int argc, char **argv)
 #endif
 
 	/*
-	 * don't really need this but it actually helps to get more precise error
-	 * messages about authentication, required GUCs and such without starting
-	 * to loop around connection attempts lateron.
+	 * Obtain a connection to server. This is not really necessary but it
+	 * helps to get more precise error messages about authentication,
+	 * required GUC parameters and such.
 	 */
-	{
-		conn = GetConnection();
-		if (!conn)
-			/* Error message already written in GetConnection() */
-			exit(1);
-
-		/*
-		 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
-		 * position.
-		 */
-		res = PQexec(conn, "IDENTIFY_SYSTEM");
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
-		{
-			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-					progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
-			disconnect_and_exit(1);
-		}
-
-		if (PQntuples(res) != 1 || PQnfields(res) < 4)
-		{
-			fprintf(stderr,
-					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
-					progname, PQntuples(res), PQnfields(res), 1, 4);
-			disconnect_and_exit(1);
-		}
-		PQclear(res);
-	}
+	conn = GetConnection();
+	if (!conn)
+		/* Error message already written in GetConnection() */
+		exit(1);
 
+	/*
+	 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
+	 * position.
+	 */
+	if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
+		disconnect_and_exit(1);
 
 	/*
-	 * drop a replication slot
+	 * Check that there is a database associated with connection, one is
+	 * needed in this context.
 	 */
-	if (do_drop_slot)
+	if (!db_name)
 	{
-		char		query[256];
+		fprintf(stderr,
+				_("%s: no database defined for replication connection \"%s\"\n"),
+				progname, replication_slot);
+		disconnect_and_exit(1);
+	}
 
+	/* Drop a replication slot */
+	if (do_drop_slot)
+	{
 		if (verbose)
 			fprintf(stderr,
 					_("%s: dropping replication slot \"%s\"\n"),
 					progname, replication_slot);
 
-		snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
-				 replication_slot);
-		res = PQexec(conn, query);
-		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		{
-			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-					progname, query, PQerrorMessage(conn));
+		if (!DropReplicationSlot(conn, false))
 			disconnect_and_exit(1);
-		}
-
-		if (PQntuples(res) != 0 || PQnfields(res) != 0)
-		{
-			fprintf(stderr,
-					_("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
-					progname, replication_slot, PQntuples(res), PQnfields(res), 0, 0);
-			disconnect_and_exit(1);
-		}
-
-		PQclear(res);
 		disconnect_and_exit(0);
 	}
 
-	/*
-	 * create a replication slot
-	 */
+	/* Create a replication slot */
 	if (do_create_slot)
 	{
-		char		query[256];
-
 		if (verbose)
 			fprintf(stderr,
 					_("%s: creating replication slot \"%s\"\n"),
 					progname, replication_slot);
 
-		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
-				 replication_slot, plugin);
-
-		res = PQexec(conn, query);
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
-		{
-			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-					progname, query, PQerrorMessage(conn));
+		if (!CreateReplicationSlot(conn, plugin, &startpos, false))
 			disconnect_and_exit(1);
-		}
-
-		if (PQntuples(res) != 1 || PQnfields(res) != 4)
-		{
-			fprintf(stderr,
-					_("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
-					progname, replication_slot, PQntuples(res), PQnfields(res), 1, 4);
-			disconnect_and_exit(1);
-		}
-
-		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
-		{
-			fprintf(stderr,
-					_("%s: could not parse transaction log location \"%s\"\n"),
-					progname, PQgetvalue(res, 0, 1));
-			disconnect_and_exit(1);
-		}
-		startpos = ((uint64) hi) << 32 | lo;
-
-		replication_slot = strdup(PQgetvalue(res, 0, 0));
-		PQclear(res);
 	}
 
-
 	if (!do_start_slot)
 		disconnect_and_exit(0);
 
+	/* Stream loop */
 	while (true)
 	{
 		StreamLog();
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 1100260..2c373b8 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -227,6 +227,165 @@ GetConnection(void)
 	return tmpconn;
 }
 
+/*
+ * Run IDENTIFY_SYSTEM through a given connection and give back to caller
+ * some result information if requested:
+ * - Start LSN position
+ * - Current timeline ID
+ * - System identifier
+ * - Database name
+ */
+bool
+RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
+				  XLogRecPtr *startpos, char **db_name)
+{
+	PGresult   *res;
+	uint32		hi, lo;
+
+	/* Check connection existence */
+	Assert(conn != NULL);
+
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+		return false;
+	}
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+	{
+		fprintf(stderr,
+				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+				progname, PQntuples(res), PQnfields(res), 1, 3);
+		return false;
+	}
+
+	/* Get system identifier */
+	if (sysid != NULL)
+		*sysid = pg_strdup(PQgetvalue(res, 0, 0));
+
+	/* Get timeline ID to start streaming from */
+	if (starttli != NULL)
+		*starttli = atoi(PQgetvalue(res, 0, 1));
+
+	/* Get LSN start position if necessary */
+	if (startpos != NULL)
+	{
+		if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
+		{
+			fprintf(stderr,
+					_("%s: could not parse transaction log location \"%s\"\n"),
+					progname, PQgetvalue(res, 0, 2));
+			return false;
+		}
+		*startpos = ((uint64) hi) << 32 | lo;
+	}
+
+	/* Get database name, only available in 9.4 and newer versions */
+	if  (db_name != NULL)
+		*db_name = PQnfields(res) > 3 && !PQgetisnull(res, 0, 3) ?
+			pg_strdup(PQgetvalue(res, 0, 3)) : (char *) NULL;
+
+	PQclear(res);
+	return true;
+}
+
+/*
+ * Create a replication slot for the given connection. This function
+ * returns true in case of success as well as the start position
+ * obtained after the slot creation.
+ */
+bool
+CreateReplicationSlot(PGconn *conn, const char *plugin,
+					  XLogRecPtr *startpos, bool is_physical)
+{
+	char		query[256];
+	PGresult   *res;
+	uint32		hi, lo;
+
+	Assert((is_physical && plugin == NULL) ||
+		   (!is_physical && plugin != NULL));
+	Assert(replication_slot != NULL);
+
+	/* Build query */
+	if (is_physical)
+		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
+				 replication_slot);
+	else
+		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+				 replication_slot, plugin);
+
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+				progname, query, PQerrorMessage(conn));
+		return false;
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) != 4)
+	{
+		fprintf(stderr,
+				_("%s: could not init %s replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
+				progname, is_physical ? "physical" : "logical",
+				PQntuples(res), PQnfields(res), 1, 4);
+		return false;
+	}
+
+	/* Get LSN start position if necessary */
+	if (startpos != NULL)
+	{
+		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+		{
+			fprintf(stderr,
+					_("%s: could not parse transaction log location \"%s\"\n"),
+					progname, PQgetvalue(res, 0, 1));
+			return false;
+		}
+		*startpos = ((uint64) hi) << 32 | lo;
+	}
+
+	replication_slot = strdup(PQgetvalue(res, 0, 0));
+	PQclear(res);
+	return true;
+}
+
+/*
+ * Drop a replication slot for the given connection. This function
+ * returns true in case of success.
+ */
+bool
+DropReplicationSlot(PGconn *conn, bool is_physical)
+{
+	char        query[256];
+	PGresult   *res;
+
+	Assert(replication_slot != NULL);
+
+	/* Build query */
+	snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
+			 replication_slot);
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+				progname, query, PQerrorMessage(conn));
+		return false;
+	}
+
+	if (PQntuples(res) != 0 || PQnfields(res) != 0)
+	{
+		fprintf(stderr,
+				_("%s: could not stop %s replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
+				progname, is_physical ? "physical" : "logical",
+				PQntuples(res), PQnfields(res), 0, 0);
+		return false;
+	}
+
+	PQclear(res);
+	return true;
+}
+
 
 /*
  * Frontend version of GetCurrentTimestamp(), since we are not linked with
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 8c6691f..9067810 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -14,6 +14,8 @@
 
 #include "libpq-fe.h"
 
+#include "access/xlogdefs.h"
+
 extern const char *progname;
 extern char *connection_string;
 extern char *dbhost;
@@ -28,6 +30,14 @@ extern PGconn *conn;
 
 extern PGconn *GetConnection(void);
 
+/* Replication commands */
+extern bool CreateReplicationSlot(PGconn *conn, const char *plugin,
+								  XLogRecPtr *startpos, bool is_physical);
+extern bool DropReplicationSlot(PGconn *conn, bool is_physical);
+extern bool RunIdentifySystem(PGconn *conn, char **sysid,
+							  TimeLineID *starttli,
+							  XLogRecPtr *startpos,
+							  char **db_name);
 extern int64 feGetCurrentTimestamp(void);
 extern void feTimestampDifference(int64 start_time, int64 stop_time,
 					  long *secs, int *microsecs);
-- 
1.8.3.251.g1462b67

>From 9ee80028edb112eaa6c1611ac57d8b8eb72baeb8 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 1 Oct 2014 12:29:11 +0200
Subject: [PATCH 4/4] Polishing 'pg_basebackup refactoring'.

---
 src/bin/pg_basebackup/pg_recvlogical.c | 30 +++++++---------
 src/bin/pg_basebackup/streamutil.c     | 66 ++++++++++++++++++++--------------
 src/bin/pg_basebackup/streamutil.h     |  7 ++--
 3 files changed, 57 insertions(+), 46 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 23ae225..4002296 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -56,7 +56,7 @@ static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
 static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
 
 static void usage(void);
-static void StreamLog();
+static void StreamLogicalLog();
 static void disconnect_and_exit(int code);
 
 static void
@@ -194,7 +194,7 @@ OutputFsync(int64 now)
  * Start the log streaming
  */
 static void
-StreamLog(void)
+StreamLogicalLog(void)
 {
 	PGresult   *res;
 	char	   *copybuf = NULL;
@@ -844,25 +844,21 @@ main(int argc, char **argv)
 		exit(1);
 
 	/*
-	 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
-	 * position.
+	 * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
+	 * replication connection.
 	 */
 	if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
 		disconnect_and_exit(1);
 
-	/*
-	 * Check that there is a database associated with connection, one is
-	 * needed in this context.
-	 */
-	if (!db_name)
+	if (db_name == NULL)
 	{
 		fprintf(stderr,
-				_("%s: no database defined for replication connection \"%s\"\n"),
-				progname, replication_slot);
+				_("%s: failed to establish database specific replication connection\n"),
+				progname);
 		disconnect_and_exit(1);
 	}
 
-	/* Drop a replication slot */
+	/* Drop a replication slot. */
 	if (do_drop_slot)
 	{
 		if (verbose)
@@ -870,12 +866,11 @@ main(int argc, char **argv)
 					_("%s: dropping replication slot \"%s\"\n"),
 					progname, replication_slot);
 
-		if (!DropReplicationSlot(conn, false))
+		if (!DropReplicationSlot(conn, replication_slot))
 			disconnect_and_exit(1);
-		disconnect_and_exit(0);
 	}
 
-	/* Create a replication slot */
+	/* Create a replication slot. */
 	if (do_create_slot)
 	{
 		if (verbose)
@@ -883,7 +878,8 @@ main(int argc, char **argv)
 					_("%s: creating replication slot \"%s\"\n"),
 					progname, replication_slot);
 
-		if (!CreateReplicationSlot(conn, plugin, &startpos, false))
+		if (!CreateReplicationSlot(conn, replication_slot, plugin,
+								   &startpos, false))
 			disconnect_and_exit(1);
 	}
 
@@ -893,7 +889,7 @@ main(int argc, char **argv)
 	/* Stream loop */
 	while (true)
 	{
-		StreamLog();
+		StreamLogicalLog();
 		if (time_to_abort)
 		{
 			/*
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 2c373b8..2f4bac9 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -27,6 +27,7 @@
 #include "receivelog.h"
 #include "streamutil.h"
 
+#include "pqexpbuffer.h"
 #include "common/fe_memutils.h"
 #include "datatype/timestamp.h"
 
@@ -283,8 +284,17 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 
 	/* Get database name, only available in 9.4 and newer versions */
 	if  (db_name != NULL)
-		*db_name = PQnfields(res) > 3 && !PQgetisnull(res, 0, 3) ?
-			pg_strdup(PQgetvalue(res, 0, 3)) : (char *) NULL;
+	{
+		if (PQnfields(res) < 4)
+			fprintf(stderr,
+					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+					progname, PQntuples(res), PQnfields(res), 1, 4);
+
+		if (PQgetisnull(res, 0, 3))
+			*db_name =  NULL;
+		else
+			*db_name = pg_strdup(PQgetvalue(res, 0, 3));
+	}
 
 	PQclear(res);
 	return true;
@@ -296,38 +306,39 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
  * obtained after the slot creation.
  */
 bool
-CreateReplicationSlot(PGconn *conn, const char *plugin,
+CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 					  XLogRecPtr *startpos, bool is_physical)
 {
-	char		query[256];
+	PQExpBuffer query;
 	PGresult   *res;
-	uint32		hi, lo;
+
+	query = createPQExpBuffer();
 
 	Assert((is_physical && plugin == NULL) ||
 		   (!is_physical && plugin != NULL));
-	Assert(replication_slot != NULL);
+	Assert(slot_name != NULL);
 
 	/* Build query */
 	if (is_physical)
-		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
-				 replication_slot);
+		appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
+						  slot_name);
 	else
-		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
-				 replication_slot, plugin);
+		appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+						  slot_name, plugin);
 
-	res = PQexec(conn, query);
+	res = PQexec(conn, query->data);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-				progname, query, PQerrorMessage(conn));
+				progname, query->data, PQerrorMessage(conn));
 		return false;
 	}
 
 	if (PQntuples(res) != 1 || PQnfields(res) != 4)
 	{
 		fprintf(stderr,
-				_("%s: could not init %s replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
-				progname, is_physical ? "physical" : "logical",
+				_("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
+				progname, slot_name,
 				PQntuples(res), PQnfields(res), 1, 4);
 		return false;
 	}
@@ -335,6 +346,8 @@ CreateReplicationSlot(PGconn *conn, const char *plugin,
 	/* Get LSN start position if necessary */
 	if (startpos != NULL)
 	{
+		uint32		hi, lo;
+
 		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
 		{
 			fprintf(stderr,
@@ -345,7 +358,6 @@ CreateReplicationSlot(PGconn *conn, const char *plugin,
 		*startpos = ((uint64) hi) << 32 | lo;
 	}
 
-	replication_slot = strdup(PQgetvalue(res, 0, 0));
 	PQclear(res);
 	return true;
 }
@@ -355,29 +367,31 @@ CreateReplicationSlot(PGconn *conn, const char *plugin,
  * returns true in case of success.
  */
 bool
-DropReplicationSlot(PGconn *conn, bool is_physical)
+DropReplicationSlot(PGconn *conn, const char *slot_name)
 {
-	char        query[256];
+	PQExpBuffer query;
 	PGresult   *res;
 
-	Assert(replication_slot != NULL);
+	Assert(slot_name != NULL);
+
+	query = createPQExpBuffer();
 
 	/* Build query */
-	snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
-			 replication_slot);
-	res = PQexec(conn, query);
+	appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
+					  slot_name);
+	res = PQexec(conn, query->data);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-				progname, query, PQerrorMessage(conn));
+				progname, query->data, PQerrorMessage(conn));
 		return false;
 	}
 
 	if (PQntuples(res) != 0 || PQnfields(res) != 0)
 	{
 		fprintf(stderr,
-				_("%s: could not stop %s replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
-				progname, is_physical ? "physical" : "logical",
+				_("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
+				progname, slot_name,
 				PQntuples(res), PQnfields(res), 0, 0);
 		return false;
 	}
@@ -389,8 +403,8 @@ DropReplicationSlot(PGconn *conn, bool is_physical)
 
 /*
  * Frontend version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The protocol always uses integer timestamps, regardless of
- * server setting.
+ * backend code. The replication protocol always uses integer timestamps,
+ * regardless of the server setting.
  */
 int64
 feGetCurrentTimestamp(void)
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 9067810..ac66145 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -31,9 +31,10 @@ extern PGconn *conn;
 extern PGconn *GetConnection(void);
 
 /* Replication commands */
-extern bool CreateReplicationSlot(PGconn *conn, const char *plugin,
-								  XLogRecPtr *startpos, bool is_physical);
-extern bool DropReplicationSlot(PGconn *conn, bool is_physical);
+extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
+								  const char *plugin, XLogRecPtr *startpos,
+								  bool is_physical);
+extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
 extern bool RunIdentifySystem(PGconn *conn, char **sysid,
 							  TimeLineID *starttli,
 							  XLogRecPtr *startpos,
-- 
1.8.3.251.g1462b67

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to