On 2017-09-12 11:28, Kyotaro HORIGUCHI wrote:
Hello,

At Wed, 06 Sep 2017 13:46:16 +0000, Yura Sokolov
<funny.fal...@postgrespro.ru> wrote in
<20170906134616.18925.88390.p...@coridan.postgresql.org>
I've changed the status to "need review" to gain more attention
from others.

I understand that the problem here is that a too-fast network
prevents walsender from sending replies.

In physical replication, WAL records are sent as soon as written
and the timeout is handled in the topmost loop in WalSndLoop. In
logical replication, data is sent at once at commit time in most
cases. So it can take a long time in ReorderBufferCommit without
returning to WalSndLoop (or even XLogSendLogical).

One problem here is that WalSndWriteData waits for the arrival of
the next *WAL record* in the slow path, although it is invoked by
ReorderBuffer* functions (mainly *Commit) independently of WAL
insertion. I think this is the root cause of the problem.

On the other hand, it ought to sleep when the network is stalled,
in other words, when data to send remains after a flush. We have
no means of being signaled when the socket queue gets room for
more bytes. However, I suppose that such a slow network allows us
to sleep for several, or several tens of, milliseconds. Or, if we
could know how many bytes pq_flush_if_writable() pushed, it would
be enough to wait only when the function returns having pushed
nothing.

As a result, I think that the functions should be modified as
follows.

- Forcing the slow path once half of a ping period has elapsed
  is right. (GetCurrentTimestamp is required anyway.)

- The slow path should not sleep waiting on the latch. It should
  just pg_usleep() for maybe 1-10 ms.

- We should go to the fast path just after a keepalive or response
  message has been sent. In other words, the "if (now <" block
  should be in the "for (;;)" loop. This avoids needless runs on
  the slow path.


It could be refactored as follows.

  prepare for the send buffer;

  for (;;)
  {
    now = GetCurrentTimestamp();
    if (now < )...
    {
      fast-path
    }
    else
    {
      slow-path
    }
    return if finished
    sleep for 1ms?
  }
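
For illustration only, the loop above could be modelled in plain C
roughly as follows. This is a sketch under assumptions, not walsender
code: SimConn, sim_flush and write_data are hypothetical names, the
clock and the socket are simulated, the elided "if (now <" condition
is assumed to be "less than half of the timeout since the last reply",
and the slow path is assumed to receive a reply immediately.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, simulated connection state; not a PostgreSQL structure. */
typedef struct SimConn
{
    int64_t now_ms;          /* simulated clock */
    int64_t last_reply_ms;   /* time of the last reply from the receiver */
    int     timeout_ms;      /* plays the role of wal_sender_timeout */
    int     pending_bytes;   /* bytes still queued for the socket */
    int     bytes_per_flush; /* bytes one flush attempt can push */
    int     slow_path_runs;  /* how often the slow path was taken */
} SimConn;

/* Push as many queued bytes as the simulated socket accepts. */
void
sim_flush(SimConn *c)
{
    int n = c->pending_bytes < c->bytes_per_flush
        ? c->pending_bytes : c->bytes_per_flush;

    c->pending_bytes -= n;
}

/* Queue nbytes and loop until they are sent; returns the iteration count. */
int
write_data(SimConn *c, int nbytes)
{
    int iterations = 0;

    assert(c->bytes_per_flush > 0);  /* otherwise the loop never ends */
    c->pending_bytes += nbytes;      /* "prepare for the send buffer" */

    for (;;)
    {
        iterations++;
        if (c->now_ms < c->last_reply_ms + c->timeout_ms / 2)
        {
            /* fast path: only try to flush */
            sim_flush(c);
        }
        else
        {
            /* slow path: assume a reply arrives at once, then flush */
            c->slow_path_runs++;
            c->last_reply_ms = c->now_ms;
            sim_flush(c);
        }
        if (c->pending_bytes == 0)
            return iterations;       /* "return if finished" */
        c->now_ms += 1;              /* "sleep for 1ms?" */
    }
}
```

The point of the model is only that the fast/slow decision is
re-evaluated on every iteration, so the slow path runs when the
half-timeout mark is crossed and the loop returns to the fast path
right afterwards.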


What do you think about this?

regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center

Good day, Petr, Kyotaro

I've created a failing test for the issue (0001-Add-failing-test...).
It tests insertion of 20000 rows with a 10ms wal_sender_timeout
(it fails in WalSndWriteData on master) and then deletion of
those rows with a 1ms wal_sender_timeout (it fails in WalSndLoop).

Neither Petr's patch nor my simplified suggestion passed the
test. I didn't check Kyotaro's suggestion, though, because I
didn't understand it well.

I've made a patch that passes the test (0002-Fix-walsender...).
(I've used Petr's commit message. I hope you don't mind, Petr?)

In WalSndWriteData it adds CHECK_FOR_INTERRUPTS to the fast path and
falls through to the slow path after half of wal_sender_timeout has
elapsed, as was discussed.
In the slow path, it skips the fast exit on `!pq_is_send_pending()`
and the timeout check during the first loop iteration. It also sets
sleeptime to 1 ms even if the timeout has been reached. This gives
the receiver's response a chance to arrive.
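
The fast-path condition described above can be restated as a
standalone predicate. This is a minimal sketch, assuming timestamps
are 64-bit microsecond counts (as TimestampTz is); fast_path_allowed
and timestamp_us are names made up for this example and do not exist
in walsender.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for TimestampTz: microseconds held in a 64-bit integer. */
typedef int64_t timestamp_us;

/*
 * Hypothetical helper mirroring the condition in the patch: the fast
 * path may be taken when nothing is pending and either timeout handling
 * is disabled or at most half of the timeout has passed since the last
 * reply from the receiver.
 */
bool
fast_path_allowed(bool send_pending, timestamp_us now,
                  timestamp_us last_reply, int timeout_ms)
{
    if (send_pending)
        return false;       /* unsent data always needs the slow path */
    if (last_reply <= 0 || timeout_ms <= 0)
        return true;        /* timeout handling is disabled */

    /* milliseconds to microseconds, using integer division as the patch does */
    return now <= last_reply + (timestamp_us) (timeout_ms / 2) * 1000;
}
```

Because the comparison is "<=", a call made exactly at the
half-timeout mark still takes the fast path; one microsecond later
it falls through to the slow path.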

In WalSndLoop it likewise skips the timeout check on the first
iteration after send_data was called, and also sleeps at least 1 ms.
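
The two small rules used in both functions (skip the timeout check
once after sending, and never sleep for 0 ms) can be isolated as
below. This is only a sketch: clamp_sleeptime and timeout_check_due
are hypothetical helpers written for this mail, and the computed
sleep time is assumed to be non-negative.

```c
#include <assert.h>
#include <stdbool.h>

/* Sleep at least 1 ms, even if the computed sleep time is already 0. */
long
clamp_sleeptime(long computed_ms)
{
    assert(computed_ms >= 0);   /* assumption of this sketch */
    return computed_ms == 0 ? 1 : computed_ms;
}

/*
 * Returns true if the timeout check should run in this iteration.
 * The first call after the flag was set to true returns false, which
 * gives the receiver's reply one iteration to arrive; the flag is then
 * cleared so that later iterations do check.
 */
bool
timeout_check_due(bool *firsttime)
{
    bool due = !*firsttime;

    *firsttime = false;
    return due;
}
```

In a loop shaped like WalSndLoop, the flag would be set to true
again each time send_data() is called, so every batch of sent data
gets one check-free iteration.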

I'm not sure about the correctness of my patch. Now that a test
exists, you may suggest better solutions or improve this one.

I'll set the commitfest entry status to 'Need review', assuming
my patch can be reviewed.

--
Sokolov Yura aka funny_falcon
Postgres Professional: https://postgrespro.ru
The Russian Postgres Company
From c4f7ffb300200d51647a283872a2326350c3f5b9 Mon Sep 17 00:00:00 2001
From: Sokolov Yura <funny.fal...@postgrespro.ru>
Date: Tue, 26 Sep 2017 19:28:57 +0300
Subject: [PATCH 1/2] Add failing test: wal_sender_timeout+logical decoding of
 big transaction.

Test case for timeout during sending huge transaction through logical
replication.
https://www.postgresql.org/message-id/flat/e082a56a-fd95-a250-3bae-0fff93832...@2ndquadrant.com#e082a56a-fd95-a250-3bae-0fff93832...@2ndquadrant.com
https://commitfest.postgresql.org/14/1151/
---
 src/test/subscription/t/008_sync_timeout.pl | 93 +++++++++++++++++++++++++++++
 1 file changed, 93 insertions(+)
 create mode 100644 src/test/subscription/t/008_sync_timeout.pl

diff --git a/src/test/subscription/t/008_sync_timeout.pl b/src/test/subscription/t/008_sync_timeout.pl
new file mode 100644
index 0000000000..ef5363d7e4
--- /dev/null
+++ b/src/test/subscription/t/008_sync_timeout.pl
@@ -0,0 +1,93 @@
+# Tests for logical replication table syncing
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	"wal_sender_timeout = 10ms");
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	"wal_retrieve_retry_interval = 1ms");
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rep (a int primary key)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rep SELECT generate_series(1,10)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rep (a int primary key)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR ALL TABLES");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(10), 'initial data synced for first sub');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rep SELECT generate_series(11,20000)");
+
+# wait for sync to finish this time
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# check that all data is synced
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(20000), 'inserted rows replicated to subscriber');
+
+# stricter timeout
+$node_publisher->append_conf('postgresql.conf',
+	"wal_sender_timeout = 1ms");
+$node_publisher->reload;
+
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
+
+# wait for sync to finish this time
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# check that all data is synced
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(0), 'deleted rows replicated to subscriber');
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.11.0

From 1dfe8b9f2612e2e917d8c0d20d9631ad62439f36 Mon Sep 17 00:00:00 2001
From: Sokolov Yura <funny.fal...@postgrespro.ru>
Date: Wed, 27 Sep 2017 12:53:56 +0300
Subject: [PATCH 2/2] Fix walsender timeouts when decoding large transaction

The logical slots have fast code path for sending data in order to not
impose too high per message overhead. The fast path skips checks for
interrupts and timeouts. However the fast path failed to consider the
fact that transaction with large number of changes may take very long to
be processed and sent to the client. This causes walsender to ignore
interrupts for potentially long time but more importantly it will cause
walsender being killed due to timeout at the end of such transaction.

This commit changes the fast path to also check for interrupts and only
allows calling the fast path when last keepalive check happened less
than half of walsender timeout ago, otherwise the slower code path will
be taken.

Discussion:
https://www.postgresql.org/message-id/flat/e082a56a-fd95-a250-3bae-0fff93832...@2ndquadrant.com
---
 src/backend/replication/walsender.c | 80 ++++++++++++++++++++++++++++++-------
 1 file changed, 65 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 6ec4e63161..67ddfffea5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1151,6 +1151,9 @@ static void
 WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 				bool last_write)
 {
+	bool		firsttime = true;
+	TimestampTz now = GetCurrentTimestamp();
+
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
 
@@ -1160,7 +1163,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	 * several releases by streaming physical replication.
 	 */
 	resetStringInfo(&tmpbuf);
-	pq_sendint64(&tmpbuf, GetCurrentTimestamp());
+	pq_sendint64(&tmpbuf, now);
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
@@ -1169,14 +1172,29 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	if (pq_flush_if_writable() != 0)
 		WalSndShutdown();
 
-	if (!pq_is_send_pending())
+	/*
+	 * If transaction is too big, then we can spend a lot of time sending it.
+	 * We should fall to slow path to check receiver's replies to update
+	 * last_reply_timestamp.
+	 */
+	if (!pq_is_send_pending() &&
+		(last_reply_timestamp <= 0 || wal_sender_timeout <= 0 ||
+		 now <= TimestampTzPlusMilliseconds(last_reply_timestamp,
+											wal_sender_timeout / 2)))
+	{
+		/*
+		 * Always check for interrupts to be able to exit from sending huge
+		 * transaction.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
 		return;
+	}
 
 	for (;;)
 	{
 		int			wakeEvents;
 		long		sleeptime;
-		TimestampTz now;
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -1205,19 +1223,34 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
 
-		/* If we finished clearing the buffered data, we're done here. */
-		if (!pq_is_send_pending())
-			break;
-
 		now = GetCurrentTimestamp();
 
-		/* die if timeout was reached */
-		WalSndCheckTimeOut(now);
+		/*
+		 * Always sleep first time we enter this loop. This will give a chance
+		 * to catch wal receiver's response in ProcessRepliesIfAny() above.
+		 * This fixes outstanding timeouts on very fast networks (localhost,
+		 * for example). See discussion at:
+		 * https://www.postgresql.org/message-id/flat/e082a56a-fd95-a250-3bae-0fff93832...@2ndquadrant.com
+		 */
+		if (!firsttime || (last_reply_timestamp <= 0 && wal_sender_timeout <= 0))
+		{
+			/* If we finished clearing the buffered data, we're done here. */
+			if (!pq_is_send_pending())
+				break;
 
-		/* Send keepalive if the time has come */
-		WalSndKeepaliveIfNecessary(now);
+			/* die if timeout was reached */
+			WalSndCheckTimeOut(now);
+
+			/* Send keepalive if the time has come */
+			WalSndKeepaliveIfNecessary(now);
+		}
+
+		firsttime = false;
 
 		sleeptime = WalSndComputeSleeptime(now);
+		/* sleep at least 1 ms even if we already timed out */
+		if (sleeptime == 0)
+			sleeptime = 1;
 
 		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
 			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
@@ -2074,6 +2107,8 @@ WalSndCheckTimeOut(TimestampTz now)
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
+	bool		firsttime = true;
+
 	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
@@ -2131,7 +2166,10 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * caught up.
 		 */
 		if (!pq_is_send_pending())
+		{
 			send_data();
+			firsttime = true;
+		}
 		else
 			WalSndCaughtUp = false;
 
@@ -2171,11 +2209,20 @@ WalSndLoop(WalSndSendDataCallback send_data)
 
 		now = GetCurrentTimestamp();
 
-		/* Check for replication timeout. */
-		WalSndCheckTimeOut(now);
+		/*
+		 * Skip timeout check first time after sending something. See comment
+		 * in WalSndWriteData.
+		 */
+		if (!firsttime)
+		{
+			/* Check for replication timeout. */
+			WalSndCheckTimeOut(now);
 
-		/* Send keepalive if the time has come */
-		WalSndKeepaliveIfNecessary(now);
+			/* Send keepalive if the time has come */
+			WalSndKeepaliveIfNecessary(now);
+		}
+
+		firsttime = false;
 
 		/*
 		 * We don't block if not caught up, unless there is unsent data
@@ -2194,6 +2241,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
 				WL_SOCKET_READABLE;
 
 			sleeptime = WalSndComputeSleeptime(now);
+			/* sleep at least 1 ms even if we already timed out */
+			if (sleeptime == 0)
+				sleeptime = 1;
 
 			if (pq_is_send_pending())
 				wakeEvents |= WL_SOCKET_WRITEABLE;
-- 
2.11.0
