Having gone through the patch now in more detail, I think it's in pretty
good shape. I'm happy with the overall design, except that I haven't
been able to make up my mind if walreceiver should indeed be a
stand-alone program as discussed, or a postmaster child process as in
the patch you submitted. Putting that question aside for a moment,
here are some minor things, in no particular order:
- The async API in PQgetXLogData is quite different from the other
commands. It's close to the API from PQgetCopyData(), but doesn't return
a malloc'd buffer like PQgetCopyData does. I presume that's to optimize
away the extra memcpy step? I don't think that's really necessary; I
don't recall any complaints about that in PQgetCopyData(), and if it
does become an issue, it could be optimized away by mallocing the buffer
first and reading directly into that.
- Can we avoid sprinkling XLogStreamingAllowed() calls into places where
we check if WAL-logging is required (nbtsort.c, copy.c etc.)? I think we
need a new macro to encapsulate (XLogArchivingActive() ||
XLogStreamingAllowed()).
- Is O_DIRECT ever a good idea in walreceiver? If it's really direct and
doesn't get cached, the startup process will need to read from disk.
- Can we replace read/write_conninfo with just a long-enough field in
shared mem? Would be simpler. (this is moot if we go with the
stand-alone walreceiver program and pass it as a command-line argument)
- walreceiver shouldn't die on connection error, just to be restarted by
the startup process. Can we add error handling a la bgwriter and have a
retry loop within walreceiver? (again, if we go with a stand-alone
walreceiver program, it's probably better to keep the startup process
responsible for restarting walreceiver, as it is now)
- pq_wait in backend waits until you can read or write at least 1 byte.
There is no guarantee that you can send or read the whole message
without blocking. We'd have to put the socket in non-blocking mode for
that. I'm not sure what the implications of this are.
- We should include system_identifier somewhere in the replication
startup handshake. Otherwise you can connect to a server from a different
system and have logs shipped, if they happen to be roughly at the same
point in WAL. Replay will almost certainly fail, but we should error out
earlier.
- I know I said we should have just asynchronous replication at first,
but looking ahead, how would you do synchronous? What kind of signaling
is needed between walreceiver and startup process for that?
- 'replication' shouldn't be a real database.
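To make the macro suggested in the second item concrete, here is a
minimal sketch. The name XLogIsNeeded(), the helper can_skip_wal() and
the two flag variables are made up for illustration, and
XLogArchivingActive()/XLogStreamingAllowed() are stubbed so that the
fragment compiles on its own. In the tree, callers such as nbtsort.c and
copy.c would test the single macro instead of spelling out both
conditions.

```c
#include <stdbool.h>

/*
 * Stand-ins for the server state (hypothetical). In the real tree these
 * would be the existing XLogArchivingActive() and the patch's
 * XLogStreamingAllowed().
 */
static bool archive_mode_on = false;
static bool streaming_allowed_on = false;

#define XLogArchivingActive()	(archive_mode_on)
#define XLogStreamingAllowed()	(streaming_allowed_on)

/*
 * Hypothetical macro name: true if somebody other than crash recovery
 * (the archiver or a walsender) needs the WAL to be complete.
 */
#define XLogIsNeeded() (XLogArchivingActive() || XLogStreamingAllowed())

/*
 * Example caller (hypothetical): a bulk operation on a newly created
 * relation may skip WAL-logging only if nobody needs the WAL.
 */
static bool
can_skip_wal(bool relation_is_new)
{
	return relation_is_new && !XLogIsNeeded();
}
```

With one macro, a later change to the condition only has to be made in
one place.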
I found the paging logic in walsender confusing, and didn't like the
idea that walsender needs to set the XLOGSTREAM_END_SEG flag. Surely
walreceiver knows how to split the WAL into files without such a flag. I
reworked that logic; I think it's easier to understand now. I kept the
support for the flag in libpq and the protocol for now, but it should be
removed too, or repurposed to indicate that pg_switch_xlog() was done in
the master. I've pushed that to the 'replication-orig' branch in my git
repository; attached is the same as a diff against your SR_0914.patch.
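For illustration, splitting on the receiving side can be sketched
roughly as below. This is not the code in the attached diff: it uses a
flat 64-bit byte position instead of the xlogid/xrecoff pair, and
SEG_SIZE, SegChunk and split_by_segment() are names invented here. The
point is only that the start position and length of a message are enough
to work out which segment file each byte belongs to, so no end-of-segment
flag is needed.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed segment size for this sketch (16 MB) */
#define SEG_SIZE ((uint64_t) 16 * 1024 * 1024)

typedef struct
{
	uint64_t	segno;		/* which segment file the chunk goes to */
	uint32_t	offset;		/* byte offset within that file */
	size_t		len;		/* number of bytes belonging to that file */
} SegChunk;

/*
 * Split the byte range [start, start + nbytes) into per-segment chunks.
 * Writes at most max_chunks entries to 'out' and returns how many were
 * written.
 */
static size_t
split_by_segment(uint64_t start, size_t nbytes,
				 SegChunk *out, size_t max_chunks)
{
	size_t		n = 0;

	assert(out != NULL || max_chunks == 0);

	while (nbytes > 0 && n < max_chunks)
	{
		uint64_t	off = start % SEG_SIZE;
		uint64_t	room = SEG_SIZE - off;	/* bytes left in this segment */
		size_t		take = (nbytes > room) ? (size_t) room : nbytes;

		out[n].segno = start / SEG_SIZE;
		out[n].offset = (uint32_t) off;
		out[n].len = take;
		n++;

		start += take;
		nbytes -= take;
	}
	return n;
}
```

A message of 300 bytes that starts 100 bytes before a segment boundary
comes out as two chunks: 100 bytes at the end of one file and 200 bytes
at offset 0 of the next.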
I need a break from this patch, so I'll take a closer look at Simon's
hot standby now. Meanwhile, can you work on the above items and submit a
new version, please?
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
*** a/src/backend/access/transam/recovery.conf.sample
--- b/src/backend/access/transam/recovery.conf.sample
***************
*** 2,10 ****
# PostgreSQL recovery config file
# -------------------------------
#
! # Edit this file to provide the parameters that PostgreSQL
! # needs to perform an archive recovery of a database, or
! # a log-streaming replication.
#
# If "recovery.conf" is present in the PostgreSQL data directory, it is
# read on postmaster startup. After successful recovery, it is renamed
--- 2,10 ----
# PostgreSQL recovery config file
# -------------------------------
#
! # Edit this file to provide the parameters that PostgreSQL needs to
! # perform an archive recovery of a database, or to act as a log-streaming
! # replication standby.
#
# If "recovery.conf" is present in the PostgreSQL data directory, it is
# read on postmaster startup. After successful recovery, it is renamed
***************
*** 83,89 ****
#---------------------------------------------------------------------------
#
# When standby_mode is enabled, the PostgreSQL server will work as
! # the standby. It tries to connect to the primary according to the
# connection settings primary_conninfo, and receives XLOG records
# continuously.
#
--- 83,89 ----
#---------------------------------------------------------------------------
#
# When standby_mode is enabled, the PostgreSQL server will work as
! # a standby. It tries to connect to the primary according to the
# connection settings primary_conninfo, and receives XLOG records
# continuously.
#
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 2645,2653 **** XLogFileClose(void)
* WAL segment files will not be re-read in normal operation, so we advise
* the OS to release any cached pages. But do not do so if WAL archiving
* or streaming is active, because archiver and walsender process could use
! * the cache to read the WAL segment, respectively. Also, don't bother
! * with it if we are using O_DIRECT, since the kernel is presumably not
! * caching in that case.
*/
#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
if (!XLogArchivingActive() && !WalSndInProgress() &&
--- 2645,2653 ----
* WAL segment files will not be re-read in normal operation, so we advise
* the OS to release any cached pages. But do not do so if WAL archiving
* or streaming is active, because archiver and walsender process could use
! * the cache to read the WAL segment. Also, don't bother with it if we
! * are using O_DIRECT, since the kernel is presumably not caching in that
! * case.
*/
#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
if (!XLogArchivingActive() && !WalSndInProgress() &&
***************
*** 3481,3487 **** FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
startlsn.xlogid, startlsn.xrecoff)));
}
! return ReadRecord(RecPtr, emode);
}
/*
--- 3481,3487 ----
startlsn.xlogid, startlsn.xrecoff)));
}
! return ReadRecord(RecPtr, emode);
}
/*
***************
*** 5284,5290 **** exitStreamingRecovery(void)
*/
ShutdownWalRcv();
! /* We are no longer in streaming recovery state */
InStreamingRecovery = false;
ereport(LOG,
--- 5284,5290 ----
*/
ShutdownWalRcv();
! /* We are no longer in streaming recovery state */
InStreamingRecovery = false;
ereport(LOG,
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 289,295 **** typedef enum
PM_WAIT_BACKENDS, /* waiting for live backends to exit */
PM_SHUTDOWN, /* waiting for bgwriter to do shutdown ckpt */
PM_SHUTDOWN_2, /* waiting for archiver to finish */
- PM_SHUTDOWN_3, /* waiting for walsenders to finish */
PM_WAIT_DEAD_END, /* waiting for dead_end children to exit */
PM_NO_CHILDREN /* all important children have exited */
} PMState;
--- 289,294 ----
***************
*** 1640,1646 **** retry1:
if (proto == XLOG_STREAMING_CODE && !am_walsender)
{
am_walsender = true;
! /* No packets other than regular one should not follow */
return ProcessStartupPacket(port, SSLdone);
}
--- 1639,1645 ----
if (proto == XLOG_STREAMING_CODE && !am_walsender)
{
am_walsender = true;
! /* No packets other than regular one should follow */
return ProcessStartupPacket(port, SSLdone);
}
***************
*** 2404,2420 **** reaper(SIGNAL_ARGS)
*/
Assert(Shutdown > NoShutdown);
! if (PgArchPID != 0)
{
/* Waken archiver for the last time */
! signal_child(PgArchPID, SIGUSR2);
! pmState = PM_SHUTDOWN_2;
! }
! else if (WalSndInProgress())
! {
/* Waken walsenders for the last time */
SignalWalSenders(SIGUSR2);
! pmState = PM_SHUTDOWN_3;
}
else
pmState = PM_WAIT_DEAD_END;
--- 2403,2418 ----
*/
Assert(Shutdown > NoShutdown);
! if (PgArchPID != 0 || WalSndInProgress())
{
/* Waken archiver for the last time */
! if (PgArchPID != 0)
! signal_child(PgArchPID, SIGUSR2);
!
/* Waken walsenders for the last time */
SignalWalSenders(SIGUSR2);
!
! pmState = PM_SHUTDOWN_2;
}
else
pmState = PM_WAIT_DEAD_END;
***************
*** 2499,2510 **** reaper(SIGNAL_ARGS)
((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
WalRcvInProgress())))
PgArchPID = pgarch_start();
! else if (pmState == PM_SHUTDOWN_2 && WalSndInProgress())
! {
! SignalWalSenders(SIGUSR2);
! pmState = PM_SHUTDOWN_3;
! }
! else
pmState = PM_WAIT_DEAD_END;
continue;
}
--- 2497,2503 ----
((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
WalRcvInProgress())))
PgArchPID = pgarch_start();
! else if (pmState == PM_SHUTDOWN_2 && !WalSndInProgress())
pmState = PM_WAIT_DEAD_END;
continue;
}
***************
*** 2611,2618 **** CleanupBackend(int pid,
* advance to the next shutdown step.
*/
if (bp->child_type == BACKEND_TYPE_WALSND &&
! pmState == PM_SHUTDOWN_3 &&
! !WalSndInProgress())
pmState = PM_WAIT_DEAD_END;
}
DLRemove(curr);
--- 2604,2611 ----
* advance to the next shutdown step.
*/
if (bp->child_type == BACKEND_TYPE_WALSND &&
! pmState == PM_SHUTDOWN_2 &&
! !WalSndInProgress() && PgArchPID == 0)
pmState = PM_WAIT_DEAD_END;
}
DLRemove(curr);
*** a/src/backend/postmaster/walreceiver.c
--- b/src/backend/postmaster/walreceiver.c
***************
*** 100,108 **** static void WalRcvQuickDieHandler(SIGNAL_ARGS);
static void WalRcvLoop(void);
static void InitWalRcv(void);
static void WalRcvKill(int code, Datum arg);
! static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced);
! static void XLogWalRcvFlush(XLogRecPtr recptr);
! static void WritePhysicalXLog(char *from, Size nbytes, int startoff);
static char *read_conninfo_file(void);
/* Main entry point for walreceiver process */
--- 100,107 ----
static void WalRcvLoop(void);
static void InitWalRcv(void);
static void WalRcvKill(int code, Datum arg);
! static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr);
! static void XLogWalRcvFlush(void);
static char *read_conninfo_file(void);
/* Main entry point for walreceiver process */
***************
*** 228,235 **** WalRcvLoop(void)
/* Loop until end-of-streaming or error */
for (;;)
{
- bool fsynced = false;
-
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
--- 227,232 ----
***************
*** 298,304 **** WalRcvLoop(void)
* can recover all transactions from the primary).
*/
! XLogWalRcvWrite(buf, len, recptr, &fsynced);
/*
* The logs in the XLogData message were written successfully,
--- 295,301 ----
* can recover all transactions from the primary).
*/
! XLogWalRcvWrite(buf, len, recptr);
/*
* The logs in the XLogData message were written successfully,
***************
*** 307,357 **** WalRcvLoop(void)
PQmarkConsumed(streamConn);
/*
! * If fsync is not requested or was already done, we send a "success"
! * to the primary before issuing fsync for end-of-segment.
*/
! if (fsynced || !fsync_requested)
! {
! if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
! (int) fsynced) == -1)
! ereport(FATAL,
! (errmsg("could not send a message to the primary: %s",
! PQerrorMessage(streamConn))));
! }
!
! /*
! * If we just wrote the whole last page of a logfile segment but
! * had not fsynced it yet, fsync the segment immediately. This
! * avoids having to go back and re-open prior segments when an
! * fsync request comes along later.
! *
! * Of course, if asked to fsync but not, do so.
! */
! if (!fsynced && (fsync_requested || finishing_seg))
! {
! XLogWalRcvFlush(recptr);
!
! if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
! 1) == -1)
! ereport(FATAL,
! (errmsg("could not send a message to the primary: %s",
! PQerrorMessage(streamConn))));
!
! /*
! * If the segment is ready to copy to archival storage,
! * notify the archiver so.
! */
! if (finishing_seg && XLogArchivingActive())
! XLogArchiveNotifySeg(recvId, recvSeg);
!
! /*
! * XXX: Should we signal bgwriter to start a restartpoint
! * if we've consumed too much xlog since the last one, like
! * in normal processing? But this is not worth doing unless
! * a restartpoint can be created independently from a
! * checkpoint record.
! */
! }
}
if (len == -1) /* end-of-streaming */
--- 304,314 ----
PQmarkConsumed(streamConn);
/*
! * If the primary requested us to fsync, do so now and send
! * an acknowledgement.
*/
! if (fsync_requested)
! XLogWalRcvFlush();
}
if (len == -1) /* end-of-streaming */
***************
*** 511,589 **** WalRcvInProgress(void)
* fsynced is set to true if the log was fsyned by O_DIRECT.
*/
static void
! XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
{
int startoff;
! int endoff;
! START_CRIT_SECTION();
! if (!XLByteInPrevSeg(recptr, recvId, recvSeg))
{
! bool use_existent;
! /*
! * XLOG segment files will be re-read in recovery operation soon,
! * so we don't need to advise the OS to release any cache page.
! */
! if (recvFile >= 0 && close(recvFile))
ereport(PANIC,
(errcode_for_file_access(),
! errmsg("could not close log file %u, segment %u: %m",
! recvId, recvSeg)));
! recvFile = -1;
!
! /* Create/use new log file */
! XLByteToPrevSeg(recptr, recvId, recvSeg);
! use_existent = true;
! recvFile = XLogFileInit(recvId, recvSeg,
! &use_existent, true);
! recvOff = 0;
! }
! /* Make sure we have the current logfile open */
! if (recvFile < 0)
! {
! XLByteToPrevSeg(recptr, recvId, recvSeg);
! recvFile = XLogFileOpen(recvId, recvSeg);
! recvOff = 0;
! }
! /* Calculate the start/end file offset of the received logs */
! endoff = recptr.xrecoff % XLogSegSize;
! startoff = ((endoff == 0) ? XLogSegSize : endoff) - len;
/*
! * Re-zero the page so that bytes beyond what we've written will look
! * like zeroes and not valid XLOG records. Only end page which we are
! * writing need to be zeroed. Of course, we can skip zeroing the pages
! * full of the XLOG records. Save the end position of the already zeroed
! * area at the variable ZeroedRecPtr, and avoid zeroing the same page
! * two or more times.
*
* This must precede the writing of the actual logs. Otherwise, a crash
! * before re-zeroing would cause a corrupted page.
*/
! if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0)
{
int zlen;
! zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ;
! WritePhysicalXLog(ZeroedBuffer, zlen, endoff);
ZeroedRecPtr = recptr;
ZeroedRecPtr.xrecoff += zlen;
- }
! /* Write out the logs */
! WritePhysicalXLog(buf, len, startoff);
! LogstreamResult.Send = recptr;
! LogstreamResult.Write = recptr;
!
! if (sync_method == SYNC_METHOD_OPEN ||
! sync_method == SYNC_METHOD_OPEN_DSYNC)
! {
! LogstreamResult.Flush = recptr;
! *fsynced = true; /* logs were already fsynced */
}
/* Update shared-memory status */
--- 468,623 ----
* fsynced is set to true if the log was fsyned by O_DIRECT.
*/
static void
! XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
int startoff;
! int byteswritten;
! START_CRIT_SECTION(); /* XXX: Why? */
! while (nbytes > 0)
{
! int segbytes;
! uint32 tmp;
! if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg))
! {
! bool use_existent;
!
! /*
! * XLOG segment files will be re-read in recovery operation soon,
! * so we don't need to advise the OS to release any cache page.
! */
! if (recvFile >= 0)
! {
! /*
! * fsync() before we switch to next file. We would otherwise
! * have to reopen this file to fsync it later
! */
! XLogWalRcvFlush();
! if (close(recvFile) != 0)
! ereport(PANIC,
! (errcode_for_file_access(),
! errmsg("could not close log file %u, segment %u: %m",
! recvId, recvSeg)));
! }
! recvFile = -1;
!
! /* Create/use new log file */
! XLByteToSeg(recptr, recvId, recvSeg);
! use_existent = true;
! recvFile = XLogFileInit(recvId, recvSeg,
! &use_existent, true);
! recvOff = 0;
! }
!
! /* Calculate the start offset of the received logs */
! startoff = recptr.xrecoff % XLogSegSize;
!
! if (startoff + nbytes > XLOG_SEG_SIZE)
! segbytes = XLOG_SEG_SIZE - startoff;
! else
! segbytes = nbytes;
!
! /* Need to seek in the file? */
! if (recvOff != startoff)
! {
! if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
! ereport(PANIC,
! (errcode_for_file_access(),
! errmsg("could not seek in log file %u, "
! "segment %u to offset %u: %m",
! recvId, recvSeg, startoff)));
! recvOff = startoff;
! }
!
! /* OK to write the logs */
! errno = 0;
!
! byteswritten = write(recvFile, buf, segbytes);
! if (byteswritten <= 0)
! {
! /* if write didn't set errno, assume no disk space */
! if (errno == 0)
! errno = ENOSPC;
ereport(PANIC,
(errcode_for_file_access(),
! errmsg("could not write to log file %u, segment %u "
! "at offset %u, length %lu: %m",
! recvId, recvSeg,
! recvOff, (unsigned long) segbytes)));
! }
! /* Update state for read */
! tmp = recptr.xrecoff + byteswritten;
! if (tmp < recptr.xrecoff)
! recptr.xlogid++; /* overflow */
! recptr.xrecoff = tmp;
! recvOff += byteswritten;
! nbytes -= byteswritten;
! buf += byteswritten;
!
! LogstreamResult.Send = recptr;
! LogstreamResult.Write = recptr;
!
! if (sync_method == SYNC_METHOD_OPEN ||
! sync_method == SYNC_METHOD_OPEN_DSYNC)
! {
! LogstreamResult.Flush = recptr;
! }
!
! /*
! * If the segment is ready to copy to archival storage,
! * notify the archiver so.
! */
! if ((recptr.xrecoff % XLOG_SEG_SIZE == 0) && XLogArchivingActive())
! XLogArchiveNotifySeg(recvId, recvSeg);
!
! /*
! * XXX: Should we signal bgwriter to start a restartpoint
! * if we've consumed too much xlog since the last one, like
! * in normal processing? But this is not worth doing unless
! * a restartpoint can be created independently from a
! * checkpoint record.
! */
! }
/*
! * Zero the rest of the last page we wrote to, so that bytes beyond what
! * we've written will look like zeroes and not valid XLOG records. Save
! * the end position of the already zeroed area at the variable
! * ZeroedRecPtr, and avoid zeroing the same page two or more times.
*
* This must precede the writing of the actual logs. Otherwise, a crash
! * before re-zeroing would cause a corrupted page. XXX: that's not really
! * an issue, a hard crash could leave the page half-flushed anyway. And we
! * have CRC to protect from that anyway, this zeroing business isn't
! * absolutely necessary anyway.
*/
! if (XLByteLT(ZeroedRecPtr, recptr) && recptr.xrecoff % XLOG_BLCKSZ != 0)
{
int zlen;
! zlen = XLOG_BLCKSZ - recptr.xrecoff % XLOG_BLCKSZ;
!
! byteswritten = write(recvFile, ZeroedBuffer, zlen);
! if (byteswritten != zlen)
! {
! /* if write didn't set errno, assume no disk space */
! if (errno == 0)
! errno = ENOSPC;
! ereport(PANIC,
! (errcode_for_file_access(),
! errmsg("could not write to log file %u, segment %u "
! "at offset %u, length %lu: %m",
! recvId, recvSeg,
! recvOff, (unsigned long) zlen)));
! }
ZeroedRecPtr = recptr;
ZeroedRecPtr.xrecoff += zlen;
! recvOff += byteswritten;
}
/* Update shared-memory status */
***************
*** 594,600 **** XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
SpinLockAcquire(&walrcv->mutex);
XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send);
XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write);
! if (*fsynced)
XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
SpinLockRelease(&walrcv->mutex);
}
--- 628,635 ----
SpinLockAcquire(&walrcv->mutex);
XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send);
XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write);
! if (sync_method == SYNC_METHOD_OPEN ||
! sync_method == SYNC_METHOD_OPEN_DSYNC)
XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
SpinLockRelease(&walrcv->mutex);
}
***************
*** 607,666 **** XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
/* Flush the log to disk */
static void
! XLogWalRcvFlush(XLogRecPtr recptr)
{
! START_CRIT_SECTION();
!
! issue_xlog_fsync(recvFile, recvId, recvSeg);
!
! LogstreamResult.Flush = recptr;
!
! /* Update shared-memory status */
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
SpinLockAcquire(&walrcv->mutex);
XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
SpinLockRelease(&walrcv->mutex);
- }
-
- END_CRIT_SECTION();
- }
! /* Physical write to the given logs */
! static void
! WritePhysicalXLog(char *from, Size nbytes, int startoff)
! {
! /* Need to seek in the file? */
! if (recvOff != startoff)
! {
! if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
! ereport(PANIC,
! (errcode_for_file_access(),
! errmsg("could not seek in log file %u, "
! "segment %u to offset %u: %m",
! recvId, recvSeg, startoff)));
! recvOff = startoff;
! }
! /* OK to write the logs */
! errno = 0;
! if (write(recvFile, from, nbytes) != nbytes)
! {
! /* if write didn't set errno, assume no disk space */
! if (errno == 0)
! errno = ENOSPC;
! ereport(PANIC,
! (errcode_for_file_access(),
! errmsg("could not write to log file %u, segment %u "
! "at offset %u, length %lu: %m",
! recvId, recvSeg,
! recvOff, (unsigned long) nbytes)));
}
-
- /* Update state for write */
- recvOff += nbytes;
}
/*
--- 642,674 ----
/* Flush the log to disk */
static void
! XLogWalRcvFlush(void)
{
! if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write))
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
+ START_CRIT_SECTION();
+
+ issue_xlog_fsync(recvFile, recvId, recvSeg);
+
+ LogstreamResult.Flush = LogstreamResult.Write;
+
+ /* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
SpinLockRelease(&walrcv->mutex);
! END_CRIT_SECTION();
! /* Let the primary know */
! if (PQputXLogRecPtr(streamConn, LogstreamResult.Flush.xlogid,
! LogstreamResult.Flush.xrecoff, 1) == -1)
! ereport(FATAL,
! (errmsg("could not send a message to the primary: %s",
! PQerrorMessage(streamConn))));
}
}
/*
*** a/src/backend/postmaster/walsender.c
--- b/src/backend/postmaster/walsender.c
***************
*** 113,122 **** static void WalSndQuickDieHandler(SIGNAL_ARGS);
static int WalSndLoop(void);
static void InitWalSnd(void);
static void WalSndKill(int code, Datum arg);
! static void XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr);
static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg);
static bool ProcessStreamMsgs(PendingMessage inMsg);
/* Main entry point for walsender process */
int
WalSenderMain(void)
--- 113,127 ----
static int WalSndLoop(void);
static void InitWalSnd(void);
static void WalSndKill(int code, Datum arg);
! static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg);
static bool ProcessStreamMsgs(PendingMessage inMsg);
+ /*
+ * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
+ */
+ #define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
+
/* Main entry point for walsender process */
int
WalSenderMain(void)
***************
*** 382,400 **** WalSndKill(int code, Datum arg)
}
/*
! * Read the log into buffer.
! *
! * startoff is the file offset where we start reading the log from; nbytes is
! * the number of bytes which needs to be read; recptr is the last byte + 1 to
! * read.
*/
void
! XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
{
char path[MAXPGPATH];
!
! /* Don't cross a segment boundary */
! Assert(startoff + nbytes <= XLogSegSize);
#ifdef REPLICATION_DEBUG
if (REPLICATION_DEBUG_ENABLED)
--- 387,399 ----
}
/*
! * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
*/
void
! XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
{
char path[MAXPGPATH];
! uint32 startoff;
#ifdef REPLICATION_DEBUG
if (REPLICATION_DEBUG_ENABLED)
***************
*** 404,464 **** XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
#endif
! if (!XLByteInPrevSeg(recptr, sendId, sendSeg))
{
! /* Switch to another logfile segment */
! if (sendFile >= 0)
! close(sendFile);
! XLByteToPrevSeg(recptr, sendId, sendSeg);
! XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
! sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
! if (sendFile < 0)
! ereport(FATAL,
! (errcode_for_file_access(),
! errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
! path, sendId, sendSeg)));
! sendOff = 0;
! }
! /* Make sure we have the current logfile open */
! if (sendFile < 0)
! {
! XLByteToPrevSeg(recptr, sendId, sendSeg);
! XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
! sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
! if (sendFile < 0)
! ereport(FATAL,
! (errcode_for_file_access(),
! errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
! path, sendId, sendSeg)));
! sendOff = 0;
! }
! /* Need to seek in the file? */
! if (sendOff != startoff)
! {
! if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
ereport(FATAL,
(errcode_for_file_access(),
! errmsg("could not seek in log file %u, segment %u to offset %u: %m",
! sendId, sendSeg, startoff)));
! sendOff = startoff;
! }
!
! if (read(sendFile, buf, nbytes) != nbytes)
! {
! ereport(FATAL,
! (errcode_for_file_access(),
! errmsg("could not read from log file %u, segment %u, offset %u, "
! "length %lu: %m",
! sendId, sendSeg, sendOff, (unsigned long) nbytes)));
}
-
- /* Update state for read */
- sendOff += nbytes;
}
/*
--- 403,469 ----
LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
#endif
! while (nbytes > 0)
{
! int segbytes;
! int readbytes;
! uint32 tmp;
! startoff = recptr.xrecoff % XLOG_SEG_SIZE;
! if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
! {
! /* Switch to another logfile segment */
! if (sendFile >= 0)
! close(sendFile);
!
! XLByteToSeg(recptr, sendId, sendSeg);
! XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
!
! sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
! if (sendFile < 0)
! ereport(FATAL, /* XXX: Why FATAL? */
! (errcode_for_file_access(),
! errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
! path, sendId, sendSeg)));
! sendOff = 0;
! }
! /* Need to seek in the file? */
! if (sendOff != startoff)
! {
! if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
! ereport(FATAL,
! (errcode_for_file_access(),
! errmsg("could not seek in log file %u, segment %u to offset %u: %m",
! sendId, sendSeg, startoff)));
! sendOff = startoff;
! }
! /* How many bytes are within this segment? */
! if (nbytes > (XLOG_SEG_SIZE - startoff))
! segbytes = XLOG_SEG_SIZE - startoff;
! else
! segbytes = nbytes;
! readbytes = read(sendFile, buf, segbytes);
! if (readbytes <= 0)
ereport(FATAL,
(errcode_for_file_access(),
! errmsg("could not read from log file %u, segment %u, offset %u, "
! "length %lu: %m",
! sendId, sendSeg, sendOff, (unsigned long) segbytes)));
!
! /* Update state for read */
! tmp = recptr.xrecoff + readbytes;
! if (tmp < recptr.xrecoff)
! recptr.xlogid++; /* overflow */
! recptr.xrecoff = tmp;
!
! sendOff += readbytes;
! nbytes -= readbytes;
! buf += readbytes;
}
}
/*
***************
*** 469,488 **** XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
static bool
XLogSend(PendingMessage inMsg, PendingMessage outMsg)
{
- bool ispartialpage;
- bool last_iteration;
- bool finishing_seg;
- int nmsgs;
- int npages;
int res;
- uint32 startpos;
- uint32 startoff;
- uint32 endpos;
XLogRecPtr SendRqstPtr;
/*
! * Invalid position means that XLOG streaming is not started yet,
! * so we do nothing here.
*/
if (XLogRecPtrIsInvalid(LogstreamResult.Send))
return true;
--- 474,486 ----
static bool
XLogSend(PendingMessage inMsg, PendingMessage outMsg)
{
int res;
XLogRecPtr SendRqstPtr;
/*
! * Invalid position means that we have not yet received the initial
! * XLogRecPtr message from the slave that indicates where to start the
! * streaming.
*/
if (XLogRecPtrIsInvalid(LogstreamResult.Send))
return true;
***************
*** 490,495 **** XLogSend(PendingMessage inMsg, PendingMessage outMsg)
--- 488,497 ----
/* Attempt to send all the records which were written to the disk */
SendRqstPtr = GetWriteRecPtr();
+ /* Quick exit if nothing to do */
+ if (!XLByteLT(LogstreamResult.Send, SendRqstPtr))
+ return true;
+
#ifdef REPLICATION_DEBUG
if (REPLICATION_DEBUG_ENABLED)
elog(LOG, "xlog send request %X/%X; send %X/%X; write %X/%X",
***************
*** 520,631 **** XLogSend(PendingMessage inMsg, PendingMessage outMsg)
* sending in the last page. We must initialize all of them to
* keep the compiler quiet.
*/
- nmsgs = 0;
- npages = 0;
- startpos = 0;
- startoff = 0;
- endpos = XLOG_BLCKSZ;
while (XLByteLT(LogstreamResult.Send, SendRqstPtr))
{
/*
! * Advance LogstreamResult.Send to end of current page. If this
! * is a first loop iteration (i.e., in the case where npages is 0),
! * it might indicate a halfway position or cross a logid boundary,
! * so alignment is needed. Otherwise, since it's guaranteed that
! * LogstreamResult.Send indicates end of previous page and we have
! * not crossed a logid boundary yet in this loop iteration,
! * we have only to increment it by XLOG_BLCKSZ bytes.
*/
! if (npages == 0)
! {
! startpos = LogstreamResult.Send.xrecoff % XLOG_BLCKSZ;
! startoff = LogstreamResult.Send.xrecoff % XLogSegSize - startpos;
! LogstreamResult.Send.xrecoff += XLOG_BLCKSZ - startpos;
! if (LogstreamResult.Send.xrecoff > XLogFileSize)
! {
! LogstreamResult.Send.xlogid++;
! LogstreamResult.Send.xrecoff %= XLogFileSize;
! }
! }
! else
! LogstreamResult.Send.xrecoff += XLOG_BLCKSZ;
! ispartialpage = XLByteLT(SendRqstPtr, LogstreamResult.Send);
! npages++;
/*
! * Read and send the set if this will be the last loop iteration,
! * or if the number of pages in the set is larger than
! * MaxPagesPerXLogData, or if we are at the end of the logfile
! * segment.
*/
- last_iteration = !XLByteLT(LogstreamResult.Send, SendRqstPtr);
- if (last_iteration)
- {
- endpos = SendRqstPtr.xrecoff % XLOG_BLCKSZ;
- if (endpos == 0)
- endpos = XLOG_BLCKSZ;
- }
-
- finishing_seg = !ispartialpage &&
- (startoff + npages * XLOG_BLCKSZ) >= XLogSegSize;
! /* Only asked to send a partial page */
! if (ispartialpage)
! LogstreamResult.Send = SendRqstPtr;
! if (last_iteration ||
! npages >= MaxPagesPerXLogData ||
! finishing_seg)
{
! Size nbytes;
! uint8 flags = 0;
!
! if (finishing_seg)
! flags |= XLOGSTREAM_END_SEG;
!
! /*
! * XXX: Should we request the standby to fsync the log if the
! * current set might include a shutdown checkpoint record?
! */
!
! /* OK to read and send the log */
! pq_beginasyncmsg(outMsg, 'w');
! pq_sendint(outMsg->buf, flags, 1);
! pq_sendint(outMsg->buf, LogstreamResult.Send.xlogid, 4);
! pq_sendint(outMsg->buf, LogstreamResult.Send.xrecoff, 4);
!
! nbytes = (npages - 1) * (Size) XLOG_BLCKSZ - startpos + endpos;
!
! /*
! * Read the log into the output buffer directly to prevent
! * extra memcpy calls.
! */
! XLogRead(BufferGetStringInfo(outMsg->buf, nbytes),
! startoff + startpos, nbytes, LogstreamResult.Send);
! res = pq_endasyncmsg(outMsg);
! if (res < 0)
! return false;
! if (res == 0)
! break;
! /*
! * Stop sending the log for another job (e.g., checking for
! * interrupts) periodically.
! */
! if (++nmsgs > MaxMsgsPerXLogSend)
! {
! pending_xlog_send = true;
! break;
! }
!
! npages = 0;
! }
! if (ispartialpage)
break;
}
--- 522,588 ----
* sending in the last page. We must initialize all of them to
* keep the compiler quiet.
*/
while (XLByteLT(LogstreamResult.Send, SendRqstPtr))
{
+ XLogRecPtr startptr;
+ XLogRecPtr endptr;
+ Size nbytes;
+ uint8 flags = 0;
+
/*
! * Figure out how much to send in one message. If there's less than
! * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
! * MAX_SEND_SIZE bytes, but round to page boundary for efficiency.
*/
! startptr = LogstreamResult.Send;
! endptr = startptr;
! endptr.xrecoff += MAX_SEND_SIZE;
! if(endptr.xrecoff < startptr.xrecoff)
! endptr.xlogid++; /* xrecoff overflowed */
! /* round down to page boundary */
! endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
! if (XLByteLT(SendRqstPtr, endptr))
! endptr = SendRqstPtr;
/*
! * XXX: Should we request the standby to fsync the log if the
! * current set might include a shutdown checkpoint record?
! *
! * Heikki: Well, we don't do that with other checkpoints, I don't
! * see why we should at a shutdown checkpoint. However, perhaps
! * walreceiver should do an fsync whenever the connection is lost,
! * whatever the reason (e.g the master has been shut down) ?
*/
! /* OK to read and send the log */
! pq_beginasyncmsg(outMsg, 'w');
! pq_sendint(outMsg->buf, flags, 1);
! pq_sendint(outMsg->buf, startptr.xlogid, 4);
! pq_sendint(outMsg->buf, startptr.xrecoff, 4);
! if (endptr.xlogid != startptr.xlogid)
{
! Assert(endptr.xlogid == startptr.xlogid + 1);
! nbytes = (0xffffffff - startptr.xrecoff) + endptr.xrecoff;
! }
! else
! nbytes = endptr.xrecoff - startptr.xrecoff;
! LogstreamResult.Send = endptr;
! /*
! * Read the log into the output buffer directly to prevent
! * extra memcpy calls.
! */
! XLogRead(BufferGetStringInfo(outMsg->buf, nbytes), startptr, nbytes);
! res = pq_endasyncmsg(outMsg);
! if (res < 0)
! return false;
! if (res == 0)
break;
}