On 20/01/17 17:23, Petr Jelinek wrote:
> On 20/01/17 15:08, Peter Eisentraut wrote:
>> On 1/19/17 5:01 PM, Petr Jelinek wrote:
>>> There were some conflicting changes committed today so I rebased the
>>> patch on top of them.
>>>
>>> Other than that nothing much has changed, I removed the separate sync
>>> commit patch, included the rename patch in the patchset and fixed the
>>> bug around pg_subscription catalog reported by Erik Rijkers.
>>
>> Committed.  I haven't reviewed the rename patch yet, so I'll get back to
>> that later.
>>
> 
> Hi,
> 
> Thanks!
> 
> Here is fix for the dependency mess.
> 

Álvaro pointed out off-list a couple of issues with how we handle
interruption of commands that connect to the walsender.

a) libpqwalreceiver.c does a blocking connect, so it's impossible to
cancel a CREATE SUBSCRIPTION which is stuck on connect. This is, btw, a
preexisting problem and applies to walreceiver as well. I rewrote the
connect function to use the asynchronous API (patch 0001).

b) We can cancel in the middle of the command (when stuck in
libpqrcv_PQexec), but the connection to the walsender stays open, which,
in case we are waiting for a snapshot, can mean that it will stay idle in
transaction. I added a PG_TRY wrapper around this which disconnects on
error (patch 0002).

And finally, while testing these two I found a bug in the walsender
StringInfo initialization (or lack thereof). There are 3 static StringInfo
buffers that are initialized in WalSndLoop. The problem with that is that
they can in some rare scenarios be used from CreateReplicationSlot (and
IMHO StartLogicalReplication) before WalSndLoop is called, which causes a
segfault of the walsender. This is rare because it only happens when the
downstream closes the connection during logical decoding initialization.

Since it's not exactly straightforward to determine when these need to be
initialized based on commands, I decided to move the initialization code
to exec_replication_command(), since that's always called before anything
else, which makes it much less error-prone (patch 0003).

Patch 0003 should be back-patched all the way to 9.4, where multiple
commands started using those buffers.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 0c18831557c963bdd4d03ef5d8f3c4ace1a51d0e Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 20 Jan 2017 21:20:56 +0100
Subject: [PATCH 1/3] Use asynchronous connect API in libpqwalreceiver

---
 src/backend/postmaster/pgstat.c                    |  4 +-
 .../libpqwalreceiver/libpqwalreceiver.c            | 58 +++++++++++++++++++++-
 src/include/pgstat.h                               |  2 +-
 3 files changed, 59 insertions(+), 5 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 7176cf1..19ad6b5 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w)
 		case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
 			event_name = "WalReceiverWaitStart";
 			break;
-		case WAIT_EVENT_LIBPQWALRECEIVER_READ:
-			event_name = "LibPQWalReceiverRead";
+		case WAIT_EVENT_LIBPQWALRECEIVER:
+			event_name = "LibPQWalReceiver";
 			break;
 		case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
 			event_name = "WalSenderWaitForWAL";
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 7df3698..8dc51d4 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -112,6 +112,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 				 char **err)
 {
 	WalReceiverConn *conn;
+	PostgresPollingStatusType status;
 	const char *keys[5];
 	const char *vals[5];
 	int			i = 0;
@@ -138,7 +139,60 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	vals[i] = NULL;
 
 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
+	conn->streamConn = PQconnectStartParams(keys, vals,
+											/* expand_dbname = */ true);
+	/* Check for conn status. */
+	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
+	{
+		*err = pstrdup(PQerrorMessage(conn->streamConn));
+		return NULL;
+	}
+
+	/* Poll connection. */
+	do
+	{
+		int			rc;
+		int			extra_flag;
+
+		/* Determine which function we should use for Polling. */
+		status = PQconnectPoll(conn->streamConn);
+
+		/* Next action based upon status value. */
+		switch (status)
+		{
+			case PGRES_POLLING_READING:
+				extra_flag = WL_SOCKET_READABLE;
+				/* pass through */
+			case PGRES_POLLING_WRITING:
+				extra_flag = WL_SOCKET_WRITEABLE;
+
+				ResetLatch(&MyProc->procLatch);
+				rc = WaitLatchOrSocket(&MyProc->procLatch,
+									   WL_POSTMASTER_DEATH |
+									   WL_LATCH_SET | extra_flag,
+									   PQsocket(conn->streamConn),
+									   0,
+									   WAIT_EVENT_LIBPQWALRECEIVER);
+				if (rc & WL_POSTMASTER_DEATH)
+					exit(1);
+
+				/* Interrupted. */
+				if (rc & WL_LATCH_SET)
+				{
+					CHECK_FOR_INTERRUPTS();
+					break;
+				}
+				break;
+
+			/* Either can continue polling or finished one way or another. */
+			default:
+				break;
+		}
+
+		/* Loop until we have OK or FAILED status. */
+	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+
+	/* Check the status. */
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 	{
 		*err = pstrdup(PQerrorMessage(conn->streamConn));
@@ -508,7 +562,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
 								   WL_LATCH_SET,
 								   PQsocket(streamConn),
 								   0,
-								   WAIT_EVENT_LIBPQWALRECEIVER_READ);
+								   WAIT_EVENT_LIBPQWALRECEIVER);
 			if (rc & WL_POSTMASTER_DEATH)
 				exit(1);
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index de8225b..e088474 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -764,7 +764,7 @@ typedef enum
 	WAIT_EVENT_CLIENT_WRITE,
 	WAIT_EVENT_SSL_OPEN_SERVER,
 	WAIT_EVENT_WAL_RECEIVER_WAIT_START,
-	WAIT_EVENT_LIBPQWALRECEIVER_READ,
+	WAIT_EVENT_LIBPQWALRECEIVER,
 	WAIT_EVENT_WAL_SENDER_WAIT_WAL,
 	WAIT_EVENT_WAL_SENDER_WRITE_DATA
 } WaitEventClient;
