Hi,

On 2014-03-05 23:20:57 +0100, Andres Freund wrote:
> On 2014-03-05 17:05:24 -0500, Robert Haas wrote:
> > > I very much dislike having the three different event loops, but it's
> > > pretty much forced by the design of the xlogreader. "My" xlogreader
> > > version didn't block when it needed to wait for WAL but just returned
> > > "need input/output", but with the eventually committed version you're
> > > pretty much forced to block inside the read_page callback.
> > >
> > > I don't really have an idea how we could sensibly unify them atm.
> > 
> > WalSndLoop(void (*gutsfn)())?
> 
> The problem is that they are actually different. In the WalSndLoop we're
> also maintaining the walsender's state, in WalSndWriteData() we're just
> waiting for writes to be flushed, in WalSndWaitForWal we're primarily
> waiting for the flush pointer to pass some LSN. And the timing of the
> individual checks isn't trivial (just added some more comments about
> it).
> 
> I'll simplify it by pulling out more common code, maybe it'll become
> apparent how it should look.

I've attached a new version of the walsender patch. It's been rebased
on top of Heikki's latest commit to walsender.c. I've changed a fair bit
of stuff:
* The sleeptime is now computed to sleep until we either need to send a
  keepalive or kill ourselves, as Heikki suggested.
* Sleep time computation, sending pings and checking timeouts are now
  done in separate functions.
* Comment and code style improvements.
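
To illustrate the first point, here is a minimal standalone sketch of
the idea, not the code from the patch. All names are hypothetical, times
are plain milliseconds rather than TimestampTz, and the assumption that
the keepalive is due at half the timeout is mine:

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the walsender's timing state; times in ms. */
typedef struct
{
    int64_t last_reply_ms;              /* when the peer last replied */
    int64_t timeout_ms;                 /* 0 disables the timeout */
    bool    waiting_for_ping_response;  /* keepalive already sent? */
} sketch_timing;

/* Fallback sleep when no timeout is configured (assumed value). */
#define SKETCH_DEFAULT_SLEEP_MS 10000

/*
 * Sleep until the next deadline: the keepalive point (assumed to be half
 * the timeout) if no ping is outstanding, otherwise the point at which
 * the connection would be terminated.
 */
static long
sketch_compute_sleeptime(const sketch_timing *t, int64_t now_ms)
{
    int64_t wakeup_ms;
    int64_t remaining;

    if (t->timeout_ms <= 0)
        return SKETCH_DEFAULT_SLEEP_MS;

    if (t->waiting_for_ping_response)
        wakeup_ms = t->last_reply_ms + t->timeout_ms;
    else
        wakeup_ms = t->last_reply_ms + t->timeout_ms / 2;

    /* Never return a negative sleep time if the deadline has passed. */
    remaining = wakeup_ms - now_ms;
    return remaining > 0 ? (long) remaining : 0;
}
```

The point of computing the deadline this way is that the loop wakes up
exactly when it has something to do, instead of polling on a fixed
interval.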

Although the three loops are shorter and simpler now, I have not managed
to unify them; they seem too different to fold into one. I tried a common
function with a 'wait_for' bitmask argument, but that turned out to be
fairly illegible. The checks in WalSndWaitForWal() and WalSndLoop() just
seem to be too different.

I'd be grateful if you (or somebody else!) could have a quick look at
the bodies of the loops in WalSndWriteData(), WalSndWaitForWal() and
WalSndLoop(). Maybe I am just staring at it the wrong way.

Greetings,

Andres Freund

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From ca84cd3d2966f0e7297d1c7270b094122eec2f18 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 5 Mar 2014 00:15:38 +0100
Subject: [PATCH] Add walsender interface for the logical decoding
 functionality.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This exposes the changeset extraction feature via walsenders which
allows the data to be received in a streaming fashion and supports
synchronous replication.

To do this walsenders need to be able to connect to a specific
database. For that it extends the existing 'replication' parameter to
not only allow a boolean value but also "database". If the latter is
specified we connect to the database specified in 'dbname'.

Andres Freund, with contributions from Álvaro Herrera, reviewed by
Robert Haas.
---
 doc/src/sgml/protocol.sgml                         |  24 +-
 src/backend/postmaster/postmaster.c                |  28 +-
 .../libpqwalreceiver/libpqwalreceiver.c            |   4 +-
 src/backend/replication/repl_gram.y                |  81 +-
 src/backend/replication/repl_scanner.l             |   1 +
 src/backend/replication/walsender.c                | 914 ++++++++++++++++++---
 src/backend/utils/init/postinit.c                  |  15 +-
 src/bin/pg_basebackup/pg_basebackup.c              |   6 +-
 src/bin/pg_basebackup/pg_receivexlog.c             |   6 +-
 src/bin/pg_basebackup/receivelog.c                 |   6 +-
 src/include/replication/walsender.h                |   1 +
 src/tools/pgindent/typedefs.list                   |   1 +
 12 files changed, 918 insertions(+), 169 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index d36f2f3..510bf9a 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1302,10 +1302,13 @@
 
 <para>
 To initiate streaming replication, the frontend sends the
-<literal>replication</> parameter in the startup message. This tells the
-backend to go into walsender mode, wherein a small set of replication commands
-can be issued instead of SQL statements. Only the simple query protocol can be
-used in walsender mode.
+<literal>replication</> parameter in the startup message. A boolean value
+of <literal>true</> tells the backend to go into walsender mode, wherein a
+small set of replication commands can be issued instead of SQL statements. Only
+the simple query protocol can be used in walsender mode.
+Passing <literal>database</> as the value instructs walsender to connect to
+the database specified in the <literal>dbname</> parameter, which will in future
+allow some commands in addition to the ones specified below to be run.
 
 The commands accepted in walsender mode are:
 
@@ -1315,7 +1318,7 @@ The commands accepted in walsender mode are:
     <listitem>
      <para>
       Requests the server to identify itself. Server replies with a result
-      set of a single row, containing three fields:
+      set of a single row, containing four fields:
      </para>
 
      <para>
@@ -1357,6 +1360,17 @@ The commands accepted in walsender mode are:
       </listitem>
       </varlistentry>
 
+      <varlistentry>
+      <term>
+       dbname
+      </term>
+      <listitem>
+      <para>
+       Database connected to or NULL.
+      </para>
+      </listitem>
+      </varlistentry>
+
       </variablelist>
      </para>
     </listitem>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index b7f99fc..8426824 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1884,10 +1884,23 @@ retry1:
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
 			{
-				if (!parse_bool(valptr, &am_walsender))
+				/*
+				 * Due to backward compatibility concerns the replication
+				 * parameter is a hybrid beast which allows the value to be
+				 * either boolean or the string 'database'. The latter
+				 * connects to a specific database which is e.g. required for
+				 * logical decoding.
+				 */
+				if (strcmp(valptr, "database") == 0)
+				{
+					am_walsender = true;
+					am_db_walsender = true;
+				}
+				else if (!parse_bool(valptr, &am_walsender))
 					ereport(FATAL,
 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-							 errmsg("invalid value for boolean option \"replication\"")));
+							 errmsg("invalid value for parameter \"replication\""),
+							 errhint("Valid values are: false, 0, true, 1, database.")));
 			}
 			else
 			{
@@ -1968,8 +1981,15 @@ retry1:
 	if (strlen(port->user_name) >= NAMEDATALEN)
 		port->user_name[NAMEDATALEN - 1] = '\0';
 
-	/* Walsender is not related to a particular database */
-	if (am_walsender)
+	/*
+	 * Normal walsender backends, e.g. for streaming replication, are not
+	 * connected to a particular database. But walsenders used for logical
+	 * replication need to connect to a specific database. We allow streaming
+	 * replication commands to be issued even if connected to a database as it
+	 * can make sense to first make a basebackup and then stream changes
+	 * starting from that.
+	 */
+	if (am_walsender && !am_db_walsender)
 		port->database_name[0] = '\0';
 
 	/*
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index c10374c..2fb731b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -131,7 +131,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 						"the primary server: %s",
 						PQerrorMessage(streamConn))));
 	}
-	if (PQnfields(res) != 3 || PQntuples(res) != 1)
+	if (PQnfields(res) < 3 || PQntuples(res) != 1)
 	{
 		int			ntuples = PQntuples(res);
 		int			nfields = PQnfields(res);
@@ -139,7 +139,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 		PQclear(res);
 		ereport(ERROR,
 				(errmsg("invalid response from primary server"),
-				 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
+				 errdetail("Expected 1 tuple with 3 or more fields, got %d tuples with %d fields.",
 						   ntuples, nfields)));
 	}
 	primary_sysid = PQgetvalue(res, 0, 0);
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 308889b..154aaac 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -73,13 +73,17 @@ Node *replication_parse_result;
 %token K_WAL
 %token K_TIMELINE
 %token K_PHYSICAL
+%token K_LOGICAL
 %token K_SLOT
 
 %type <node>	command
-%type <node>	base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history
+%type <node>	base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system timeline_history
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
 %type <uintval>	opt_timeline
+%type <list>	plugin_options plugin_opt_list
+%type <defelt>	plugin_opt_elem
+%type <node>	plugin_opt_arg
 %type <str>		opt_slot
 
 %%
@@ -98,6 +102,7 @@ command:
 			identify_system
 			| base_backup
 			| start_replication
+			| start_logical_replication
 			| create_replication_slot
 			| drop_replication_slot
 			| timeline_history
@@ -165,8 +170,8 @@ base_backup_opt:
 				}
 			;
 
-/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL */
 create_replication_slot:
+			/* CREATE_REPLICATION_SLOT slot PHYSICAL */
 			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL
 				{
 					CreateReplicationSlotCmd *cmd;
@@ -175,9 +180,19 @@ create_replication_slot:
 					cmd->slotname = $2;
 					$$ = (Node *) cmd;
 				}
+			/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
+			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
+				{
+					CreateReplicationSlotCmd *cmd;
+					cmd = makeNode(CreateReplicationSlotCmd);
+					cmd->kind = REPLICATION_KIND_LOGICAL;
+					cmd->slotname = $2;
+					cmd->plugin = $4;
+					$$ = (Node *) cmd;
+				}
 			;
 
-/* DROP_REPLICATION_SLOT SLOT slot */
+/* DROP_REPLICATION_SLOT slot */
 drop_replication_slot:
 			K_DROP_REPLICATION_SLOT IDENT
 				{
@@ -205,19 +220,19 @@ start_replication:
 				}
 			;
 
-opt_timeline:
-			K_TIMELINE UCONST
+/* START_REPLICATION SLOT slot LOGICAL %X/%X options */
+start_logical_replication:
+			K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
 				{
-					if ($2 <= 0)
-						ereport(ERROR,
-								(errcode(ERRCODE_SYNTAX_ERROR),
-								 (errmsg("invalid timeline %u", $2))));
-					$$ = $2;
+					StartReplicationCmd *cmd;
+					cmd = makeNode(StartReplicationCmd);
+					cmd->kind = REPLICATION_KIND_LOGICAL;
+					cmd->slotname = $3;
+					cmd->startpoint = $5;
+					cmd->options = $6;
+					$$ = (Node *) cmd;
 				}
-			| /* EMPTY */
-				{ $$ = 0; }
 			;
-
 /*
  * TIMELINE_HISTORY %d
  */
@@ -250,6 +265,46 @@ opt_slot:
 				{ $$ = NULL; }
 			;
 
+opt_timeline:
+			K_TIMELINE UCONST
+				{
+					if ($2 <= 0)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 (errmsg("invalid timeline %u", $2))));
+					$$ = $2;
+				}
+				| /* EMPTY */			{ $$ = 0; }
+			;
+
+
+plugin_options:
+			'(' plugin_opt_list ')'			{ $$ = $2; }
+			| /* EMPTY */					{ $$ = NIL; }
+		;
+
+plugin_opt_list:
+			plugin_opt_elem
+				{
+					$$ = list_make1($1);
+				}
+			| plugin_opt_list ',' plugin_opt_elem
+				{
+					$$ = lappend($1, $3);
+				}
+		;
+
+plugin_opt_elem:
+			IDENT plugin_opt_arg
+				{
+					$$ = makeDefElem($1, $2);
+				}
+		;
+
+plugin_opt_arg:
+			SCONST							{ $$ = (Node *) makeString($1); }
+			| /* EMPTY */					{ $$ = NULL; }
+		;
 %%
 
 #include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index ca32aa6..a257124 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -94,6 +94,7 @@ CREATE_REPLICATION_SLOT		{ return K_CREATE_REPLICATION_SLOT; }
 DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
 PHYSICAL			{ return K_PHYSICAL; }
+LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 
 ","				{ return ','; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 003c797..4c48cd6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -45,15 +45,22 @@
 
 #include "access/timeline.h"
 #include "access/transam.h"
+#include "access/xact.h"
 #include "access/xlog_internal.h"
+
 #include "catalog/pg_type.h"
+#include "commands/dbcommands.h"
 #include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "nodes/replnodes.h"
 #include "replication/basebackup.h"
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/logicalfuncs.h"
 #include "replication/slot.h"
+#include "replication/snapbuild.h"
 #include "replication/syncrep.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -92,9 +99,10 @@ WalSndCtlData *WalSndCtl = NULL;
 WalSnd	   *MyWalSnd = NULL;
 
 /* Global state */
-bool		am_walsender = false;		/* Am I a walsender process ? */
+bool		am_walsender = false;		/* Am I a walsender process? */
 bool		am_cascading_walsender = false;		/* Am I cascading WAL to
-												 * another standby ? */
+												 * another standby? */
+bool		am_db_walsender = false;	/* Connected to a database? */
 
 /* User-settable parameters for walsender */
 int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
@@ -145,7 +153,7 @@ static StringInfoData tmpbuf;
 static TimestampTz last_reply_timestamp;
 
 /* Have we sent a heartbeat message asking for reply, since last reply? */
-static bool ping_sent = false;
+static bool waiting_for_ping_response = false;
 
 /*
  * While streaming WAL in Copy mode, streamingDoneSending is set to true
@@ -156,6 +164,9 @@ static bool ping_sent = false;
 static bool streamingDoneSending;
 static bool streamingDoneReceiving;
 
+/* Are we there yet? */
+static bool		WalSndCaughtUp = false;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t walsender_ready_to_stop = false;
@@ -168,24 +179,42 @@ static volatile sig_atomic_t walsender_ready_to_stop = false;
  */
 static volatile sig_atomic_t replication_active = false;
 
+static LogicalDecodingContext *logical_decoding_ctx = NULL;
+static XLogRecPtr  logical_startptr = InvalidXLogRecPtr;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static void WalSndLoop(void);
+typedef void (*WalSndSendDataCallback)(void);
+static void WalSndLoop(WalSndSendDataCallback send_data);
 static void InitWalSenderSlot(void);
 static void WalSndKill(int code, Datum arg);
-static void XLogSend(bool *caughtup);
+static void WalSndShutdown(void) __attribute__((noreturn));
+static void XLogSendPhysical(void);
+static void XLogSendLogical(void);
+static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
+static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
+static void StartLogicalReplication(StartReplicationCmd *cmd);
 static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
 static void WalSndKeepalive(bool requestReply);
+static void WalSndKeepaliveIfNecessary(TimestampTz now);
+static void WalSndCheckTimeOut(TimestampTz now);
+static long WalSndComputeSleeptime(TimestampTz now);
+static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+
+static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -241,6 +270,23 @@ WalSndErrorCleanup()
 }
 
 /*
+ * Handle a client's connection abort in an orderly manner.
+ */
+static void
+WalSndShutdown(void)
+{
+	/*
+	 * Reset whereToSendOutput to prevent ereport from attempting to send any
+	 * more messages to the standby.
+	 */
+	if (whereToSendOutput == DestRemote)
+		whereToSendOutput = DestNone;
+
+	proc_exit(0);
+	abort();					/* keep the compiler quiet */
+}
+
+/*
  * Handle the IDENTIFY_SYSTEM command.
  */
 static void
@@ -251,10 +297,12 @@ IdentifySystem(void)
 	char		tli[11];
 	char		xpos[MAXFNAMELEN];
 	XLogRecPtr	logptr;
+	char	   *dbname = NULL;
 
 	/*
-	 * Reply with a result set with one row, three columns. First col is
-	 * system ID, second is timeline ID, and third is current xlog location.
+	 * Reply with a result set with one row, four columns. First col is system
+	 * ID, second is timeline ID, third is current xlog location and the fourth
+	 * contains the database name if we are connected to one.
 	 */
 
 	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
@@ -273,9 +321,23 @@ IdentifySystem(void)
 
 	snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
+	if (MyDatabaseId != InvalidOid)
+	{
+		MemoryContext cur = CurrentMemoryContext;
+
+		/* syscache access needs a transaction env. */
+		StartTransactionCommand();
+		/* make dbname live outside TX context */
+		MemoryContextSwitchTo(cur);
+		dbname = get_database_name(MyDatabaseId);
+		CommitTransactionCommand();
+		/* CommitTransactionCommand switches to TopMemoryContext */
+		MemoryContextSwitchTo(cur);
+	}
+
 	/* Send a RowDescription message */
 	pq_beginmessage(&buf, 'T');
-	pq_sendint(&buf, 3, 2);		/* 3 fields */
+	pq_sendint(&buf, 4, 2);		/* 4 fields */
 
 	/* first field */
 	pq_sendstring(&buf, "systemid");	/* col name */
@@ -296,24 +358,43 @@ IdentifySystem(void)
 	pq_sendint(&buf, 0, 2);		/* format code */
 
 	/* third field */
-	pq_sendstring(&buf, "xlogpos");
-	pq_sendint(&buf, 0, 4);
-	pq_sendint(&buf, 0, 2);
-	pq_sendint(&buf, TEXTOID, 4);
-	pq_sendint(&buf, -1, 2);
-	pq_sendint(&buf, 0, 4);
-	pq_sendint(&buf, 0, 2);
+	pq_sendstring(&buf, "xlogpos"); /* col name */
+	pq_sendint(&buf, 0, 4);     /* table oid */
+	pq_sendint(&buf, 0, 2);     /* attnum */
+	pq_sendint(&buf, TEXTOID, 4);       /* type oid */
+	pq_sendint(&buf, -1, 2);        /* typlen */
+	pq_sendint(&buf, 0, 4);     /* typmod */
+	pq_sendint(&buf, 0, 2);     /* format code */
+
+	/* fourth field */
+	pq_sendstring(&buf, "dbname");  /* col name */
+	pq_sendint(&buf, 0, 4);     /* table oid */
+	pq_sendint(&buf, 0, 2);     /* attnum */
+	pq_sendint(&buf, TEXTOID, 4);       /* type oid */
+	pq_sendint(&buf, -1, 2);        /* typlen */
+	pq_sendint(&buf, 0, 4);     /* typmod */
+	pq_sendint(&buf, 0, 2);     /* format code */
 	pq_endmessage(&buf);
 
 	/* Send a DataRow message */
 	pq_beginmessage(&buf, 'D');
-	pq_sendint(&buf, 3, 2);		/* # of columns */
+	pq_sendint(&buf, 4, 2);		/* # of columns */
 	pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
 	pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
 	pq_sendint(&buf, strlen(tli), 4);	/* col2 len */
 	pq_sendbytes(&buf, (char *) tli, strlen(tli));
 	pq_sendint(&buf, strlen(xpos), 4);	/* col3 len */
 	pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+	/* send NULL if not connected to a database */
+	if (dbname)
+	{
+		pq_sendint(&buf, strlen(dbname), 4);    /* col4 len */
+		pq_sendbytes(&buf, (char *) dbname, strlen(dbname));
+	}
+	else
+	{
+		pq_sendint(&buf, -1, 4);    /* col4 len, NULL */
+	}
 
 	pq_endmessage(&buf);
 }
@@ -572,7 +653,7 @@ StartReplication(StartReplicationCmd *cmd)
 		/* Main loop of walsender */
 		replication_active = true;
 
-		WalSndLoop();
+		WalSndLoop(XLogSendPhysical);
 
 		replication_active = false;
 		if (walsender_ready_to_stop)
@@ -643,12 +724,47 @@ StartReplication(StartReplicationCmd *cmd)
 }
 
 /*
+ * read_page callback for logical decoding contexts, as a walsender process.
+ *
+ * Inside the walsender we can do better than logical_read_local_xlog_page,
+ * which has to do a plain sleep/busy loop, because the walsender's latch gets
+ * set every time WAL is flushed.
+ */
+static int
+logical_read_xlog_page(XLogReaderState* state, XLogRecPtr targetPagePtr, int reqLen,
+				 XLogRecPtr targetRecPtr, char* cur_page, TimeLineID *pageTLI)
+{
+	XLogRecPtr flushptr;
+	int		count;
+
+	/* make sure we have enough WAL available */
+	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
+
+	/* more than one block available */
+	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+		count = XLOG_BLCKSZ;
+	/* not enough WAL synced, that can happen during shutdown */
+	else if (targetPagePtr + reqLen > flushptr)
+		return -1;
+	/* part of the page available */
+	else
+		count = flushptr - targetPagePtr;
+
+	/* now actually read the data, we know it's there */
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+
+	return count;
+}
+
+/*
  * Create a new replication slot.
  */
 static void
 CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 {
 	const char *slot_name;
+	const char *snapshot_name = NULL;
+	char        xpos[MAXFNAMELEN];
 	StringInfoData buf;
 
 	Assert(!MyReplicationSlot);
@@ -657,24 +773,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	sendTimeLineIsHistoric = false;
 	sendTimeLine = ThisTimeLineID;
 
-	ReplicationSlotCreate(cmd->slotname,
-						  cmd->kind == REPLICATION_KIND_LOGICAL,
-						  RS_PERSISTENT);
+	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
+	{
+		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT);
+	}
+	else
+	{
+		CheckLogicalDecodingRequirements();
+		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
+	}
 
 	initStringInfo(&output_message);
 
 	slot_name = NameStr(MyReplicationSlot->data.name);
 
-	/*
-	 * It may seem somewhat pointless to send back the same slot name the
-	 * client just requested and nothing else, but logical replication
-	 * will add more fields here.  (We could consider removing the slot
-	 * name from what's sent back, though, since the client has specified
-	 * that.)
-	 */
+	if (cmd->kind == REPLICATION_KIND_LOGICAL)
+	{
+		LogicalDecodingContext *ctx;
+
+		ctx = CreateInitDecodingContext(
+			cmd->plugin, NIL,
+			logical_read_xlog_page,
+			WalSndPrepareWrite, WalSndWriteData);
+
+		/* build initial snapshot, might take a while */
+		DecodingContextFindStartpoint(ctx);
+
+		/*
+		 * Export a plain (not of the snapbuild.c type) snapshot to the user
+		 * that can be imported into another session.
+		 */
+		snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+
+		/* don't need the decoding context anymore */
+		FreeDecodingContext(ctx);
+
+		ReplicationSlotPersist();
+	}
+
+	slot_name = NameStr(MyReplicationSlot->data.name);
+	snprintf(xpos, sizeof(xpos), "%X/%X",
+			 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
+			 (uint32) MyReplicationSlot->data.confirmed_flush);
 
 	pq_beginmessage(&buf, 'T');
-	pq_sendint(&buf, 1, 2);		/* 1 field */
+	pq_sendint(&buf, 4, 2);		/* 4 fields */
 
 	/* first field: slot name */
 	pq_sendstring(&buf, "slot_name");	/* col name */
@@ -685,16 +828,65 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	pq_sendint(&buf, 0, 4);		/* typmod */
 	pq_sendint(&buf, 0, 2);		/* format code */
 
+	/* second field: LSN at which we became consistent */
+	pq_sendstring(&buf, "consistent_point");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+
+	/* third field: exported snapshot's name */
+	pq_sendstring(&buf, "snapshot_name");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+
+	/* fourth field: output plugin */
+	pq_sendstring(&buf, "output_plugin");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+
 	pq_endmessage(&buf);
 
 	/* Send a DataRow message */
 	pq_beginmessage(&buf, 'D');
-	pq_sendint(&buf, 1, 2);		/* # of columns */
+	pq_sendint(&buf, 4, 2);		/* # of columns */
 
 	/* slot_name */
 	pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
 	pq_sendbytes(&buf, slot_name, strlen(slot_name));
 
+	/* consistent wal location */
+	pq_sendint(&buf, strlen(xpos), 4); /* col2 len */
+	pq_sendbytes(&buf, xpos, strlen(xpos));
+
+	/* snapshot name */
+	if (snapshot_name != NULL)
+	{
+		pq_sendint(&buf, strlen(snapshot_name), 4); /* col3 len */
+		pq_sendbytes(&buf, snapshot_name, strlen(snapshot_name));
+	}
+	else
+		pq_sendint(&buf, -1, 4);    /* col3 len, NULL */
+
+	/* plugin */
+	if (cmd->plugin != NULL)
+	{
+		pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */
+		pq_sendbytes(&buf, cmd->plugin, strlen(cmd->plugin));
+	}
+	else
+		pq_sendint(&buf, -1, 4);	/* col4 len, NULL */
+
 	pq_endmessage(&buf);
 
 	/*
@@ -714,6 +906,339 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 }
 
 /*
+ * Load previously initiated logical slot and prepare for sending data (via
+ * WalSndLoop).
+ */
+static void
+StartLogicalReplication(StartReplicationCmd *cmd)
+{
+	StringInfoData buf;
+
+	/* make sure that our requirements are still fulfilled */
+	CheckLogicalDecodingRequirements();
+
+	Assert(!MyReplicationSlot);
+
+	ReplicationSlotAcquire(cmd->slotname);
+
+	/*
+	 * Force a disconnect, so that the decoding code doesn't need to care
+	 * about an eventual switch from running in recovery, to running in a
+	 * normal environment. Client code is expected to handle reconnects.
+	 */
+	if (am_cascading_walsender && !RecoveryInProgress())
+	{
+		ereport(LOG,
+				(errmsg("terminating walsender process after promotion")));
+		walsender_ready_to_stop = true;
+	}
+
+	WalSndSetState(WALSNDSTATE_CATCHUP);
+
+	/* Send a CopyBothResponse message, and start streaming */
+	pq_beginmessage(&buf, 'W');
+	pq_sendbyte(&buf, 0);
+	pq_sendint(&buf, 0, 2);
+	pq_endmessage(&buf);
+	pq_flush();
+
+	/* setup state for XLogReadPage */
+	sendTimeLineIsHistoric = false;
+	sendTimeLine = ThisTimeLineID;
+
+	/*
+	 * Initialize position to the last ack'ed one, then the xlog records begin
+	 * to be shipped from that position.
+	 */
+	logical_decoding_ctx = CreateDecodingContext(
+		cmd->startpoint, cmd->options,
+		logical_read_xlog_page,
+		WalSndPrepareWrite, WalSndWriteData);
+
+	/* Start reading WAL from the oldest required WAL. */
+	logical_startptr = MyReplicationSlot->data.restart_lsn;
+
+	/*
+	 * Report the location after which we'll send out further commits as the
+	 * current sentPtr.
+	 */
+	sentPtr = MyReplicationSlot->data.confirmed_flush;
+
+	/* Also update the sent position status in shared memory */
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = MyWalSnd;
+
+		SpinLockAcquire(&walsnd->mutex);
+		walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
+		SpinLockRelease(&walsnd->mutex);
+	}
+
+	replication_active = true;
+
+	SyncRepInitConfig();
+
+	/* Main loop of walsender */
+	WalSndLoop(XLogSendLogical);
+
+	FreeDecodingContext(logical_decoding_ctx);
+	ReplicationSlotRelease();
+
+	replication_active = false;
+	if (walsender_ready_to_stop)
+		proc_exit(0);
+	WalSndSetState(WALSNDSTATE_STARTUP);
+
+	/* Get out of COPY mode (CommandComplete). */
+	EndCommand("COPY 0", DestRemote);
+}
+
+/*
+ * LogicalDecodingContext 'prepare_write' callback.
+ *
+ * Prepare a write into a StringInfo.
+ *
+ * Don't do anything lasting in here, it's quite possible that nothing will be
+ * done with the data.
+ */
+static void
+WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
+{
+	/* can't have sync rep confused by sending the same LSN several times */
+	if (!last_write)
+		lsn = InvalidXLogRecPtr;
+
+	resetStringInfo(ctx->out);
+
+	pq_sendbyte(ctx->out, 'w');
+	pq_sendint64(ctx->out, lsn);	/* dataStart */
+	pq_sendint64(ctx->out, lsn);	/* walEnd */
+	/*
+	 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
+	 * reserve space here.
+	 */
+	pq_sendint64(ctx->out, 0);		/* sendtime */
+}
+
+/*
+ * LogicalDecodingContext 'write' callback.
+ *
+ * Actually write out data previously prepared by WalSndPrepareWrite to the
+ * network. Take as long as needed, but process replies from the other side
+ * and check timeouts during that.
+ */
+static void
+WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+				bool last_write)
+{
+	/*
+	 * Fill the send timestamp last, so that it is taken as late as
+	 * possible. This is somewhat ugly, but the protocol's set as it's already
+	 * used for several releases by streaming physical replication.
+	 */
+	resetStringInfo(&tmpbuf);
+	pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
+	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
+		   tmpbuf.data, sizeof(int64));
+
+	/* output previously gathered data in a CopyData packet */
+	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
+
+	/* fast path */
+	/* Try to flush pending output to the client */
+	if (pq_flush_if_writable() != 0)
+		WalSndShutdown();
+
+	if (!pq_is_send_pending())
+		return;
+
+	for (;;)
+	{
+		int			wakeEvents;
+		long		sleeptime;
+		TimestampTz	now;
+
+		/*
+		 * Emergency bailout if postmaster has died.  This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (!PostmasterIsAlive())
+			exit(1);
+
+		/* Process any requests or signals received recently */
+		if (got_SIGHUP)
+		{
+			got_SIGHUP = false;
+			ProcessConfigFile(PGC_SIGHUP);
+			SyncRepInitConfig();
+		}
+
+		/* Check for input from the client */
+		ProcessRepliesIfAny();
+
+		/* Clear any already-pending wakeups */
+		ResetLatch(&MyWalSnd->latch);
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+
+		/* If we finished clearing the buffered data, we're done here. */
+		if (!pq_is_send_pending())
+			break;
+
+		now = GetCurrentTimestamp();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut(now);
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary(now);
+
+		sleeptime = WalSndComputeSleeptime(now);
+
+		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
+
+		/* Sleep until something happens or we time out */
+		ImmediateInterruptOK = true;
+		CHECK_FOR_INTERRUPTS();
+		WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+						  MyProcPort->sock, sleeptime);
+		ImmediateInterruptOK = false;
+	}
+
+	/* reactivate latch so WalSndLoop knows to continue */
+	SetLatch(&MyWalSnd->latch);
+}
+
+/*
+ * Wait till WAL < loc is flushed to disk so it can be safely read.
+ */
+static XLogRecPtr
+WalSndWaitForWal(XLogRecPtr loc)
+{
+	int			wakeEvents;
+	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+
+
+	/*
+	 * Fast path to avoid acquiring the spinlock in case we already know we
+	 * have enough WAL available. This is particularly interesting if we're
+	 * far behind.
+	 */
+	if (RecentFlushPtr != InvalidXLogRecPtr &&
+		loc <= RecentFlushPtr)
+		return RecentFlushPtr;
+
+	/* Get a more recent flush pointer. */
+	if (!RecoveryInProgress())
+		RecentFlushPtr = GetFlushRecPtr();
+	else
+		RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+
+	for (;;)
+	{
+		long		sleeptime;
+		TimestampTz	now;
+
+		/*
+		 * Emergency bailout if postmaster has died.  This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (!PostmasterIsAlive())
+			exit(1);
+
+		/* Process any requests or signals received recently */
+		if (got_SIGHUP)
+		{
+			got_SIGHUP = false;
+			ProcessConfigFile(PGC_SIGHUP);
+			SyncRepInitConfig();
+		}
+
+		/* Check for input from the client */
+		ProcessRepliesIfAny();
+
+		/* Clear any already-pending wakeups */
+		ResetLatch(&MyWalSnd->latch);
+
+		/* Update our idea of the currently flushed position. */
+		if (!RecoveryInProgress())
+			RecentFlushPtr = GetFlushRecPtr();
+		else
+			RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+
+		/*
+		 * If postmaster asked us to stop, don't wait here anymore. This will
+		 * cause the xlogreader to return without reading a full record, which
+		 * is the fastest way to reach the mainloop which then can quit.
+		 *
+		 * It's important to do this check after the recomputation of
+		 * RecentFlushPtr, so we can send all remaining data before shutting
+		 * down.
+		 */
+		if (walsender_ready_to_stop)
+			break;
+
+		/*
+		 * We only send regular messages to the client for fully decoded
+		 * transactions, but synchronous replication and walsender shutdown
+		 * may be waiting for a later location. So we send pings containing
+		 * the flush location every now and then.
+		 */
+		if (MyWalSnd->flush < sentPtr && !waiting_for_ping_response)
+		{
+			WalSndKeepalive(true);
+			waiting_for_ping_response = true;
+		}
+
+		/* check whether we're done */
+		if (loc <= RecentFlushPtr)
+			break;
+
+		/* Waiting for new WAL. Since we need to wait, we're now caught up. */
+		WalSndCaughtUp = true;
+
+		/*
+		 * Try to flush pending output to the client. Also wait for the socket
+		 * becoming writable, if there's still pending output after an attempt
+		 * to flush. Otherwise we might just sit on output data while waiting
+		 * for new WAL being generated.
+		 */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+
+		now = GetCurrentTimestamp();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut(now);
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary(now);
+
+		sleeptime = WalSndComputeSleeptime(now);
+
+		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+			WL_SOCKET_READABLE | WL_TIMEOUT;
+
+		if (pq_is_send_pending())
+			wakeEvents |= WL_SOCKET_WRITEABLE;
+
+		/* Sleep until something happens or we time out */
+		ImmediateInterruptOK = true;
+		CHECK_FOR_INTERRUPTS();
+		WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
+						  MyProcPort->sock, sleeptime);
+		ImmediateInterruptOK = false;
+	}
+
+	/* reactivate latch so WalSndLoop knows to continue */
+	SetLatch(&MyWalSnd->latch);
+	return RecentFlushPtr;
+}
+
+/*
  * Execute an incoming replication command.
  */
 void
@@ -724,6 +1249,12 @@ exec_replication_command(const char *cmd_string)
 	MemoryContext cmd_context;
 	MemoryContext old_context;
 
+	/*
+	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
+	 * command arrives. Clean up the old stuff if there's anything.
+	 */
+	SnapBuildClearExportedSnapshot();
+
 	elog(DEBUG1, "received replication command: %s", cmd_string);
 
 	CHECK_FOR_INTERRUPTS();
@@ -769,7 +1300,7 @@ exec_replication_command(const char *cmd_string)
 				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 					StartReplication(cmd);
 				else
-					elog(ERROR, "cannot handle logical decoding yet");
+					StartLogicalReplication(cmd);
 				break;
 			}
 
@@ -887,7 +1418,7 @@ ProcessRepliesIfAny(void)
 	if (received)
 	{
 		last_reply_timestamp = GetCurrentTimestamp();
-		ping_sent = false;
+		waiting_for_ping_response = false;
 	}
 }
 
@@ -1020,7 +1551,7 @@ ProcessStandbyReplyMessage(void)
 	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
 	{
 		if (MyReplicationSlot->data.database != InvalidOid)
-			elog(ERROR, "cannot handle logical decoding yet");
+			LogicalConfirmReceivedLocation(flushPtr);
 		else
 			PhysicalConfirmReceivedLocation(flushPtr);
 	}
@@ -1146,12 +1677,81 @@ ProcessStandbyHSFeedbackMessage(void)
 		MyPgXact->xmin = feedbackXmin;
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Compute how long send/receive loops should sleep.
+ *
+ * If wal_sender_timeout is enabled we want to wake up in time to send
+ * keepalives and to abort the connection if wal_sender_timeout has been
+ * reached.
+ */
+static long
+WalSndComputeSleeptime(TimestampTz now)
+{
+	long		sleeptime = 10000;		/* 10 s */
+
+	if (wal_sender_timeout > 0)
+	{
+		TimestampTz wakeup_time;
+		long sec_to_timeout;
+		int microsec_to_timeout;
+
+		/*
+		 * At the latest stop sleeping once wal_sender_timeout has been
+		 * reached.
+		 */
+		wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+												  wal_sender_timeout);
+
+		/*
+		 * If no ping has been sent yet, wake up when it's time to do
+		 * so. WalSndKeepaliveIfNecessary() wants to send a keepalive once
+		 * half of the timeout has passed without a response.
+		 */
+		if (!waiting_for_ping_response)
+			wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+													wal_sender_timeout / 2);
+
+		/* Compute relative time until wakeup. */
+		TimestampDifference(now, wakeup_time,
+							&sec_to_timeout, &microsec_to_timeout);
+
+		sleeptime = sec_to_timeout * 1000 +
+			microsec_to_timeout / 1000;
+	}
+
+	return sleeptime;
+}
+
+/*
+ * Check whether there have been responses by the client within
+ * wal_sender_timeout and shut down if not.
+ */
 static void
-WalSndLoop(void)
+WalSndCheckTimeOut(TimestampTz now)
 {
-	bool		caughtup = false;
+	TimestampTz timeout;
+
+	timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+										  wal_sender_timeout);
 
+	if (wal_sender_timeout > 0 && now >= timeout)
+	{
+		/*
+		 * Since typically expiration of replication timeout means
+		 * communication problem, we don't send the error message to
+		 * the standby.
+		 */
+		ereport(COMMERROR,
+				(errmsg("terminating walsender process due to replication timeout")));
+
+		WalSndShutdown();
+	}
+}
+
+/* Main loop of walsender process that streams the WAL over Copy messages. */
+static void
+WalSndLoop(WalSndSendDataCallback send_data)
+{
 	/*
 	 * Allocate buffers that will be used for each outgoing and incoming
 	 * message.  We do this just once to reduce palloc overhead.
@@ -1162,7 +1762,7 @@ WalSndLoop(void)
 
 	/* Initialize the last reply timestamp */
 	last_reply_timestamp = GetCurrentTimestamp();
-	ping_sent = false;
+	waiting_for_ping_response = false;
 
 	/*
 	 * Loop until we reach the end of this timeline or the client requests to
@@ -1170,8 +1770,7 @@ WalSndLoop(void)
 	 */
 	for (;;)
 	{
-		/* Clear any already-pending wakeups */
-		ResetLatch(&MyWalSnd->latch);
+		TimestampTz	now;
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -1193,6 +1792,9 @@ WalSndLoop(void)
 		/* Check for input from the client */
 		ProcessRepliesIfAny();
 
+		/* Clear any already-pending wakeups */
+		ResetLatch(&MyWalSnd->latch);
+
 		/*
 		 * If we have received CopyDone from the client, sent CopyDone
 		 * ourselves, and the output buffer is empty, it's time to exit
@@ -1203,21 +1805,21 @@ WalSndLoop(void)
 
 		/*
 		 * If we don't have any pending data in the output buffer, try to send
-		 * some more.  If there is some, we don't bother to call XLogSend
+		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
 		 */
 		if (!pq_is_send_pending())
-			XLogSend(&caughtup);
+			send_data();
 		else
-			caughtup = false;
+			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
-			goto send_failure;
+			WalSndShutdown();
 
 		/* If nothing remains to be sent right now ... */
-		if (caughtup && !pq_is_send_pending())
+		if (WalSndCaughtUp && !pq_is_send_pending())
 		{
 			/*
 			 * If we're in catchup state, move to streaming.  This is an
@@ -1243,111 +1845,47 @@ WalSndLoop(void)
 			 * the walsender is not sure which.
 			 */
 			if (walsender_ready_to_stop)
-			{
-				/* ... let's just be real sure we're caught up ... */
-				XLogSend(&caughtup);
-				if (caughtup && sentPtr == MyWalSnd->flush &&
-					!pq_is_send_pending())
-				{
-					/* Inform the standby that XLOG streaming is done */
-					EndCommand("COPY 0", DestRemote);
-					pq_flush();
-
-					proc_exit(0);
-				}
-			}
+				WalSndDone(send_data);
 		}
 
-		/*
-		 * If half of wal_sender_timeout has elapsed without receiving any
-		 * reply from standby, send a keep-alive message requesting an
-		 * immediate reply.
-		 */
-		if (wal_sender_timeout > 0 && !ping_sent)
-		{
-			TimestampTz timeout;
+		now = GetCurrentTimestamp();
 
-			timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
-												  wal_sender_timeout / 2);
-			if (GetCurrentTimestamp() >= timeout)
-			{
-				WalSndKeepalive(true);
-				ping_sent = true;
-				/* Try to flush pending output to the client */
-				if (pq_flush_if_writable() != 0)
-					goto send_failure;
-			}
-		}
+		/* Check for replication timeout. */
+		WalSndCheckTimeOut(now);
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary(now);
 
 		/*
 		 * We don't block if not caught up, unless there is unsent data
 		 * pending in which case we'd better block until the socket is
-		 * write-ready.  This test is only needed for the case where XLogSend
-		 * loaded a subset of the available data but then pq_flush_if_writable
-		 * flushed it all --- we should immediately try to send more.
+		 * write-ready.  This test is only needed for the case where the
+		 * send_data callback handled a subset of the available data but then
+		 * pq_flush_if_writable flushed it all --- we should immediately try
+		 * to send more.
 		 */
-		if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
+		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
 		{
-			TimestampTz timeout;
-			long		sleeptime = 10000;		/* 10 s */
+			long		sleeptime;
 			int			wakeEvents;
 
 			wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
 				WL_SOCKET_READABLE;
 
+			sleeptime = WalSndComputeSleeptime(now);
+
 			if (pq_is_send_pending())
 				wakeEvents |= WL_SOCKET_WRITEABLE;
 
-			/*
-			 * If wal_sender_timeout is active, sleep in smaller increments
-			 * to not go over the timeout too much. XXX: Why not just sleep
-			 * until the timeout has elapsed?
-			 */
-			if (wal_sender_timeout > 0)
-				sleeptime = 1 + (wal_sender_timeout / 10);
-
 			/* Sleep until something happens or we time out */
 			ImmediateInterruptOK = true;
 			CHECK_FOR_INTERRUPTS();
 			WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
 							  MyProcPort->sock, sleeptime);
 			ImmediateInterruptOK = false;
-
-			/*
-			 * Check for replication timeout.  Note we ignore the corner case
-			 * possibility that the client replied just as we reached the
-			 * timeout ... he's supposed to reply *before* that.
-			 */
-			timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
-												  wal_sender_timeout);
-			if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
-			{
-				/*
-				 * Since typically expiration of replication timeout means
-				 * communication problem, we don't send the error message to
-				 * the standby.
-				 */
-				ereport(COMMERROR,
-						(errmsg("terminating walsender process due to replication timeout")));
-				goto send_failure;
-			}
 		}
 	}
 	return;
-
-send_failure:
-
-	/*
-	 * Get here on send failure.  Clean up and exit.
-	 *
-	 * Reset whereToSendOutput to prevent ereport from attempting to send any
-	 * more messages to the standby.
-	 */
-	if (whereToSendOutput == DestRemote)
-		whereToSendOutput = DestNone;
-
-	proc_exit(0);
-	abort();					/* keep the compiler quiet */
 }
 
 /* Initialize a per-walsender data structure for this walsender process */
@@ -1605,15 +2143,17 @@ retry:
 }
 
 /*
+ * Send out the WAL in its normal physical/stored form.
+ *
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
  * but not yet sent to the client, and buffer it in the libpq output
  * buffer.
  *
- * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
- * *caughtup is set to false.
+ * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
+ * otherwise WalSndCaughtUp is set to false.
  */
 static void
-XLogSend(bool *caughtup)
+XLogSendPhysical(void)
 {
 	XLogRecPtr	SendRqstPtr;
 	XLogRecPtr	startptr;
@@ -1622,7 +2162,7 @@ XLogSend(bool *caughtup)
 
 	if (streamingDoneSending)
 	{
-		*caughtup = true;
+		WalSndCaughtUp = true;
 		return;
 	}
 
@@ -1739,7 +2279,7 @@ XLogSend(bool *caughtup)
 		pq_putmessage_noblock('c', NULL, 0);
 		streamingDoneSending = true;
 
-		*caughtup = true;
+		WalSndCaughtUp = true;
 
 		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
 			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
@@ -1751,7 +2291,7 @@ XLogSend(bool *caughtup)
 	Assert(sentPtr <= SendRqstPtr);
 	if (SendRqstPtr <= sentPtr)
 	{
-		*caughtup = true;
+		WalSndCaughtUp = true;
 		return;
 	}
 
@@ -1775,15 +2315,15 @@ XLogSend(bool *caughtup)
 	{
 		endptr = SendRqstPtr;
 		if (sendTimeLineIsHistoric)
-			*caughtup = false;
+			WalSndCaughtUp = false;
 		else
-			*caughtup = true;
+			WalSndCaughtUp = true;
 	}
 	else
 	{
 		/* round down to page boundary. */
 		endptr -= (endptr % XLOG_BLCKSZ);
-		*caughtup = false;
+		WalSndCaughtUp = false;
 	}
 
 	nbytes = endptr - startptr;
@@ -1844,6 +2384,85 @@ XLogSend(bool *caughtup)
 }
 
 /*
+ * Stream out logically decoded data.
+ */
+static void
+XLogSendLogical(void)
+{
+	XLogRecord *record;
+	char	   *errm;
+
+	/*
+	 * Don't know whether we've caught up yet. We'll set it to true in
+	 * WalSndWaitForWal, if we're actually waiting. We also set it to true if
+	 * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
+	 * i.e. when we're shutting down.
+	 */
+	WalSndCaughtUp = false;
+
+	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
+	logical_startptr = InvalidXLogRecPtr;
+
+	/* xlog record was invalid */
+	if (errm != NULL)
+		elog(ERROR, "%s", errm);
+
+	if (record != NULL)
+	{
+		LogicalDecodingProcessRecord(logical_decoding_ctx, record);
+
+		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+	}
+	else
+	{
+		/*
+		 * If the record we just wanted to read is at or beyond the flushed
+		 * point, then we're caught up.
+		 */
+		if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
+			WalSndCaughtUp = true;
+	}
+
+	/* Update shared memory status */
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = MyWalSnd;
+
+		SpinLockAcquire(&walsnd->mutex);
+		walsnd->sentPtr = sentPtr;
+		SpinLockRelease(&walsnd->mutex);
+	}
+}
+
+/*
+ * Shut down if the sender is caught up.
+ *
+ * NB: This should only be called when the shutdown signal has been received
+ * from postmaster.
+ *
+ * Note that if we determine that there's still more data to send, this
+ * function will return control to the caller.
+ */
+static void
+WalSndDone(WalSndSendDataCallback send_data)
+{
+	/* ... let's just be real sure we're caught up ... */
+	send_data();
+
+	if (WalSndCaughtUp && sentPtr == MyWalSnd->flush &&
+		!pq_is_send_pending())
+	{
+		/* Inform the standby that XLOG streaming is done */
+		EndCommand("COPY 0", DestRemote);
+		pq_flush();
+
+		proc_exit(0);
+	}
+	if (!waiting_for_ping_response)
+		WalSndKeepalive(true);
+}
+
+/*
  * Returns the latest point in WAL that has been safely flushed to disk, and
  * can be sent to the standby. This should only be called when in recovery,
  * ie. we're streaming to a cascaded standby.
@@ -2239,6 +2858,39 @@ WalSndKeepalive(bool requestReply)
 }
 
 /*
+ * Send keepalive message if close enough to a shutdown due to
+ * wal_sender_timeout.
+ */
+static void
+WalSndKeepaliveIfNecessary(TimestampTz now)
+{
+	TimestampTz ping_time;
+
+	if (wal_sender_timeout <= 0)
+		return;
+
+	if (waiting_for_ping_response)
+		return;
+
+	/*
+	 * If half of wal_sender_timeout has lapsed without receiving any reply
+	 * from the standby, send a keep-alive message to the standby requesting
+	 * an immediate reply.
+	 */
+	ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+											wal_sender_timeout / 2);
+	if (now >= ping_time)
+	{
+		WalSndKeepalive(true);
+		waiting_for_ping_response = true;
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+	}
+}
+
+/*
  * This isn't currently used for anything. Monitoring tools might be
  * interested in the future, and we'll need something like this in the
  * future for synchronous replication.
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 3ecc4d3..89a7c9e 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -729,11 +729,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
 				 errmsg("remaining connection slots are reserved for non-replication superuser connections")));
 
-	/*
-	 * If walsender, we don't want to connect to any particular database. Just
-	 * finish the backend startup by processing any options from the startup
-	 * packet, and we're done.
-	 */
+	/* Check replication permissions needed for walsender processes. */
 	if (am_walsender)
 	{
 		Assert(!bootstrap);
@@ -742,7 +738,16 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 			ereport(FATAL,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 					 errmsg("must be superuser or replication role to start walsender")));
+	}
 
+	/*
+	 * If this is a plain walsender only supporting physical replication, we
+	 * don't want to connect to any particular database. Just finish the
+	 * backend startup by processing any options from the startup packet, and
+	 * we're done.
+	 */
+	if (am_walsender && !am_db_walsender)
+	{
 		/* process any options passed in the startup packet */
 		if (MyProcPort != NULL)
 			process_startup_options(MyProcPort, am_superuser);
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 919805f..f93b8c9 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1639,11 +1639,11 @@ BaseBackup(void)
 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
 		disconnect_and_exit(1);
 	}
-	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
 	{
 		fprintf(stderr,
-				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
+				_("%s: could not identify system: got %d rows and %d fields, expected 1 row and 3 or more fields\n"),
+				progname, PQntuples(res), PQnfields(res));
 		disconnect_and_exit(1);
 	}
 	sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 0f191ce..7ae20d1 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -275,11 +275,11 @@ StreamLog(void)
 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
 		disconnect_and_exit(1);
 	}
-	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
 	{
 		fprintf(stderr,
-				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
+				_("%s: could not identify system: got %d rows and %d fields, expected 1 row and 3 or more fields\n"),
+				progname, PQntuples(res), PQnfields(res));
 		disconnect_and_exit(1);
 	}
 	servertli = atoi(PQgetvalue(res, 0, 1));
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index ef73b4b..64730d9 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -563,11 +563,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			PQclear(res);
 			return false;
 		}
-		if (PQnfields(res) != 3 || PQntuples(res) != 1)
+		if (PQntuples(res) != 1 || PQnfields(res) < 3)
 		{
 			fprintf(stderr,
-					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-					progname, PQntuples(res), PQnfields(res), 1, 3);
+					_("%s: could not identify system: got %d rows and %d fields, expected 1 row and 3 or more fields\n"),
+					progname, PQntuples(res), PQnfields(res));
 			PQclear(res);
 			return false;
 		}
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index b67cf63..cff2be6 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -19,6 +19,7 @@
 /* global state */
 extern bool am_walsender;
 extern bool am_cascading_walsender;
+extern bool am_db_walsender;
 extern bool wake_wal_senders;
 
 /* user-settable parameters */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f960454..62a892b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1909,6 +1909,7 @@ WalRcvData
 WalRcvState
 WalSnd
 WalSndCtlData
+WalSndSendDataCallback
 WalSndState
 WholeRowVarExprState
 WindowAgg
-- 
1.8.5.rc2.dirty
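
For anyone who wants to poke at the sleep time logic outside the backend,
here is a minimal standalone sketch of what WalSndComputeSleeptime() in the
patch computes. It is not the patch code: the sketch_* names and the plain
millisecond timestamps are assumptions made up for illustration, standing in
for TimestampTz, TimestampTzPlusMilliseconds() and TimestampDifference(). The
idea is the same though: wake up at half the timeout if no ping is
outstanding, at the full timeout otherwise, and never return a negative sleep.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for TimestampTz: plain milliseconds. */
typedef int64_t sketch_time_ms;

#define SKETCH_DEFAULT_SLEEP_MS 10000	/* 10 s, same default as the patch */

/*
 * Return how long the loop should sleep, in milliseconds.
 *
 * timeout_ms plays the role of wal_sender_timeout, last_reply the role of
 * last_reply_timestamp.
 */
static long
sketch_compute_sleeptime(sketch_time_ms now, sketch_time_ms last_reply,
						 int timeout_ms, bool waiting_for_ping_response)
{
	sketch_time_ms wakeup;

	/* Timeout disabled: just use the fixed default. */
	if (timeout_ms <= 0)
		return SKETCH_DEFAULT_SLEEP_MS;

	/*
	 * With a ping outstanding, the next interesting moment is the timeout
	 * itself. Otherwise it is the point where a ping needs to be sent,
	 * i.e. half of the timeout.
	 */
	if (waiting_for_ping_response)
		wakeup = last_reply + timeout_ms;
	else
		wakeup = last_reply + timeout_ms / 2;

	/* Clamp to zero if the deadline has already passed. */
	if (wakeup <= now)
		return 0;

	return (long) (wakeup - now);
}
```

With a 60 s timeout and the last reply at time 0, calling this at now = 1000
without an outstanding ping gives 29000 ms (time left until the 30 s ping
point); with a ping outstanding at now = 31000 it also gives 29000 ms (time
left until the 60 s timeout).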

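The other structural change, WalSndLoop() and WalSndDone() taking a send_data
callback and reporting progress via the WalSndCaughtUp flag instead of a
bool *caughtup argument, can be sketched in isolation as well. Everything
below is a toy under stated assumptions: the sketch_* names, the counter
standing in for unsent WAL, and the loop that simply runs until caught up are
hypothetical and leave out latches, timeouts and socket handling entirely.

```c
#include <stdbool.h>

/* Hypothetical callback type, modelled on WalSndSendDataCallback. */
typedef void (*sketch_send_cb) (void);

/* Plays the role of WalSndCaughtUp: set by the callback, read by the loop. */
static bool sketch_caught_up = false;

/* Toy state: number of chunks still to send, and chunks sent so far. */
static int	sketch_pending = 3;
static int	sketch_sent = 0;

/* Toy analogue of a send_data callback: send one chunk per call. */
static void
sketch_send_physical(void)
{
	if (sketch_pending > 0)
	{
		sketch_pending--;
		sketch_sent++;
	}

	/* Report whether anything is left, like the callbacks in the patch. */
	sketch_caught_up = (sketch_pending == 0);
}

/*
 * Toy analogue of the main loop: it does not know how data is produced, it
 * only calls the callback and inspects the flag. Returns the number of
 * iterations it took to catch up.
 */
static int
sketch_loop(sketch_send_cb send_data)
{
	int			iterations = 0;

	sketch_caught_up = false;
	while (!sketch_caught_up)
	{
		send_data();
		iterations++;
	}
	return iterations;
}
```

Calling sketch_loop(sketch_send_physical) on the initial state runs the
callback three times. A second callback with a different notion of "caught
up" could be passed in without touching the loop, which is the point of the
send_data parameter.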