On Thu, Jun 26, 2014 at 7:01 PM,  <furu...@pm.nttdata.co.jp> wrote:
>> The patch looks somewhat complicated, and bugs can easily be introduced
>> because it tries not only to add a new feature but also to reorganize the
>> main loop in HandleCopyStream at the same time. To keep the patch simple,
>> I'm thinking of first applying the attached patch, which just refactors the
>> main loop. Then we can apply the main patch, i.e., add the new feature.
>> Thoughts?
> Thank you for the refactoring patch.
> I did a review of the patch.
> As a result, I found that the calculation of sleeptime is incorrect when
> --status-interval is set to 1.
>
> --status-interval 1 -->  sleeptime 1.9999 !?
> --status-interval 2 -->  sleeptime 1.9999 OK
> --status-interval 3 -->  sleeptime 2.9999 OK

Thanks for the review!

+            if (secs <= 0)
+                secs = 1;    /* Always sleep at least 1 sec */
+
+            sleeptime = secs * 1000 + usecs / 1000;

The above is the code that caused that problem. 'usecs' should have been
reset to zero when 'secs' was rounded up to 1 second, but it was not.
Attached is the updated version of the patch.

Regards,

-- 
Fujii Masao
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d76e605..4aa35da 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
 				 uint32 timeline, char *basedir,
 			   stream_stop_callback stream_stop, int standby_message_timeout,
 				 char *partial_suffix, XLogRecPtr *stoppos);
+static int CopyStreamPoll(PGconn *conn, long timeout_ms);
+static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 						 uint32 *timeline);
@@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		int			bytes_written;
 		int64		now;
 		int			hdr_len;
-
-		if (copybuf != NULL)
-		{
-			PQfreemem(copybuf);
-			copybuf = NULL;
-		}
+		long		sleeptime;
 
 		/*
 		 * Check if we should continue streaming, or abort at this point.
@@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			last_status = now;
 		}
 
-		r = PQgetCopyData(conn, &copybuf, 1);
-		if (r == 0)
+		/*
+		 * Compute how long send/receive loops should sleep
+		 */
+		if (standby_message_timeout && still_sending)
 		{
-			/*
-			 * No data available. Wait for some to appear, but not longer than
-			 * the specified timeout, so that we can ping the server.
-			 */
-			fd_set		input_mask;
-			struct timeval timeout;
-			struct timeval *timeoutptr;
-
-			FD_ZERO(&input_mask);
-			FD_SET(PQsocket(conn), &input_mask);
-			if (standby_message_timeout && still_sending)
+			int64		targettime;
+			long		secs;
+			int			usecs;
+
+			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
+			feTimestampDifference(now,
+								  targettime,
+								  &secs,
+								  &usecs);
+			/* Always sleep at least 1 sec */
+			if (secs <= 0)
 			{
-				int64		targettime;
-				long		secs;
-				int			usecs;
-
-				targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-				feTimestampDifference(now,
-									  targettime,
-									  &secs,
-									  &usecs);
-				if (secs <= 0)
-					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-				else
-					timeout.tv_sec = secs;
-				timeout.tv_usec = usecs;
-				timeoutptr = &timeout;
+				secs = 1;
+				usecs = 0;
 			}
-			else
-				timeoutptr = NULL;
 
-			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-			if (r == 0 || (r < 0 && errno == EINTR))
-			{
-				/*
-				 * Got a timeout or signal. Continue the loop and either
-				 * deliver a status packet to the server or just go back into
-				 * blocking.
-				 */
-				continue;
-			}
-			else if (r < 0)
-			{
-				fprintf(stderr, _("%s: select() failed: %s\n"),
-						progname, strerror(errno));
-				goto error;
-			}
-			/* Else there is actually data on the socket */
-			if (PQconsumeInput(conn) == 0)
-			{
-				fprintf(stderr,
-						_("%s: could not receive data from WAL stream: %s"),
-						progname, PQerrorMessage(conn));
-				goto error;
-			}
-			continue;
+			sleeptime = secs * 1000 + usecs / 1000;
 		}
+		else
+			sleeptime = -1;
+
+		r = CopyStreamReceive(conn, sleeptime, &copybuf);
+		if (r == 0)
+			continue;
 		if (r == -1)