-- 
2.7.4

From df4d4f9d5fe9651c6c8f4176e515ce8a99cde252 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 20 Jan 2017 22:22:33 +0100
Subject: [PATCH 2/3] Close replication connection when slot creation gets
 canceled

---
 src/backend/commands/subscriptioncmds.c | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 00f2a5f..726c286 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -298,10 +298,20 @@ CreateSubscription(CreateSubscriptionStmt *stmt)
 			ereport(ERROR,
 					(errmsg("could not connect to the publisher: %s", err)));
 
-		walrcv_create_slot(wrconn, slotname, false, &lsn);
-		ereport(NOTICE,
-				(errmsg("created replication slot \"%s\" on publisher",
-						slotname)));
+		PG_TRY();
+		{
+			walrcv_create_slot(wrconn, slotname, false, &lsn);
+			ereport(NOTICE,
+					(errmsg("created replication slot \"%s\" on publisher",
+							slotname)));
+		}
+		PG_CATCH();
+		{
+			/* Close the connection in case of failure. */
+			walrcv_disconnect(wrconn);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
 
 		/* And we are done with the remote side. */
 		walrcv_disconnect(wrconn);
-- 
2.7.4

From ed076d5ccc03dbf47218b201f8c6c52df51e8027 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 20 Jan 2017 22:22:53 +0100
Subject: [PATCH 3/3] Always initialize stringinfo buffers in walsender

---
 src/backend/replication/walsender.c | 25 +++++++++++++++----------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f3082c3..a6e2655 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -149,6 +149,9 @@ static StringInfoData output_message;
 static StringInfoData reply_message;
 static StringInfoData tmpbuf;
 
+/* Did we init above buffers? */
+static bool initialized_buffers = false;
+
 /*
  * Timestamp of the last receipt of the reply from the standby. Set to 0 if
  * wal_sender_timeout doesn't need to be active.
@@ -816,8 +819,6 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 	}
 
-	initStringInfo(&output_message);
-
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
 	{
 		LogicalDecodingContext *ctx;
@@ -1310,6 +1311,18 @@ exec_replication_command(const char *cmd_string)
 			(errmsg("received replication command: %s", cmd_string)));
 
 	/*
+	 * Allocate buffers that will be used for each outgoing and incoming
+	 * message.  Only do this once to avoid leaking memory.
+	 */
+	if (!initialized_buffers)
+	{
+		initStringInfo(&output_message);
+		initStringInfo(&reply_message);
+		initStringInfo(&tmpbuf);
+		initialized_buffers = true;
+	}
+
+	/*
 	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
 	 * command arrives. Clean up the old stuff if there's anything.
 	 */
@@ -1802,14 +1815,6 @@ static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
 	/*
-	 * Allocate buffers that will be used for each outgoing and incoming
-	 * message.  We do this just once to reduce palloc overhead.
-	 */
-	initStringInfo(&output_message);
-	initStringInfo(&reply_message);
-	initStringInfo(&tmpbuf);
-
-	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
 	 */
-- 
2.7.4

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to