At Thu, 25 May 2017 17:52:50 +0200, Petr Jelinek <petr.jeli...@2ndquadrant.com> 
wrote in <e082a56a-fd95-a250-3bae-0fff93832...@2ndquadrant.com>
> Hi,
> 
> We have had an issue with walsender timeouts when logical decoding is
> used and a transaction takes a long time to decode (because it
> contains many changes).
> 
> I was looking at the walsender code today and realized that this happens
> because, if the network and downstream are fast enough, we always take
> the fast path in WalSndWriteData, which does no reply or keepalive
> processing; that processing is only reached by other code once the
> transaction has finished. So, paradoxically, we die of a timeout because
> everything was fast enough to never fall back to the slow code path.
> 
> I propose we only use the fast path if the last processed reply is not
> older than half of the walsender timeout; if it is, we force the slow
> code path so that the replies are processed again. This is similar to the
> logic we use to decide whether to send a keepalive message. I also added
> a CHECK_FOR_INTERRUPTS call to the fast code path because otherwise the
> walsender might ignore interrupts for too long on large transactions.
> 
> Thoughts?

+       TimestampTz     now = GetCurrentTimestamp();

I think reading the current time too frequently is not
recommended, especially within a performance-sensitive loop. (I
suppose a loop that can fill up the send queue falls into that
category.)  If you don't mind some additional complexity to
eliminate the possible slowdown from that check, a timeout
(timer) could be used instead. The attached patch does almost the
same thing as your patch, but without checking the time on every
call.
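
To illustrate the idea outside of the walsender, here is a minimal
standalone sketch using plain POSIX timers rather than PostgreSQL's
timeout API. It is not the attached patch: the names check_needed,
install_reply_timer, rearm_reply_timer and can_take_fast_path are
hypothetical, and the send_pending argument only stands in for
pq_is_send_pending(). The point is that the hot path reads a flag
set by a timer instead of asking for the current time.

```c
#define _XOPEN_SOURCE 700

#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <string.h>
#include <sys/time.h>

/* Set by the SIGALRM handler; hypothetical stand-in for check_replies_needed. */
static volatile sig_atomic_t check_needed = 0;

/* Signal handler: only sets the flag, which is async-signal-safe. */
static void
on_alarm(int signo)
{
	(void) signo;
	check_needed = 1;
}

/* Install the SIGALRM handler once.  Returns 0 on success. */
static int
install_reply_timer(void)
{
	struct sigaction sa;

	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = on_alarm;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = SA_RESTART;
	return sigaction(SIGALRM, &sa, NULL);
}

/*
 * Call whenever a reply has been processed: clear the flag and re-arm a
 * one-shot timer that fires timeout_ms from now.  Returns 0 on success.
 */
static int
rearm_reply_timer(long timeout_ms)
{
	struct itimerval it;

	assert(timeout_ms > 0);
	memset(&it, 0, sizeof(it));		/* it_interval = 0 means one-shot */
	it.it_value.tv_sec = timeout_ms / 1000;
	it.it_value.tv_usec = (timeout_ms % 1000) * 1000;
	check_needed = 0;
	return setitimer(ITIMER_REAL, &it, NULL);
}

/* Fast-path test: a flag read and the pending check, no clock call. */
static bool
can_take_fast_path(bool send_pending)
{
	return !check_needed && !send_pending;
}
```

A caller would run install_reply_timer() once, call
rearm_reply_timer(timeout / 2) each time a reply is processed, and
fall through to the slow path whenever can_take_fast_path() returns
false.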

What do you think about this?

# I saw that SIGQUIT doesn't work for an active publisher, which I
# think was mentioned in another thread.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 49cce38..ec33357 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -194,6 +194,15 @@ static volatile sig_atomic_t replication_active = false;
 static LogicalDecodingContext *logical_decoding_ctx = NULL;
 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
+/*
+ * For logical replication, WalSndWriteData needs to process replies from the
+ * client to check whether a keepalive needs to be sent.  The WAL send loop may
+ * skip checking replies only while check_replies_needed is false.
+ */
+#define REPLY_SEND_TIMEOUT USER_TIMEOUT
+static bool	keepalive_timeout_initialized = false;
+static bool	check_replies_needed = false;
+
 /* A sample associating a WAL location with the time it was written. */
 typedef struct
 {
@@ -1175,12 +1184,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
 	/* Try to flush pending output to the client */
 	if (pq_flush_if_writable() != 0)
 		WalSndShutdown();
 
-	if (!pq_is_send_pending())
+	/* fast path: return immediately if possible */
+	if (!check_replies_needed && !pq_is_send_pending())
 		return;
 
 	for (;;)
@@ -1216,10 +1225,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
 
-		/* If we finished clearing the buffered data, we're done here. */
-		if (!pq_is_send_pending())
-			break;
-
 		now = GetCurrentTimestamp();
 
 		/* die if timeout was reached */
@@ -1228,6 +1233,10 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 		/* Send keepalive if the time has come */
 		WalSndKeepaliveIfNecessary(now);
 
+		/* If we finished clearing the buffered data, we're done here. */
+		if (!pq_is_send_pending())
+			break;
+
 		sleeptime = WalSndComputeSleeptime(now);
 
 		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
@@ -1562,6 +1571,13 @@ exec_replication_command(const char *cmd_string)
 	return true;
 }
 
+static void
+LogicalDecodeReplyTimeoutHandler(void)
+{
+	check_replies_needed = true;
+}
+
+
 /*
  * Process any incoming messages while streaming. Also checks if the remote
  * end has closed the connection.
@@ -1662,6 +1678,22 @@ ProcessRepliesIfAny(void)
 	{
 		last_reply_timestamp = GetCurrentTimestamp();
 		waiting_for_ping_response = false;
+
+		if (wal_sender_timeout > 0)
+		{
+			if (!keepalive_timeout_initialized)
+			{
+				RegisterTimeout(REPLY_SEND_TIMEOUT,
+								LogicalDecodeReplyTimeoutHandler);
+				keepalive_timeout_initialized = true;
+			}
+
+			check_replies_needed = false;
+			enable_timeout_at(REPLY_SEND_TIMEOUT,
+						  TimestampTzPlusMilliseconds(last_reply_timestamp,
+													  wal_sender_timeout / 2));
+		}
+		
 	}
 }
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