+			goto error;
+		if (r == -2)
 		{
 			PGresult   *res = PQgetResult(conn);
 
@@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			}
 			if (copybuf != NULL)
 				PQfreemem(copybuf);
+			copybuf = NULL;
 			*stoppos = blockpos;
 			return res;
 		}
-		if (r == -2)
-		{
-			fprintf(stderr, _("%s: could not read COPY data: %s"),
-					progname, PQerrorMessage(conn));
-			goto error;
-		}
 
 		/* Check the message type. */
 		if (copybuf[0] == 'k')
@@ -1056,3 +1019,115 @@ error:
 		PQfreemem(copybuf);
 	return NULL;
 }
+
+/*
+ * Wait until a CopyData message can be read, or timeout_ms elapses.
+ *
+ * A negative timeout_ms waits indefinitely. Returns 1 if data has become
+ * available for reading, 0 if timed out or interrupted by signal, -1 on error.
+ */
+static int
+CopyStreamPoll(PGconn *conn, long timeout_ms)
+{
+	int			ret;
+	fd_set		input_mask;
+	struct timeval timeout;
+	struct timeval *timeoutptr;
+
+	if (PQsocket(conn) < 0)
+	{
+		fprintf(stderr, _("%s: socket not open\n"), progname);
+		return -1;
+	}
+
+	FD_ZERO(&input_mask);
+	FD_SET(PQsocket(conn), &input_mask);
+
+	if (timeout_ms < 0)
+		timeoutptr = NULL;
+	else
+	{
+		timeout.tv_sec = timeout_ms / 1000L;
+		timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
+		timeoutptr = &timeout;
+	}
+
+	ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+	if (ret == 0 || (ret < 0 && errno == EINTR))
+		return 0;		/* Got a timeout or signal */
+	else if (ret < 0)
+	{
+		fprintf(stderr, _("%s: select() failed: %s\n"),
+				progname, strerror(errno));
+		return -1;
+	}
+
+	return 1;
+}
+
+/*
+ * Receive CopyData message available from XLOG stream, blocking for
+ * maximum of 'timeout' ms. A negative timeout waits indefinitely.
+ *
+ * If data was received, returns the length of the data. *buffer is set to
+ * point to the received message. On entry, a non-NULL *buffer (the previous
+ * message) is freed with PQfreemem, so initialize it to NULL before first use.
+ *
+ * Returns 0 if no data was available within timeout, or wait was interrupted
+ * by signal. -1 on error. -2 if the server ended the COPY.
+ */
+static int
+CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
+{
+	char	   *copybuf = NULL;
+	int			rawlen;
+
+	/* Free the previous message; the caller owns *buffer, not a static */
+	if (*buffer != NULL)
+		PQfreemem(*buffer);
+	*buffer = NULL;
+
+	/* Try to receive a CopyData message */
+	rawlen = PQgetCopyData(conn, &copybuf, 1);
+	if (rawlen == 0)
+	{
+		/*
+		 * No data available. Wait for some to appear, but not longer than
+		 * the specified timeout (forever if negative) so we can ping the server.
+		 */
+		if (timeout != 0)
+		{
+			int		ret;
+
+			ret = CopyStreamPoll(conn, timeout);
+			if (ret <= 0)
+				return ret;
+		}
+
+		/* Data may be available now; read whatever is on the socket */
+		if (PQconsumeInput(conn) == 0)
+		{
+			fprintf(stderr,
+					_("%s: could not receive data from WAL stream: %s"),
+					progname, PQerrorMessage(conn));
+			return -1;
+		}
+
+		/* Now that we've consumed some input, try again */
+		rawlen = PQgetCopyData(conn, &copybuf, 1);
+		if (rawlen == 0)
+			return 0;
+	}
+	if (rawlen == -1)			/* end-of-streaming or error */
+		return -2;
+	if (rawlen == -2)
+	{
+		fprintf(stderr, _("%s: could not read COPY data: %s"),
+				progname, PQerrorMessage(conn));
+		return -1;
+	}
+
+	/* Return received messages to caller */
+	*buffer = copybuf;
+	return rawlen;
+}
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to