On 24.03.2011 15:24, Fujii Masao wrote:
> On Wed, Mar 23, 2011 at 7:33 PM, Heikki Linnakangas
> <heikki.linnakan...@enterprisedb.com> wrote:
>> I don't much like the API for this. Walsender shouldn't need to know about
>> the details of the FE/BE protocol, pq_putbytes_if_available() seems too low
>> level to be useful.
>> I think a better API would be to have a non-blocking version of
>> pq_putmessage(). We can make the output buffer in pqcomm.c resizeable, so
>> that when the message doesn't fit in the output buffer in pq_putmessage(),
>> the buffer is enlarged instead of trying to flush it.
>> Attached is a patch using that approach. This is a much smaller patch, and
>> easier to understand.
> Agreed. Thanks for improving the patch.
> pq_flush_if_writable() calls internal_flush() without using PG_TRY block.
> This seems unsafe because for example pgwin32_waitforsinglesocket()
> called by secure_write() can throw ERROR.
Perhaps it's time to give up on the assumption that the socket is in
blocking mode except within those two functions. Attached patch adds the
pq_set_nonblocking() function from your patch, and adds calls to it
before all secure_read/write operations to put the socket in the right
mode. There are only a few of those operations.
Should we use COMMERROR instead of ERROR if we fail to put the socket in
the right mode?
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e0ebee6..3192ef7 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2019,6 +2019,28 @@ SET ENABLE_SEQSCAN TO OFF;
</para>
</listitem>
</varlistentry>
+
+ <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
+ <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
+ <indexterm>
+ <primary><varname>replication_timeout</> configuration parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Specifies the maximum time, in milliseconds, to wait for the reply
+ from the standby before terminating replication. This is useful for
+ the primary server to detect a standby crash or network outage.
+ A value of zero turns this off. This parameter can only be set in
+ the <filename>postgresql.conf</> file or on the server command line.
+ The default value is 60 seconds.
+ </para>
+ <para>
+ To make the timeout work properly, <xref linkend="guc-wal-receiver-status-interval">
+ must be enabled on the standby, and its value must be less than the
+ value of <varname>replication_timeout</>.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</sect2>
@@ -2216,6 +2238,11 @@ SET ENABLE_SEQSCAN TO OFF;
the <filename>postgresql.conf</> file or on the server command line.
The default value is 10 seconds.
</para>
+ <para>
+ When <xref linkend="guc-replication-timeout"> is enabled on the primary,
+ <varname>wal_receiver_status_interval</> must be enabled, and its value
+ must be less than the value of <varname>replication_timeout</>.
+ </para>
</listitem>
</varlistentry>
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 3c7b05b..db313a8 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -55,10 +55,12 @@
* pq_peekbyte - peek at next byte from connection
* pq_putbytes - send bytes to connection (not flushed until pq_flush)
* pq_flush - flush pending output
+ * pq_flush_if_writable - flush pending output if writable without blocking
* pq_getbyte_if_available - get a byte if available without blocking
*
* message-level I/O (and old-style-COPY-OUT cruft):
* pq_putmessage - send a normal message (suppressed in COPY OUT mode)
+ * pq_putmessage_noblock - buffer a normal message without blocking (suppressed in COPY OUT mode)
* pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
* pq_endcopyout - end a COPY OUT transfer
*
@@ -92,6 +94,7 @@
#include "miscadmin.h"
#include "storage/ipc.h"
#include "utils/guc.h"
+#include "utils/memutils.h"
/*
* Configuration options
@@ -105,15 +108,21 @@ static char sock_path[MAXPGPATH];
/*
- * Buffers for low-level I/O
+ * Buffers for low-level I/O.
+ *
+ * The receive buffer is fixed size. Send buffer is usually 8k, but can be
+ * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
*/
-#define PQ_BUFFER_SIZE 8192
+#define PQ_SEND_BUFFER_SIZE 8192
+#define PQ_RECV_BUFFER_SIZE 8192
-static char PqSendBuffer[PQ_BUFFER_SIZE];
+static char *PqSendBuffer; /* Send buffer, allocated in pq_init() */
+static int PqSendBufferSize; /* Size of send buffer, in bytes */
static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
+static int PqSendStart; /* Next index to send a byte in PqSendBuffer */
-static char PqRecvBuffer[PQ_BUFFER_SIZE];
+static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
static int PqRecvLength; /* End of data available in PqRecvBuffer */
@@ -128,6 +137,7 @@ static bool DoingCopyOut;
static void pq_close(int code, Datum arg);
static int internal_putbytes(const char *s, size_t len);
static int internal_flush(void);
+static void pq_set_nonblocking(bool nonblocking);
#ifdef HAVE_UNIX_SOCKETS
static int Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
@@ -142,7 +152,9 @@ static int Setup_AF_UNIX(void);
void
pq_init(void)
{
- PqSendPointer = PqRecvPointer = PqRecvLength = 0;
+ PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
+ PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
+ PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
PqCommBusy = false;
DoingCopyOut = false;
on_proc_exit(pq_close, 0);
@@ -194,6 +206,7 @@ pq_close(int code, Datum arg)
#endif /* ENABLE_GSS || ENABLE_SSPI */
/* Cleanly shut down SSL layer */
+ pq_set_nonblocking(false); /* XXX: Is this required? */
secure_close(MyProcPort);
/*
@@ -732,6 +745,37 @@ TouchSocketFile(void)
* --------------------------------
*/
+/* --------------------------------
+ * pq_set_nonblocking - set socket blocking/non-blocking
+ *
+ * Sets the socket non-blocking if nonblocking is TRUE, else blocking.
+ * Failures use COMMERROR: ERROR is sent via this socket and could recurse.
+ * --------------------------------
+ */
+static void
+pq_set_nonblocking(bool nonblocking)
+{
+	if (MyProcPort->noblock == nonblocking)
+		return;
+
+#ifdef WIN32
+	pgwin32_noblock = nonblocking ? 1 : 0;
+#else
+	if (nonblocking)
+	{
+		if (!pg_set_noblock(MyProcPort->sock))
+			ereport(COMMERROR,
+					(errmsg("could not set socket to non-blocking mode: %m")));
+	}
+	else
+	{
+		if (!pg_set_block(MyProcPort->sock))
+			ereport(COMMERROR,
+					(errmsg("could not set socket to blocking mode: %m")));
+	}
+#endif
+	MyProcPort->noblock = nonblocking;
+}
/* --------------------------------
* pq_recvbuf - load some bytes into the input buffer
@@ -756,13 +800,15 @@ pq_recvbuf(void)
PqRecvLength = PqRecvPointer = 0;
}
+ pq_set_nonblocking(false);
+
/* Can fill buffer from PqRecvLength and upwards */
for (;;)
{
int r;
r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
- PQ_BUFFER_SIZE - PqRecvLength);
+ PQ_RECV_BUFFER_SIZE - PqRecvLength);
if (r < 0)
{
@@ -825,7 +871,6 @@ pq_peekbyte(void)
return (unsigned char) PqRecvBuffer[PqRecvPointer];
}
-
/* --------------------------------
* pq_getbyte_if_available - get a single byte from connection,
* if available
@@ -845,72 +890,38 @@ pq_getbyte_if_available(unsigned char *c)
return 1;
}
- /* Temporarily put the socket into non-blocking mode */
-#ifdef WIN32
- pgwin32_noblock = 1;
-#else
- if (!pg_set_noblock(MyProcPort->sock))
- ereport(ERROR,
- (errmsg("could not set socket to non-blocking mode: %m")));
-#endif
- MyProcPort->noblock = true;
- PG_TRY();
+ /* Put the socket into non-blocking mode */
+ pq_set_nonblocking(true);
+
+ r = secure_read(MyProcPort, c, 1);
+ if (r < 0)
{
- r = secure_read(MyProcPort, c, 1);
- if (r < 0)
+ /*
+ * Ok if no data available without blocking or interrupted (though
+ * EINTR really shouldn't happen with a non-blocking socket).
+ * Report other errors.
+ */
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+ r = 0;
+ else
{
/*
- * Ok if no data available without blocking or interrupted (though
- * EINTR really shouldn't happen with a non-blocking socket).
- * Report other errors.
+ * Careful: an ereport() that tries to write to the client
+ * would cause recursion to here, leading to stack overflow
+ * and core dump! This message must go *only* to the
+ * postmaster log.
*/
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
- r = 0;
- else
- {
- /*
- * Careful: an ereport() that tries to write to the client
- * would cause recursion to here, leading to stack overflow
- * and core dump! This message must go *only* to the
- * postmaster log.
- */
- ereport(COMMERROR,
- (errcode_for_socket_access(),
- errmsg("could not receive data from client: %m")));
- r = EOF;
- }
- }
- else if (r == 0)
- {
- /* EOF detected */
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("could not receive data from client: %m")));
r = EOF;
}
}
- PG_CATCH();
+ else if (r == 0)
{
- /*
- * The rest of the backend code assumes the socket is in blocking
- * mode, so treat failure as FATAL.
- */
-#ifdef WIN32
- pgwin32_noblock = 0;
-#else
- if (!pg_set_block(MyProcPort->sock))
- ereport(FATAL,
- (errmsg("could not set socket to blocking mode: %m")));
-#endif
- MyProcPort->noblock = false;
- PG_RE_THROW();
+ /* EOF detected */
+ r = EOF;
}
- PG_END_TRY();
-#ifdef WIN32
- pgwin32_noblock = 0;
-#else
- if (!pg_set_block(MyProcPort->sock))
- ereport(FATAL,
- (errmsg("could not set socket to blocking mode: %m")));
-#endif
- MyProcPort->noblock = false;
return r;
}
@@ -1138,10 +1149,13 @@ internal_putbytes(const char *s, size_t len)
while (len > 0)
{
/* If buffer is full, then flush it out */
- if (PqSendPointer >= PQ_BUFFER_SIZE)
+ if (PqSendPointer >= PqSendBufferSize)
+ {
+ pq_set_nonblocking(false);
if (internal_flush())
return EOF;
- amount = PQ_BUFFER_SIZE - PqSendPointer;
+ }
+ amount = PqSendBufferSize - PqSendPointer;
if (amount > len)
amount = len;
memcpy(PqSendBuffer + PqSendPointer, s, amount);
@@ -1167,17 +1181,25 @@ pq_flush(void)
if (PqCommBusy)
return 0;
PqCommBusy = true;
+ pq_set_nonblocking(false);
res = internal_flush();
PqCommBusy = false;
return res;
}
+/* --------------------------------
+ * internal_flush - flush pending output
+ *
+ * Returns 0 if OK (meaning everything was sent, or operation would block
+ * and the socket is in non-blocking mode), or EOF if trouble.
+ * --------------------------------
+ */
static int
internal_flush(void)
{
static int last_reported_send_errno = 0;
- char *bufptr = PqSendBuffer;
+ char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;
while (bufptr < bufend)
@@ -1192,6 +1214,16 @@ internal_flush(void)
continue; /* Ok if we were interrupted */
/*
+ * Ok if no data writable without blocking, and the socket
+ * is in non-blocking mode.
+ */
+ if (errno == EAGAIN ||
+ errno == EWOULDBLOCK)
+ {
+ return 0;
+ }
+
+ /*
* Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core
* dump! This message must go *only* to the postmaster log.
@@ -1212,18 +1244,56 @@ internal_flush(void)
* We drop the buffered data anyway so that processing can
* continue, even though we'll probably quit soon.
*/
- PqSendPointer = 0;
+ PqSendStart = PqSendPointer = 0;
return EOF;
}
last_reported_send_errno = 0; /* reset after any successful send */
bufptr += r;
+ PqSendStart += r;
}
- PqSendPointer = 0;
+ PqSendStart = PqSendPointer = 0;
return 0;
}
+/* --------------------------------
+ * pq_flush_if_writable - flush pending output if writable without blocking
+ *
+ * Returns 0 if OK (also when the send would block), or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_flush_if_writable(void)
+{
+ int res;
+
+ /* Quick exit if nothing to do */
+ if (PqSendPointer == PqSendStart)
+ return 0;
+
+ /* No-op if reentrant call */
+ if (PqCommBusy)
+ return 0;
+
+ /* Put the socket into non-blocking mode; it is not restored here */
+ pq_set_nonblocking(true);
+
+ PqCommBusy = true;
+ res = internal_flush();
+ PqCommBusy = false;
+ return res;
+}
+
+/* --------------------------------
+ * pq_is_send_pending - true if PqSendBuffer holds bytes not yet sent
+ * --------------------------------
+ */
+bool
+pq_is_send_pending(void)
+{
+ return (PqSendStart < PqSendPointer);
+}
/* --------------------------------
* Message-level I/O routines begin here.
@@ -1286,6 +1356,25 @@ fail:
}
/* --------------------------------
+ * pq_putmessage_noblock - like pq_putmessage, but never blocks
+ *
+ * If the output buffer is too small to hold the message, the buffer
+ * is enlarged first. Returns the result of pq_putmessage().
+ */
+int
+pq_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+ int required = PqSendPointer + len + 5 ; /* presumably 5 = message header bytes - TODO confirm */
+ if (required > PqSendBufferSize)
+ {
+ PqSendBuffer = repalloc(PqSendBuffer, required);
+ PqSendBufferSize = required;
+ }
+ return pq_putmessage(msgtype, s, len);
+}
+
+
+/* --------------------------------
* pq_startcopyout - inform libpq that an old-style COPY OUT transfer
* is beginning
* --------------------------------
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index a4f559e..32d0cb5 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -193,19 +193,21 @@ DisownLatch(volatile Latch *latch)
bool
WaitLatch(volatile Latch *latch, long timeout)
{
- return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+ return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
}
/*
* Like WaitLatch, but will also return when there's data available in
- * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
- * was set, or 2 if the scoket became readable.
+ * 'sock' for reading (if forRead) or it becomes writable (if forWrite).
+ * Returns 0 on timeout, 1 if the latch was set, 2 if the socket is ready.
*/
int
-WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
+ bool forWrite, long timeout)
{
struct timeval tv, *tvp = NULL;
fd_set input_mask;
+ fd_set output_mask;
int rc;
int result = 0;
@@ -241,14 +243,22 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
FD_ZERO(&input_mask);
FD_SET(selfpipe_readfd, &input_mask);
hifd = selfpipe_readfd;
- if (sock != PGINVALID_SOCKET)
+ if (sock != PGINVALID_SOCKET && forRead)
{
FD_SET(sock, &input_mask);
if (sock > hifd)
hifd = sock;
}
- rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
+ FD_ZERO(&output_mask);
+ if (sock != PGINVALID_SOCKET && forWrite)
+ {
+ FD_SET(sock, &output_mask);
+ if (sock > hifd)
+ hifd = sock;
+ }
+
+ rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
if (rc < 0)
{
if (errno == EINTR)
@@ -263,7 +273,9 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
result = 0;
break;
}
- if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
+ if (sock != PGINVALID_SOCKET &&
+ ((forRead && FD_ISSET(sock, &input_mask)) ||
+ (forWrite && FD_ISSET(sock, &output_mask))))
{
result = 2;
break; /* data available in socket */
diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c
index 76dd6be..dbbd4a3 100644
--- a/src/backend/port/win32/socket.c
+++ b/src/backend/port/win32/socket.c
@@ -14,7 +14,8 @@
#include "postgres.h"
/*
- * Indicate if pgwin32_recv() should operate in non-blocking mode.
+ * Indicate if pgwin32_recv() and pgwin32_send() should operate
+ * in non-blocking mode.
*
* Since the socket emulation layer always sets the actual socket to
* non-blocking mode in order to be able to deliver signals, we must
@@ -399,6 +400,16 @@ pgwin32_send(SOCKET s, char *buf, int len, int flags)
return -1;
}
+ if (pgwin32_noblock)
+ {
+ /*
+ * No data sent, and we are in "emulated non-blocking mode", so
+ * return indicating that we'd block if we were to continue.
+ */
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+
/* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
index ac20c49..f42cfef 100644
--- a/src/backend/port/win32_latch.c
+++ b/src/backend/port/win32_latch.c
@@ -85,11 +85,12 @@ DisownLatch(volatile Latch *latch)
bool
WaitLatch(volatile Latch *latch, long timeout)
{
- return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+ return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
}
int
-WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
+ bool forWrite, long timeout)
{
DWORD rc;
HANDLE events[3];
@@ -103,10 +104,17 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
events[0] = latchevent;
events[1] = pgwin32_signal_event;
numevents = 2;
- if (sock != PGINVALID_SOCKET)
+ if (sock != PGINVALID_SOCKET && (forRead || forWrite))
{
+ int flags = 0;
+
+ if (forRead)
+ flags |= FD_READ;
+ if (forWrite)
+ flags |= FD_WRITE;
+
sockevent = WSACreateEvent();
- WSAEventSelect(sock, sockevent, FD_READ);
+ WSAEventSelect(sock, sockevent, flags);
events[numevents++] = sockevent;
}
@@ -139,8 +147,18 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
pgwin32_dispatch_queued_signals();
else if (rc == WAIT_OBJECT_0 + 2)
{
+ WSANETWORKEVENTS resEvents;
+
Assert(sock != PGINVALID_SOCKET);
- result = 2;
+
+ ZeroMemory(&resEvents, sizeof(resEvents));
+ if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
+ ereport(FATAL,
+ (errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
+
+ if ((forRead && resEvents.lNetworkEvents & FD_READ) ||
+ (forWrite && resEvents.lNetworkEvents & FD_WRITE))
+ result = 2;
break;
}
else if (rc != WAIT_OBJECT_0)
@@ -148,7 +166,7 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
}
/* Clean up the handle we created for the socket */
- if (sock != PGINVALID_SOCKET)
+ if (sock != PGINVALID_SOCKET && (forRead || forWrite))
{
WSAEventSelect(sock, sockevent, 0);
WSACloseEvent(sockevent);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f76b5b0..36406d2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -74,6 +74,7 @@ bool am_walsender = false; /* Am I a walsender process ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 1000; /* max sleep time between some actions */
+int replication_timeout = 60 * 1000; /* max time (ms) to wait for a standby reply; 0 disables the timeout */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -95,6 +96,11 @@ static XLogRecPtr sentPtr = {0, 0};
*/
static StringInfoData reply_message;
+/*
+ * Timestamp of the last receipt of the reply from the standby.
+ */
+static TimestampTz last_reply_timestamp;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t walsender_shutdown_requested = false;
@@ -113,7 +119,7 @@ static int WalSndLoop(void);
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
-static bool XLogSend(char *msgbuf, bool *caughtup);
+static void XLogSend(char *msgbuf, bool *caughtup);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
static void ProcessStandbyMessage(void);
@@ -469,6 +475,7 @@ ProcessRepliesIfAny(void)
{
unsigned char firstchar;
int r;
+ int received = false;
for (;;)
{
@@ -484,7 +491,7 @@ ProcessRepliesIfAny(void)
if (r == 0)
{
/* no data available without blocking */
- return;
+ break;
}
/* Handle the very limited subset of commands expected in this phase */
@@ -495,6 +502,7 @@ ProcessRepliesIfAny(void)
*/
case 'd':
ProcessStandbyMessage();
+ received = true;
break;
/*
@@ -510,6 +518,12 @@ ProcessRepliesIfAny(void)
firstchar)));
}
}
+ /*
+ * Save the last reply timestamp if we've received at least
+ * one reply.
+ */
+ if (received)
+ last_reply_timestamp = GetCurrentTimestamp();
}
/*
@@ -688,6 +702,9 @@ WalSndLoop(void)
*/
initStringInfo(&reply_message);
+ /* Initialize the last reply timestamp */
+ last_reply_timestamp = GetCurrentTimestamp();
+
/* Loop forever, unless we get an error */
for (;;)
{
@@ -706,19 +723,6 @@ WalSndLoop(void)
SyncRepInitConfig();
}
- /*
- * When SIGUSR2 arrives, we send all outstanding logs up to the
- * shutdown checkpoint record (i.e., the latest record) and exit.
- */
- if (walsender_ready_to_stop)
- {
- if (!XLogSend(output_message, &caughtup))
- break;
- ProcessRepliesIfAny();
- if (caughtup)
- walsender_shutdown_requested = true;
- }
-
/* Normal exit from the walsender is here */
if (walsender_shutdown_requested)
{
@@ -730,11 +734,13 @@ WalSndLoop(void)
}
/*
- * If we had sent all accumulated WAL in last round, nap for the
- * configured time before retrying.
+ * If we don't have any pending data in the output buffer, try to
+ * send some more.
*/
- if (caughtup)
+ if (!pq_is_send_pending())
{
+ XLogSend(output_message, &caughtup);
+
/*
* Even if we wrote all the WAL that was available when we started
* sending, more might have arrived while we were sending this
@@ -742,28 +748,79 @@ WalSndLoop(void)
* received any signals from that time. Let's arm the latch
* again, and after that check that we're still up-to-date.
*/
- ResetLatch(&MyWalSnd->latch);
-
- if (!XLogSend(output_message, &caughtup))
- break;
- if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
+ if (caughtup && !pq_is_send_pending())
{
- /*
- * XXX: We don't really need the periodic wakeups anymore,
- * WaitLatchOrSocket should reliably wake up as soon as
- * something interesting happens.
- */
+ ResetLatch(&MyWalSnd->latch);
- /* Sleep */
- WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
- WalSndDelay * 1000L);
+ XLogSend(output_message, &caughtup);
}
}
- else
+
+ /* Flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ break;
+
+ /*
+ * When SIGUSR2 arrives, we send any outstanding logs up to the
+ * shutdown checkpoint record (i.e., the latest record) and exit.
+ */
+ if (walsender_ready_to_stop && !pq_is_send_pending())
{
- /* Attempt to send the log once every loop */
- if (!XLogSend(output_message, &caughtup))
+ XLogSend(output_message, &caughtup);
+ ProcessRepliesIfAny();
+ if (caughtup && !pq_is_send_pending())
+ walsender_shutdown_requested = true;
+ }
+
+ if ((caughtup || pq_is_send_pending()) &&
+ !got_SIGHUP &&
+ !walsender_shutdown_requested)
+ {
+ TimestampTz finish_time;
+ long sleeptime;
+
+ /* Reschedule replication timeout */
+ if (replication_timeout > 0)
+ {
+ long secs;
+ int usecs;
+
+ finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ replication_timeout);
+ TimestampDifference(GetCurrentTimestamp(),
+ finish_time, &secs, &usecs);
+ sleeptime = secs * 1000 + usecs / 1000;
+ if (WalSndDelay < sleeptime)
+ sleeptime = WalSndDelay;
+ }
+ else
+ {
+ /*
+ * XXX: Without timeout, we don't really need the periodic
+ * wakeups anymore, WaitLatchOrSocket should reliably wake up
+ * as soon as something interesting happens.
+ */
+ sleeptime = WalSndDelay;
+ }
+
+ /* Sleep */
+ WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+ true, pq_is_send_pending(),
+ sleeptime * 1000L);
+
+ /* Check for replication timeout */
+ if (replication_timeout > 0 &&
+ GetCurrentTimestamp() >= finish_time)
+ {
+ /*
+ * Since typically expiration of replication timeout means
+ * communication problem, we don't send the error message
+ * to the standby.
+ */
+ ereport(COMMERROR,
+ (errmsg("terminating walsender process due to replication timeout")));
break;
+ }
}
/*
@@ -993,7 +1050,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
/*
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
- * but not yet sent to the client, and send it.
+ * but not yet sent to the client, and buffer it in the libpq output
+ * buffer.
*
* msgbuf is a work area in which the output message is constructed. It's
* passed in just so we can avoid re-palloc'ing the buffer on each cycle.
@@ -1001,10 +1059,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
*
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false.
- *
- * Returns true if OK, false if trouble.
+
*/
-static bool
+static void
XLogSend(char *msgbuf, bool *caughtup)
{
XLogRecPtr SendRqstPtr;
@@ -1027,7 +1084,7 @@ XLogSend(char *msgbuf, bool *caughtup)
if (XLByteLE(SendRqstPtr, sentPtr))
{
*caughtup = true;
- return true;
+ return;
}
/*
@@ -1099,11 +1156,7 @@ XLogSend(char *msgbuf, bool *caughtup)
memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
- pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
-
- /* Flush pending output to the client */
- if (pq_flush())
- return false;
+ pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
sentPtr = endptr;
@@ -1127,7 +1180,7 @@ XLogSend(char *msgbuf, bool *caughtup)
set_ps_display(activitymsg, false);
}
- return true;
+ return;
}
/* SIGHUP: set flag to re-read config file at next convenient time */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9ca1329..b49bdae 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1856,6 +1856,16 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
+ gettext_noop("Sets the maximum time to wait for WAL replication."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &replication_timeout,
+ 60 * 1000, 0, INT_MAX, NULL, NULL
+ },
+
+ {
{"commit_delay", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Sets the delay in microseconds between transaction commit and "
"flushing WAL to disk."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ed70223..4348185 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -200,6 +200,7 @@
#wal_sender_delay = 1s # walsender cycle time, 1-10000 milliseconds
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
+#replication_timeout = 60s # in milliseconds, 0 is disabled
# - Standby Servers -
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 8ecab6d..b20b0c2 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -60,7 +60,10 @@ extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
extern int pq_flush(void);
+extern int pq_flush_if_writable(void);
+extern bool pq_is_send_pending(void);
extern int pq_putmessage(char msgtype, const char *s, size_t len);
+extern int pq_putmessage_noblock(char msgtype, const char *s, size_t len);
extern void pq_startcopyout(void);
extern void pq_endcopyout(bool errorAbort);
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 150a71f..2670a2e 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -98,6 +98,7 @@ extern volatile sig_atomic_t walsender_ready_to_stop;
/* user-settable parameters */
extern int WalSndDelay;
extern int max_wal_senders;
+extern int replication_timeout;
extern int WalSenderMain(void);
extern void WalSndSignals(void);
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 31744ff..f64e13b 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -40,7 +40,7 @@ extern void OwnLatch(volatile Latch *latch);
extern void DisownLatch(volatile Latch *latch);
extern bool WaitLatch(volatile Latch *latch, long timeout);
extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
- long timeout);
+ bool forRead, bool forWrite, long timeout);
extern void SetLatch(volatile Latch *latch);
extern void ResetLatch(volatile Latch *latch);
#define TestLatch(latch) (((volatile Latch *) latch)->is_set)
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers