On Mon, Jul 4, 2011 at 6:24 PM, Simon Riggs <[email protected]> wrote:
> On Tue, Jun 14, 2011 at 6:08 AM, Fujii Masao <[email protected]> wrote:
>
>>> A standby must not accept a replication connection from itself.
>>> Otherwise, since no new WAL data would ever appear in that standby,
>>> replication could not advance any further. As a safeguard against this,
>>> I introduced a new ID to identify each instance. The walsender sends that
>>> ID as the fourth field of its reply to IDENTIFY_SYSTEM, and the walreceiver
>>> then checks whether the IDs of the two servers are the same. If they are,
>>> the standby is connecting to itself, so the walreceiver emits an ERROR.
>
> Thanks for waiting for review.
Thanks for the review!
> This part of the patch is troubling me. I think you have identified an
> important problem, but this solution doesn't work fully.
>
> If we allow standbys to connect to other standbys then we have
> problems with standbys not being connected to master. This can occur
> with a 1-step connection, as you point out, but it could also occur
> with a 2-step, 3-step or longer connection, where a circle of standbys
> all depend upon each other. Your solution only works for 1-step
> connections. "Solving" that problem in a general sense might be more
> dangerous than leaving it alone. I think we should think some more
> about the issues there and approach them as a separate problem.
>
> I think we should remove that and just focus on the main problem, for
> now. That will make it a simpler patch and easier to commit.
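The following minimal C sketch makes the multi-step case concrete. It uses a toy model, not PostgreSQL code: each standby `i` has exactly one upstream server `upstream[i]`, and `MASTER` (-1) means it streams from the master. All names are hypothetical. `connects_to_itself()` corresponds to the removed 1-step safeguard. `reaches_master()` follows the whole upstream chain. Note that a real standby only knows its own upstream and not the whole graph, which is part of why the general problem is harder.

```c
#include <assert.h>
#include <stdbool.h>

#define MASTER (-1)   /* hypothetical marker: this node streams from the master */

/* The removed 1-step safeguard: does standby i stream from itself? */
static bool connects_to_itself(const int *upstream, int i)
{
    return upstream[i] == i;
}

/*
 * Follow the upstream chain starting at standby i. With n nodes, a chain
 * that has not reached the master after n + 1 steps must contain a cycle.
 */
static bool reaches_master(const int *upstream, int n, int i)
{
    int node = i;

    for (int hops = 0; hops <= n; hops++)
    {
        if (upstream[node] == MASTER)
            return true;
        node = upstream[node];
    }
    return false;   /* stuck in a circle of standbys */
}
```

Take `upstream = {1, 0}`, where standby 0 streams from standby 1 and vice versa. `connects_to_itself()` is false for both standbys, yet `reaches_master()` returns false, which exposes the 2-step cycle.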
I agree that we should focus on the main problem first, so I removed that
check. Attached is the updated version.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1998,2004 **** SET ENABLE_SEQSCAN TO OFF;
doesn't keep any extra segments for standby purposes, and the number
of old WAL segments available to standby servers is a function of
the location of the previous checkpoint and status of WAL
! archiving. This parameter has no effect on restartpoints.
This parameter can only be set in the
<filename>postgresql.conf</> file or on the server command line.
</para>
--- 1998,2004 ----
doesn't keep any extra segments for standby purposes, and the number
of old WAL segments available to standby servers is a function of
the location of the previous checkpoint and status of WAL
! archiving.
This parameter can only be set in the
<filename>postgresql.conf</> file or on the server command line.
</para>
***************
*** 2105,2111 **** SET ENABLE_SEQSCAN TO OFF;
synchronous replication is enabled, individual transactions can be
configured not to wait for replication by setting the
<xref linkend="guc-synchronous-commit"> parameter to
! <literal>local</> or <literal>off</>.
</para>
<para>
This parameter can only be set in the <filename>postgresql.conf</>
--- 2105,2112 ----
synchronous replication is enabled, individual transactions can be
configured not to wait for replication by setting the
<xref linkend="guc-synchronous-commit"> parameter to
! <literal>local</> or <literal>off</>. This parameter has no effect on
! cascade replication.
</para>
<para>
This parameter can only be set in the <filename>postgresql.conf</>
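The hunk above drops the statement that wal_keep_segments has no effect on restartpoints. This matches the CreateRestartPoint() change in xlog.c, which now also keeps that many segments on a standby, presumably so that cascaded standbys can still fetch them. The sketch below shows that segment arithmetic: it steps back wal_keep_segments segments from the current end of WAL. A position is a (log, seg) pair, so stepping back may need to borrow from the log id. It assumes the default 16MB segments, which gives 0xFFFFFFFF / 16MB = 255 segments per log file. The function name is hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: default 16MB segments, so 0xFFFFFFFF / 0x1000000 = 255 per log file. */
#define XLOG_SEGS_PER_FILE 255u

/*
 * Step back keep_segs (> 0) segments from (logid, seg), borrowing one log
 * file when seg would go negative and never going below (0, 1).
 */
static void oldest_segment_to_keep(uint32_t logid, uint32_t seg, int keep_segs,
                                   uint32_t *out_log, uint32_t *out_seg)
{
    uint32_t d_seg = (uint32_t) keep_segs % XLOG_SEGS_PER_FILE;
    uint32_t d_log = (uint32_t) keep_segs / XLOG_SEGS_PER_FILE;

    if (seg < d_seg)
    {
        d_log += 1;                                /* borrow a whole log file */
        seg = seg + XLOG_SEGS_PER_FILE - d_seg;
    }
    else
        seg -= d_seg;

    if (logid < d_log || (logid == d_log && seg == 0))
    {
        logid = 0;                                 /* WAL starts at segment (0, 1) */
        seg = 1;
    }
    else
        logid -= d_log;

    *out_log = logid;
    *out_seg = seg;
}
```

For example, stepping back 5 segments from (3, 2) borrows from the log id and lands on (2, 252), via (3,1), (3,0), (2,254) and (2,253).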
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 877,884 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
--- 877,921 ----
network delay, or that the standby is under heavy load.
</para>
</sect3>
+ </sect2>
+
+ <sect2 id="cascade-replication">
+ <title>Cascade Replication</title>
+ <indexterm zone="high-availability">
+ <primary>Cascade Replication</primary>
+ </indexterm>
+ <para>
+ The cascade replication feature allows a standby to accept replication
+ connections and stream WAL records to other standbys. This is useful
+ for reducing the number of standbys connecting directly to the master,
+ and thus the master's overhead, when you have many standbys.
+ </para>
+ <para>
+ A cascading standby sends not only WAL records received from the
+ master but also those restored from the archive. So even if an upstream
+ replication connection is terminated, cascade replication can continue.
+ </para>
+ <para>
+ Cascade replication is asynchronous. Note that synchronous replication
+ (see <xref linkend="synchronous-replication">) has no effect on cascade
+ replication.
+ </para>
+ <para>
+ Promoting a cascading standby terminates all the cascade replication
+ connections it serves. This is because the standbys' timelines then
+ differ, and they cannot continue replication over those connections.
+ </para>
+ <para>
+ To use cascade replication, set up the cascading standby so that it can
+ accept replication connections, i.e., set <varname>max_wal_senders</>,
+ <varname>hot_standby</> and the authentication options (see
+ <xref linkend="streaming-replication"> and <xref linkend="hot-standby">).
+ Also set <varname>primary_conninfo</> in the cascaded standby to point
+ to the cascading standby.
+ </para>
</sect2>
+
<sect2 id="synchronous-replication">
<title>Synchronous Replication</title>
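The paragraph above about sending archive-restored WAL is implemented by GetStandbyFlushRecPtr() in xlog.c. That function takes the furthest of three positions: what the walreceiver has written, what recovery has replayed, and the end of the last segment restored from the archive. The minimal sketch below models WAL positions as plain 64-bit integers instead of the patch's {xlogid, xrecoff} XLogRecPtr struct. The type and function names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t wal_pos;   /* simplified stand-in for XLogRecPtr */

/* How far a cascading walsender may send: the furthest of the three positions. */
static wal_pos standby_flush_ptr(wal_pos received, wal_pos replayed, wal_pos restored)
{
    wal_pos furthest = received > replayed ? received : replayed;

    return furthest > restored ? furthest : restored;
}
```

Taking the maximum means that a cascading standby currently fed only from the archive can still advance its downstream standbys.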
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 446,451 **** typedef struct XLogCtlData
--- 446,453 ----
XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* end of the last record restored from the archive */
+ XLogRecPtr restoreLastRecPtr;
/* Are we requested to pause recovery? */
bool recoveryPause;
***************
*** 618,625 **** static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
static bool AdvanceXLInsertBuffer(bool new_segment);
static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
! static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
! bool find_free, int *max_advance,
bool use_lock);
static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
int source, bool notexistOk);
--- 620,629 ----
static bool AdvanceXLInsertBuffer(bool new_segment);
static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
! static void XLogFileCopy(TimeLineID tli, uint32 log, uint32 seg, char *srcpath,
! uint32 offset);
! static bool InstallXLogFileSegment(TimeLineID tli, uint32 *log, uint32 *seg,
! char *tmppath, bool find_free, int *max_advance,
bool use_lock);
static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
int source, bool notexistOk);
***************
*** 1742,1748 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
/* create/use new log file */
use_existent = true;
! openLogFile = XLogFileInit(openLogId, openLogSeg,
&use_existent, true);
openLogOff = 0;
}
--- 1746,1752 ----
/* create/use new log file */
use_existent = true;
! openLogFile = XLogFileInit(ThisTimeLineID, openLogId, openLogSeg,
&use_existent, true);
openLogOff = 0;
}
***************
*** 2304,2310 **** XLogNeedsFlush(XLogRecPtr record)
/*
* Create a new XLOG file segment, or open a pre-existing one.
*
! * log, seg: identify segment to be created/opened.
*
* *use_existent: if TRUE, OK to use a pre-existing file (else, any
* pre-existing file will be deleted). On return, TRUE if a pre-existing
--- 2308,2314 ----
/*
* Create a new XLOG file segment, or open a pre-existing one.
*
! * tli, log, seg: identify segment to be created/opened.
*
* *use_existent: if TRUE, OK to use a pre-existing file (else, any
* pre-existing file will be deleted). On return, TRUE if a pre-existing
***************
*** 2322,2328 **** XLogNeedsFlush(XLogRecPtr record)
* in a critical section.
*/
int
! XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock)
{
char path[MAXPGPATH];
--- 2326,2332 ----
* in a critical section.
*/
int
! XLogFileInit(TimeLineID tli, uint32 log, uint32 seg,
bool *use_existent, bool use_lock)
{
char path[MAXPGPATH];
***************
*** 2334,2340 **** XLogFileInit(uint32 log, uint32 seg,
int fd;
int nbytes;
! XLogFilePath(path, ThisTimeLineID, log, seg);
/*
* Try to use existent file (checkpoint maker may have created it already)
--- 2338,2344 ----
int fd;
int nbytes;
! XLogFilePath(path, tli, log, seg);
/*
* Try to use existent file (checkpoint maker may have created it already)
***************
*** 2431,2437 **** XLogFileInit(uint32 log, uint32 seg,
installed_log = log;
installed_seg = seg;
max_advance = XLOGfileslop;
! if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
*use_existent, &max_advance,
use_lock))
{
--- 2435,2441 ----
installed_log = log;
installed_seg = seg;
max_advance = XLOGfileslop;
! if (!InstallXLogFileSegment(tli, &installed_log, &installed_seg, tmppath,
*use_existent, &max_advance,
use_lock))
{
***************
*** 2463,2564 **** XLogFileInit(uint32 log, uint32 seg,
/*
* Create a new XLOG file segment by copying a pre-existing one.
*
! * log, seg: identify segment to be created.
*
! * srcTLI, srclog, srcseg: identify segment to be copied (could be from
! * a different timeline)
*
! * Currently this is only used during recovery, and so there are no locking
! * considerations. But we should be just as tense as XLogFileInit to avoid
! * emplacing a bogus file.
*/
static void
! XLogFileCopy(uint32 log, uint32 seg,
! TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
{
- char path[MAXPGPATH];
- char tmppath[MAXPGPATH];
char buffer[XLOG_BLCKSZ];
! int srcfd;
! int fd;
! int nbytes;
/*
* Open the source file
*/
! XLogFilePath(path, srcTLI, srclog, srcseg);
! srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (srcfd < 0)
ereport(ERROR,
(errcode_for_file_access(),
! errmsg("could not open file \"%s\": %m", path)));
! /*
! * Copy into a temp file name.
! */
! snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
! unlink(tmppath);
! /* do not use get_sync_bit() here --- want to fsync only at end of fill */
! fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
! S_IRUSR | S_IWUSR);
! if (fd < 0)
! ereport(ERROR,
! (errcode_for_file_access(),
! errmsg("could not create file \"%s\": %m", tmppath)));
/*
* Do the data copying.
*/
! for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
{
! errno = 0;
! if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
! {
! if (errno != 0)
! ereport(ERROR,
! (errcode_for_file_access(),
! errmsg("could not read file \"%s\": %m", path)));
! else
! ereport(ERROR,
! (errmsg("not enough data in file \"%s\"", path)));
! }
! errno = 0;
! if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
! {
! int save_errno = errno;
! /*
! * If we fail to make the file, delete it to release disk space
! */
! unlink(tmppath);
! /* if write didn't set errno, assume problem is no disk space */
! errno = save_errno ? save_errno : ENOSPC;
ereport(ERROR,
(errcode_for_file_access(),
! errmsg("could not write to file \"%s\": %m", tmppath)));
}
}
! if (pg_fsync(fd) != 0)
! ereport(ERROR,
! (errcode_for_file_access(),
! errmsg("could not fsync file \"%s\": %m", tmppath)));
if (close(fd))
ereport(ERROR,
(errcode_for_file_access(),
! errmsg("could not close file \"%s\": %m", tmppath)));
close(srcfd);
-
- /*
- * Now move the segment into place with its final name.
- */
- if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
- elog(ERROR, "InstallXLogFileSegment should not have failed");
}
/*
--- 2467,2557 ----
/*
* Create a new XLOG file segment by copying a pre-existing one.
*
! * tli, log, seg: identify segment to be created.
*
! * srcpath: identify segment to be copied.
*
! * offset: identify offset to start copying from.
*/
static void
! XLogFileCopy(TimeLineID tli, uint32 log, uint32 seg, char *srcpath, uint32 offset)
{
char buffer[XLOG_BLCKSZ];
! int srcfd;
! int fd;
! bool use_existent;
/*
* Open the source file
*/
! srcfd = BasicOpenFile(srcpath, O_RDONLY | PG_BINARY, 0);
if (srcfd < 0)
ereport(ERROR,
(errcode_for_file_access(),
! errmsg("could not open file \"%s\": %m", srcpath)));
! /* Create/use new log file */
! use_existent = true;
! fd = XLogFileInit(tli, log, seg, &use_existent, true);
! /* Need to seek in the file? */
! if (offset != 0)
! {
! if (lseek(srcfd, (off_t) offset, SEEK_SET) < 0)
! ereport(ERROR,
! (errcode_for_file_access(),
! errmsg("could not seek in log file \"%s\": %m",
! srcpath)));
! if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
! ereport(ERROR,
! (errcode_for_file_access(),
! errmsg("could not seek in log file %u, segment %u to offset %u: %m",
! log, seg, offset)));
! }
/*
* Do the data copying.
*/
! while (offset < XLogSegSize)
{
! int nbytes;
! nbytes = read(srcfd, buffer, sizeof(buffer));
! if (nbytes < 0)
! ereport(ERROR,
! (errcode_for_file_access(),
! errmsg("could not read from log file \"%s\": %m",
! srcpath)));
! if (nbytes == 0)
! ereport(ERROR,
! (errmsg("not enough data in log file \"%s\"", srcpath)));
+ /* OK to write the logs */
+ errno = 0;
+ if (write(fd, buffer, nbytes) != nbytes)
+ {
+ /* if write didn't set errno, assume no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
ereport(ERROR,
(errcode_for_file_access(),
! errmsg("could not write to log file %u, segment %u "
! "at offset %u, length %lu: %m",
! log, seg, offset, (unsigned long) nbytes)));
}
+
+ /* Update state for copy */
+ offset += nbytes;
}
! /* Issue appropriate kind of fsync */
! issue_xlog_fsync(fd, log, seg);
if (close(fd))
ereport(ERROR,
(errcode_for_file_access(),
! errmsg("could not close log file %u, segment %u: %m",
! log, seg)));
close(srcfd);
}
/*
***************
*** 2567,2573 **** XLogFileCopy(uint32 log, uint32 seg,
* This is used both to install a newly-created segment (which has a temp
* filename while it's being created) and to recycle an old segment.
*
! * *log, *seg: identify segment to install as (or first possible target).
* When find_free is TRUE, these are modified on return to indicate the
* actual installation location or last segment searched.
*
--- 2560,2566 ----
* This is used both to install a newly-created segment (which has a temp
* filename while it's being created) and to recycle an old segment.
*
! * tli, *log, *seg: identify segment to install as (or first possible target).
* When find_free is TRUE, these are modified on return to indicate the
* actual installation location or last segment searched.
*
***************
*** 2591,2604 **** XLogFileCopy(uint32 log, uint32 seg,
* file into place.
*/
static bool
! InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance,
bool use_lock)
{
char path[MAXPGPATH];
struct stat stat_buf;
! XLogFilePath(path, ThisTimeLineID, *log, *seg);
/*
* We want to be sure that only one process does this at a time.
--- 2584,2597 ----
* file into place.
*/
static bool
! InstallXLogFileSegment(TimeLineID tli, uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance,
bool use_lock)
{
char path[MAXPGPATH];
struct stat stat_buf;
! XLogFilePath(path, tli, *log, *seg);
/*
* We want to be sure that only one process does this at a time.
***************
*** 2625,2631 **** InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
}
NextLogSeg(*log, *seg);
(*max_advance)--;
! XLogFilePath(path, ThisTimeLineID, *log, *seg);
}
}
--- 2618,2624 ----
}
NextLogSeg(*log, *seg);
(*max_advance)--;
! XLogFilePath(path, tli, *log, *seg);
}
}
***************
*** 2729,2734 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
--- 2722,2757 ----
elog(ERROR, "invalid XLogFileRead source %d", source);
}
+ /*
+ * If cascade replication is allowed and we've just restored an archived
+ * WAL file into a temporary file, copy it to a WAL file with the correct
+ * name, so that cascading walsenders can read and send it.
+ */
+ if (source == XLOG_FROM_ARCHIVE && AllowCascadeReplication())
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr endptr;
+
+ XLogFileCopy(curFileTLI, log, seg, path, readOff);
+
+ /*
+ * Calculate the end location of the restored WAL file and save it in
+ * shmem. It's used as the current standby flush position, and cascading
+ * walsenders try to send WAL records up to this location.
+ */
+ endptr.xlogid = log;
+ endptr.xrecoff = seg * XLogSegSize;
+ XLByteAdvance(endptr, XLogSegSize);
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->restoreLastRecPtr = endptr;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ /* Signal walsender that new WAL has arrived */
+ WalSndWakeup();
+ }
+
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (fd >= 0)
{
***************
*** 3255,3261 **** PreallocXlogFiles(XLogRecPtr endptr)
{
NextLogSeg(_logId, _logSeg);
use_existent = true;
! lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
close(lf);
if (!use_existent)
CheckpointStats.ckpt_segs_added++;
--- 3278,3284 ----
{
NextLogSeg(_logId, _logSeg);
use_existent = true;
! lf = XLogFileInit(ThisTimeLineID, _logId, _logSeg, &use_existent, true);
close(lf);
if (!use_existent)
CheckpointStats.ckpt_segs_added++;
***************
*** 3386,3392 **** RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
* separate archive directory.
*/
if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
! InstallXLogFileSegment(&endlogId, &endlogSeg, path,
true, &max_advance, true))
{
ereport(DEBUG2,
--- 3409,3415 ----
* separate archive directory.
*/
if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
! InstallXLogFileSegment(ThisTimeLineID, &endlogId, &endlogSeg, path,
true, &max_advance, true))
{
ereport(DEBUG2,
***************
*** 5153,5159 **** BootStrapXLOG(void)
/* Create first XLOG segment file */
use_existent = false;
! openLogFile = XLogFileInit(0, 1, &use_existent, false);
/* Write the first page with the initial record */
errno = 0;
--- 5176,5182 ----
/* Create first XLOG segment file */
use_existent = false;
! openLogFile = XLogFileInit(ThisTimeLineID, 0, 1, &use_existent, false);
/* Write the first page with the initial record */
errno = 0;
***************
*** 5532,5539 **** exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
*/
if (endTLI != ThisTimeLineID)
{
! XLogFileCopy(endLogId, endLogSeg,
! endTLI, endLogId, endLogSeg);
if (XLogArchivingActive())
{
--- 5555,5563 ----
*/
if (endTLI != ThisTimeLineID)
{
! XLogFilePath(xlogpath, endTLI, endLogId, endLogSeg);
! XLogFileCopy(ThisTimeLineID, endLogId, endLogSeg,
! xlogpath, 0);
if (XLogArchivingActive())
{
***************
*** 8162,8167 **** CreateRestartPoint(int flags)
--- 8186,8231 ----
/* Get the current (or recent) end of xlog */
endptr = GetWalRcvWriteRecPtr(NULL);
+ /*
+ * Calculate the last segment that we need to retain because of
+ * wal_keep_segments, by subtracting wal_keep_segments from
+ * current end of xlog.
+ */
+ if (wal_keep_segments > 0)
+ {
+ uint32 log;
+ uint32 seg;
+ int d_log;
+ int d_seg;
+
+ XLByteToSeg(endptr, log, seg);
+
+ d_seg = wal_keep_segments % XLogSegsPerFile;
+ d_log = wal_keep_segments / XLogSegsPerFile;
+ if (seg < d_seg)
+ {
+ d_log += 1;
+ seg = seg - d_seg + XLogSegsPerFile;
+ }
+ else
+ seg = seg - d_seg;
+ /* avoid underflow, don't go below (0,1) */
+ if (log < d_log || (log == d_log && seg == 0))
+ {
+ log = 0;
+ seg = 1;
+ }
+ else
+ log = log - d_log;
+
+ /* don't delete WAL segments newer than the calculated segment */
+ if (log < _logId || (log == _logId && seg < _logSeg))
+ {
+ _logId = log;
+ _logSeg = seg;
+ }
+ }
+
PrevLogSeg(_logId, _logSeg);
RemoveOldXlogFiles(_logId, _logSeg, endptr);
***************
*** 9549,9558 **** pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
/*
* Get latest redo apply position.
*
* Exported to allow WALReceiver to read the pointer directly.
*/
XLogRecPtr
! GetXLogReplayRecPtr(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
--- 9613,9626 ----
/*
* Get latest redo apply position.
*
+ * Optionally, returns the end byte position of the last restored
+ * WAL segment. Callers not interested in that value may pass
+ * NULL for restoreLastRecPtr.
+ *
* Exported to allow WALReceiver to read the pointer directly.
*/
XLogRecPtr
! GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
***************
*** 9560,9571 **** GetXLogReplayRecPtr(void)
--- 9628,9661 ----
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->recoveryLastRecPtr;
+ if (restoreLastRecPtr)
+ *restoreLastRecPtr = xlogctl->restoreLastRecPtr;
SpinLockRelease(&xlogctl->info_lck);
return recptr;
}
/*
+ * Get current standby flush position, ie, the last WAL position
+ * known to be fsync'd to disk in standby.
+ */
+ XLogRecPtr
+ GetStandbyFlushRecPtr(void)
+ {
+ XLogRecPtr receivePtr;
+ XLogRecPtr replayPtr;
+ XLogRecPtr restorePtr;
+
+ receivePtr = GetWalRcvWriteRecPtr(NULL);
+ replayPtr = GetXLogReplayRecPtr(&restorePtr);
+
+ if (XLByteLT(receivePtr, replayPtr))
+ return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr;
+ else
+ return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr;
+ }
+
+ /*
* Report the last WAL replay location (same format as pg_start_backup etc)
*
* This is useful for determining how much of WAL is visible to read-only
***************
*** 9577,9583 **** pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
XLogRecPtr recptr;
char location[MAXFNAMELEN];
! recptr = GetXLogReplayRecPtr();
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
--- 9667,9673 ----
XLogRecPtr recptr;
char location[MAXFNAMELEN];
! recptr = GetXLogReplayRecPtr(NULL);
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
***************
*** 10066,10071 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
--- 10156,10162 ----
}
XLByteToSeg(*RecPtr, readId, readSeg);
+ readOff = RecPtr->xrecoff % XLogSegSize;
retry:
/* See if we need to retrieve more data */
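In XLogFileRead(), the patch records the end of a segment just restored from the archive. XLogPageRead() now also sets readOff, so the copy starts at the current offset within the segment. The sketch below shows that arithmetic under some assumptions: the default 16MB segment size and the 9.1-era layout, in which a log file holds 0xFFFFFFFF / XLogSegSize segments. `RecPtr` and the helper names are hypothetical stand-ins. `advance()` is our reimplementation of the carry we expect XLByteAdvance() to perform, not a copy of it.

```c
#include <assert.h>
#include <stdint.h>

/* Assumptions: 16MB segments; 255 segments per log file of 0xFF000000 bytes. */
#define SEG_SIZE       0x1000000u
#define SEGS_PER_FILE  (0xFFFFFFFFu / SEG_SIZE)     /* 255 */
#define FILE_SIZE      (SEGS_PER_FILE * SEG_SIZE)   /* 0xFF000000 */

typedef struct
{
    uint32_t xlogid;    /* log file number */
    uint32_t xrecoff;   /* byte offset within that log file */
} RecPtr;               /* simplified stand-in for XLogRecPtr */

/* Advance ptr by nbytes (<= SEG_SIZE, so no uint32 overflow), carrying into the next log file. */
static void advance(RecPtr *ptr, uint32_t nbytes)
{
    if (ptr->xrecoff + nbytes >= FILE_SIZE)
    {
        ptr->xlogid += 1;
        ptr->xrecoff = ptr->xrecoff + nbytes - FILE_SIZE;
    }
    else
        ptr->xrecoff += nbytes;
}

/* End of segment (logid, seg): start of the segment, advanced by one segment. */
static RecPtr segment_end(uint32_t logid, uint32_t seg)
{
    RecPtr end = { logid, seg * SEG_SIZE };

    advance(&end, SEG_SIZE);
    return end;
}

/* Byte offset of a position within its segment (the patch's readOff). */
static uint32_t offset_in_segment(RecPtr ptr)
{
    return ptr.xrecoff % SEG_SIZE;
}
```

The interesting case is the last segment of a log file. segment_end(2, 254) rolls over to {3, 0}, because (254 + 1) * 16MB would equal the log file size, which is not a valid offset. This is why the patch advances the pointer instead of computing (seg + 1) * XLogSegSize directly.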
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 2304,2309 **** reaper(SIGNAL_ARGS)
--- 2304,2318 ----
pmState = PM_RUN;
/*
+ * Kill the cascading walsenders to make the cascaded standbys
+ * reread the timeline history file, adjust their timeline and
+ * re-establish replication connections. This is required
+ * because, just after failover, the timeline of the cascading
+ * standby is not consistent with that of the cascaded ones.
+ */
+ SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND);
+
+ /*
* Crank up the background writer, if we didn't do that already
* when we entered consistent recovery state. It doesn't matter
* if this fails, we'll just try again later.
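The postmaster change relies on the walsender's existing SIGUSR2 handling. As we read walsender.c of this era, SIGUSR2 requests one last send cycle followed by exit (the `walsender_ready_to_stop` flag declared in walsender.h). The cascaded standby then reconnects and picks up the new timeline. The usual pattern is a `volatile sig_atomic_t` flag that the handler sets and the main loop checks. The minimal sketch below uses only ISO C `signal()`/`raise()` plus POSIX SIGUSR2. All names are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

/* Set from the signal handler; writing a sig_atomic_t is async-signal-safe. */
static volatile sig_atomic_t ready_to_stop = 0;

static void last_cycle_handler(int signo)
{
    (void) signo;
    ready_to_stop = 1;      /* noticed by the loop after its current cycle */
}

static int install_last_cycle_handler(void)
{
    return signal(SIGUSR2, last_cycle_handler) == SIG_ERR ? -1 : 0;
}

/* One iteration of a hypothetical send loop; returns false once we should exit. */
static bool send_loop_iteration(int *cycles)
{
    (*cycles)++;            /* stand-in for sending pending WAL */
    return !ready_to_stop;  /* after the final cycle, exit so the peer reconnects */
}
```

Deferring the exit to the loop, rather than exiting inside the handler, lets the walsender finish the cycle it is in before the connection drops.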
*** a/src/backend/replication/basebackup.c
--- b/src/backend/replication/basebackup.c
***************
*** 339,344 **** SendBaseBackup(BaseBackupCmd *cmd)
--- 339,349 ----
MemoryContext old_context;
basebackup_options opt;
+ if (cascading_walsender)
+ ereport(FATAL,
+ (errcode(ERRCODE_CANNOT_CONNECT_NOW),
+ errmsg("recovery is still in progress, can't accept WAL streaming connections for backup")));
+
parse_basebackup_options(cmd->options, &opt);
backup_context = AllocSetContextCreate(CurrentMemoryContext,
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 469,474 **** SyncRepGetStandbyPriority(void)
--- 469,481 ----
int priority = 0;
bool found = false;
+ /*
+ * Since synchronous cascade replication is not allowed, we always
+ * set the priority of a cascading walsender to zero.
+ */
+ if (cascading_walsender)
+ return 0;
+
/* Need a modifiable copy of string */
rawstring = pstrdup(SyncRepStandbyNames);
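The syncrep.c hunk makes a cascading walsender report priority 0. As we understand synchronous_standby_names handling in this release, the synchronous standby is the connected walsender with the lowest non-zero priority, so priority 0 keeps a walsender out of the running. The simplified sketch below ignores walsender state and locking. The function is hypothetical.

```c
#include <assert.h>

/*
 * Pick the synchronous standby: the walsender with the lowest non-zero
 * priority. Priority 0 means "never synchronous", which the patch assigns
 * to every cascading walsender. Returns -1 if no walsender qualifies.
 */
static int pick_sync_standby(const int *priority, int n)
{
    int best = -1;

    for (int i = 0; i < n; i++)
    {
        if (priority[i] == 0)
            continue;                   /* asynchronous, e.g. cascading */
        if (best < 0 || priority[i] < priority[best])
            best = i;
    }
    return best;
}
```

With priorities {0, 2, 1}, the walsender at index 2 becomes synchronous. With {0, 0}, for instance when only cascading walsenders are connected, none does.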
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 44,49 ****
--- 44,50 ----
#include "miscadmin.h"
#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
+ #include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
***************
*** 485,491 **** XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* Create/use new log file */
XLByteToSeg(recptr, recvId, recvSeg);
use_existent = true;
! recvFile = XLogFileInit(recvId, recvSeg, &use_existent, true);
recvOff = 0;
}
--- 486,492 ----
/* Create/use new log file */
XLByteToSeg(recptr, recvId, recvSeg);
use_existent = true;
! recvFile = XLogFileInit(ThisTimeLineID, recvId, recvSeg, &use_existent, true);
recvOff = 0;
}
***************
*** 564,571 **** XLogWalRcvFlush(bool dying)
}
SpinLockRelease(&walrcv->mutex);
! /* Signal the startup process that new WAL has arrived */
WakeupRecovery();
/* Report XLOG streaming progress in PS display */
if (update_process_title)
--- 565,574 ----
}
SpinLockRelease(&walrcv->mutex);
! /* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery();
+ if (AllowCascadeReplication())
+ WalSndWakeup();
/* Report XLOG streaming progress in PS display */
if (update_process_title)
***************
*** 625,631 **** XLogWalRcvSendReply(void)
/* Construct a new message */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
! reply_message.apply = GetXLogReplayRecPtr();
reply_message.sendTime = now;
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
--- 628,634 ----
/* Construct a new message */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
! reply_message.apply = GetXLogReplayRecPtr(NULL);
reply_message.sendTime = now;
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 48,53 ****
--- 48,54 ----
#include "replication/basebackup.h"
#include "replication/replnodes.h"
#include "replication/walprotocol.h"
+ #include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
***************
*** 70,75 **** WalSnd *MyWalSnd = NULL;
--- 71,77 ----
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
+ bool cascading_walsender = false; /* Am I cascading WAL to another standby ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
***************
*** 135,144 **** WalSenderMain(void)
{
MemoryContext walsnd_context;
! if (RecoveryInProgress())
! ereport(FATAL,
! (errcode(ERRCODE_CANNOT_CONNECT_NOW),
! errmsg("recovery is still in progress, can't accept WAL streaming connections")));
/* Create a per-walsender data structure in shared memory */
InitWalSnd();
--- 137,143 ----
{
MemoryContext walsnd_context;
! cascading_walsender = RecoveryInProgress();
/* Create a per-walsender data structure in shared memory */
InitWalSnd();
***************
*** 165,170 **** WalSenderMain(void)
--- 164,175 ----
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
+ /*
+ * Use the recovery target timeline ID during recovery
+ */
+ if (cascading_walsender)
+ ThisTimeLineID = GetRecoveryTargetTLI();
+
/* Tell the standby that walsender is ready for receiving commands */
ReadyForQuery(DestRemote);
***************
*** 290,296 **** IdentifySystem(void)
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
! logptr = GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X",
logptr.xlogid, logptr.xrecoff);
--- 295,301 ----
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
! logptr = cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X",
logptr.xlogid, logptr.xrecoff);
***************
*** 372,379 **** StartReplication(StartReplicationCmd *cmd)
* directory that was created with 'minimal'. So this is not bulletproof,
* the purpose is just to give a user-friendly error message that hints
* how to configure the system correctly.
*/
! if (wal_level == WAL_LEVEL_MINIMAL)
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
--- 377,388 ----
* directory that was created with 'minimal'. So this is not bulletproof,
* the purpose is just to give a user-friendly error message that hints
* how to configure the system correctly.
+ *
+ * NOTE: The existence of a cascading walsender means that wal_level is set
+ * to hot_standby in the master. So we don't need to check the value of
+ * wal_level during recovery.
*/
! if (!cascading_walsender && wal_level == WAL_LEVEL_MINIMAL)
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
***************
*** 601,607 **** ProcessStandbyReplyMessage(void)
SpinLockRelease(&walsnd->mutex);
}
! SyncRepReleaseWaiters();
}
/*
--- 610,617 ----
SpinLockRelease(&walsnd->mutex);
}
! if (!cascading_walsender)
! SyncRepReleaseWaiters();
}
/*
***************
*** 1079,1085 **** XLogSend(char *msgbuf, bool *caughtup)
* subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master.
*/
! SendRqstPtr = GetFlushRecPtr();
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
--- 1089,1095 ----
* subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master.
*/
! SendRqstPtr = cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 221,226 **** extern int wal_level;
--- 221,229 ----
/* Do we need to WAL-log information required only for Hot Standby? */
#define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY)
+ /* Can we allow the standby to accept replication connections from another standby? */
+ #define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
+
#ifdef WAL_DEBUG
extern bool XLOG_DEBUG;
#endif
***************
*** 273,279 **** extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
extern void XLogFlush(XLogRecPtr RecPtr);
extern void XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
! extern int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
extern int XLogFileOpen(uint32 log, uint32 seg);
--- 276,282 ----
extern void XLogFlush(XLogRecPtr RecPtr);
extern void XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
! extern int XLogFileInit(TimeLineID tli, uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
extern int XLogFileOpen(uint32 log, uint32 seg);
***************
*** 292,298 **** extern bool RecoveryInProgress(void);
extern bool HotStandbyActive(void);
extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
! extern XLogRecPtr GetXLogReplayRecPtr(void);
extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void);
--- 295,302 ----
extern bool HotStandbyActive(void);
extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
! extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
! extern XLogRecPtr GetStandbyFlushRecPtr(void);
extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void);
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 92,97 **** extern WalSndCtlData *WalSndCtl;
--- 92,98 ----
/* global state */
extern bool am_walsender;
+ extern bool cascading_walsender;
extern volatile sig_atomic_t walsender_shutdown_requested;
extern volatile sig_atomic_t walsender_ready_to_stop;
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers