On 05/08/2015 04:21 PM, Heikki Linnakangas wrote:
On 04/22/2015 10:07 AM, Michael Paquier wrote:
On Wed, Apr 22, 2015 at 3:38 PM, Heikki Linnakangas <hlinn...@iki.fi> wrote:
I feel that the best approach is to archive the last, partial segment, but
with the .partial suffix. I don't see any plausible real-world setup where
the current behavior would be better. I don't really see much need to
archive the partial segment at all, but there's also no harm in doing it, as
long as it's clearly marked with the .partial suffix.

Well, as long as it is clearly archived at promotion, even with a
suffix, I guess that I am fine... This will need some tweaking on
restore_command for existing applications, but as long as it is
clearly documented I am fine. Shouldn't this be a different patch
though?

Ok, I came up with the attached, which adds the .partial suffix to the
partial WAL segment that's archived after promotion. I couldn't find any
natural place to talk about it in the docs, though. I think after the
docs changes from the main patch are applied, it would be natural to
mention this in the "Continuous archiving in standby" section, so I think I'll
add that later.

Barring objections, I'll push this later tonight.

Applied that part.

Now that we got this last-partial-segment problem out of the way, I'm
going to try fixing the problem you (Michael) pointed out about relying
on the pgstat file. Meanwhile, I'd love to get more feedback on the rest of
the patch, and the documentation.

And here is a new version of the patch. I kept the approach of using pgstat, but it now only polls pgstat every 10 seconds, and doesn't block to wait for updated stats.
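The sketch below illustrates the non-blocking cycle. It reproduces the decision logic of WalSndArchivalReportIfNecessary() using plain millisecond arithmetic, so it can be tested on its own. The ArchivalPoll struct and the poll_due()/poll_done() helpers are hypothetical names and are not part of the patch. The 10 s and 1 s intervals mirror ARCHIVAL_REPORT_INTERVAL and ARCHIVAL_REQUEST_INTERVAL.

```c
#include <stdbool.h>
#include <stdint.h>

#define REPORT_INTERVAL_MS	10000	/* mirrors ARCHIVAL_REPORT_INTERVAL */
#define REQUEST_INTERVAL_MS	1000	/* mirrors ARCHIVAL_REQUEST_INTERVAL */

/* Hypothetical poll state; all times are in milliseconds. */
typedef struct ArchivalPoll
{
	int64_t		last_stats_ts;		/* timestamp of the last snapshot read */
	bool		requested;			/* asked the collector for fresh stats? */
	int64_t		last_request_ts;	/* when we asked */
} ArchivalPoll;

/* Is it time to read the (possibly stale) stats snapshot again? */
static bool
poll_due(const ArchivalPoll *p, int64_t now)
{
	/* after requesting an update, give the collector some time */
	if (p->requested && now < p->last_request_ts + REQUEST_INTERVAL_MS)
		return false;
	return now >= p->last_stats_ts + REPORT_INTERVAL_MS;
}

/*
 * Record that we read a snapshot whose own timestamp is stats_ts. This
 * never blocks: if the snapshot is too old, we only note that an update
 * was requested, and a later poll_due() call picks up the result.
 */
static void
poll_done(ArchivalPoll *p, int64_t now, int64_t stats_ts)
{
	int64_t		min_ts = now - REPORT_INTERVAL_MS;

	p->last_stats_ts = stats_ts;
	if (stats_ts > min_ts)
		p->requested = false;
	else
	{
		/* the patch calls pgstat_request_update() at this point */
		p->requested = true;
		p->last_request_ts = now;
	}
}
```

The design choice is that the WAL sender never waits on the stats collector. A stale snapshot only triggers a request, and the answer is picked up on a later wakeup.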

- Heikki

From 08ca3cc7b9824503b793e149247ea9c6d3a7f323 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 16 Apr 2015 14:40:24 +0300
Subject: [PATCH v3 1/1] Make WAL archival behave more sensibly in standby
 mode.

This adds two new archive_modes, 'shared' and 'always', to indicate whether
the WAL archive is shared between the primary and standby, or not. In
shared mode, the standby tracks which files have been archived by the
primary. The standby refrains from recycling files that the primary has not
yet archived, and at failover, the standby also archives all of those files
from the old timeline. In 'always' mode, the standby's WAL archive is taken
to be separate from the primary's, and the standby independently archives
all files it receives from the primary.

This adds a new "archival status" message to the protocol. WAL sender sends
one automatically when the last archived WAL file, as reported in pgstat,
changes. (Or rather, some time after it changes: we're not in a hurry, as the
standby doesn't need an up-to-the-second status.)

Fujii Masao and me.
---
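The sketch below shows the archival status message body described above. It consists of the type byte 'a' followed by the filename, with no terminating NUL, so the receiver has to rely on the message length. The encode/decode helpers are hypothetical. MAX_XFN_CHARS and VALID_XFN_CHARS are assumed to match the definitions in postmaster/pgarch.h. In the patch, the walsender wraps this body in a CopyData ('d') message with pq_putmessage_noblock(). This sketch validates the NUL-terminated copy rather than the raw message buffer.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Assumed to match MAX_XFN_CHARS and VALID_XFN_CHARS in postmaster/pgarch.h */
#define MAX_XFN_CHARS	40
#define VALID_XFN_CHARS "0123456789ABCDEF.history.backup.partial"

/*
 * Build a report body: 'a' followed by the filename, not NUL-terminated.
 * Returns the body length, or 0 if out is too small.
 */
static size_t
encode_archival_report(const char *fname, char *out, size_t outlen)
{
	size_t		n = strlen(fname);

	if (outlen < n + 1)
		return 0;
	out[0] = 'a';
	memcpy(out + 1, fname, n);
	return n + 1;
}

/*
 * Decode a report body (including the type byte). On success, fname
 * (MAX_XFN_CHARS + 1 bytes) receives the NUL-terminated filename.
 */
static bool
decode_archival_report(const char *msg, size_t len, char *fname)
{
	if (len < 1 || msg[0] != 'a')
		return false;
	msg++;
	len--;
	if (len > MAX_XFN_CHARS)
		return false;			/* too long to be a WAL file name */
	memcpy(fname, msg, len);
	fname[len] = '\0';
	/* validate the terminated copy, never the unterminated input */
	if (strspn(fname, VALID_XFN_CHARS) != len)
	{
		fname[0] = '\0';
		return false;
	}
	return true;
}
```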
 doc/src/sgml/config.sgml                      |  12 +-
 doc/src/sgml/high-availability.sgml           |  48 +++++++
 doc/src/sgml/protocol.sgml                    |  31 +++++
 src/backend/access/transam/xlog.c             |  29 +++-
 src/backend/postmaster/pgstat.c               |  44 ++++++
 src/backend/postmaster/postmaster.c           |  37 +++--
 src/backend/replication/walreceiver.c         | 172 +++++++++++++++++++-----
 src/backend/replication/walsender.c           | 186 ++++++++++++++++++++++----
 src/backend/utils/misc/guc.c                  |  21 +--
 src/backend/utils/misc/postgresql.conf.sample |   2 +-
 src/include/access/xlog.h                     |  14 +-
 src/include/pgstat.h                          |   2 +
 12 files changed, 513 insertions(+), 85 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 0d8624a..ac845e0 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2521,7 +2521,7 @@ include_dir 'conf.d'
 
     <variablelist>
      <varlistentry id="guc-archive-mode" xreflabel="archive_mode">
-      <term><varname>archive_mode</varname> (<type>boolean</type>)
+      <term><varname>archive_mode</varname> (<type>enum</type>)
       <indexterm>
        <primary><varname>archive_mode</> configuration parameter</primary>
       </indexterm>
@@ -2530,7 +2530,15 @@ include_dir 'conf.d'
        <para>
         When <varname>archive_mode</> is enabled, completed WAL segments
         are sent to archive storage by setting
-        <xref linkend="guc-archive-command">.
+        <xref linkend="guc-archive-command">. In addition to <literal>off</>,
+        which disables archiving, there are three modes: <literal>on</>,
+        <literal>shared</>, and <literal>always</>. During normal operation,
+        there is no difference between the three modes, but in archive
+        recovery or standby mode, the setting indicates whether the WAL
+        archive is shared between the primary and the standby server or not.
+        See <xref linkend="continuous-archiving-in-standby"> for details.
+       </para>
+       <para>
         <varname>archive_mode</> and <varname>archive_command</> are
         separate variables so that <varname>archive_command</> can be
         changed without leaving archiving mode.
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index a17f555..62f7c75 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -1220,6 +1220,54 @@ primary_slot_name = 'node_a_slot'
 
    </sect3>
   </sect2>
+
+  <sect2 id="continuous-archiving-in-standby">
+   <title>Continuous archiving in standby</title>
+
+   <indexterm>
+     <primary>continuous archiving</primary>
+     <secondary>in standby</secondary>
+   </indexterm>
+
+   <para>
+     When continuous WAL archiving is used in a standby, there are two
+     different scenarios: the WAL archive can be shared between the primary
+     and the standby, or the standby can have its own WAL archive. In the
+     shared archive scenario, <varname>archive_mode</varname> must be set to
+     <literal>shared</literal>, and in the separate archive scenario, to
+     <literal>always</literal>. Setting it to <literal>on</literal> in a
+     standby server, or when performing point-in-time recovery, is not
+     allowed and an error will be raised. When a server is not in recovery
+     mode, there is no difference between <literal>on</literal>,
+     <literal>shared</literal>, and <literal>always</literal> modes.
+   </para>
+
+   <para>
+     In <literal>shared</literal> archive mode, the standby server tries to
+     ensure that the archive is complete, even if the primary crashes and
+     failover happens. The standby server will not archive any WAL segments
+     as long as it is in standby mode; it is the primary server's
+     responsibility to do so. It will, however, keep track of which files
+     have already been archived by the primary, and if failover happens, it
+     takes over and attempts to archive any files that the primary had not
+     yet archived.
+   </para>
+
+   <para>
+     In <literal>always</literal> archive mode, the standby server will
+     archive all WAL it receives, whether it arrives through streaming
+     replication or is restored from the primary's archive using
+     <varname>restore_command</varname>.
+   </para>
+
+   <para>
+     In cascading replication, the first standby server and each cascaded
+     standby server can have its own <varname>archive_mode</varname> setting.
+     In each standby, it should be set to <literal>shared</literal> or
+     <literal>always</literal>, depending on whether that standby shares the
+     archive with the primary or standby it is connected to.
+   </para>
+  </sect2>
   </sect1>
 
   <sect1 id="warm-standby-failover">
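The per-mode behaviour documented in the new section can be summarized as a small decision function. It returns the archive_status file that a standby keeps for a WAL segment it has received in full. This is a sketch only: the SegStatus enum and standby_segment_status() are hypothetical. The default case covers 'off', because 'on' is rejected during recovery.

```c
#include <stdbool.h>

/* Same order as the ArchiveMode enum the patch adds to access/xlog.h */
typedef enum ArchiveMode
{
	ARCHIVE_MODE_OFF = 0,
	ARCHIVE_MODE_ON,
	ARCHIVE_MODE_SHARED,
	ARCHIVE_MODE_ALWAYS
} ArchiveMode;

/* Hypothetical: status file a fully received segment ends up with */
typedef enum SegStatus
{
	SEG_STATUS_NONE,			/* no status file: this server never archives it */
	SEG_STATUS_READY,			/* .ready: our archiver will (eventually) archive it */
	SEG_STATUS_DONE				/* .done: already archived, may be recycled */
} SegStatus;

static SegStatus
standby_segment_status(ArchiveMode mode, bool primary_reported_archived)
{
	switch (mode)
	{
		case ARCHIVE_MODE_ALWAYS:
			/* separate archive: the standby's archiver handles it, reports are ignored */
			return SEG_STATUS_READY;
		case ARCHIVE_MODE_SHARED:
			/* stays .ready until the primary reports it as archived */
			return primary_reported_archived ? SEG_STATUS_DONE : SEG_STATUS_READY;
		default:
			/* 'off' ('on' is rejected during recovery) */
			return SEG_STATUS_NONE;
	}
}
```

In shared mode the .ready files are only a safety net. The standby's archiver does not run until promotion, so it only archives whatever the primary had not yet reported.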
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index ac13d32..bd2dd3f 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1646,6 +1646,37 @@ The commands accepted in walsender mode are:
       </para>
       </listitem>
       </varlistentry>
+      <varlistentry>
+      <term>
+          WAL archival report message (B)
+      </term>
+      <listitem>
+      <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+          Byte1('a')
+      </term>
+      <listitem>
+      <para>
+          Identifies the message as a WAL archival report.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Byte<replaceable>n</replaceable>
+      </term>
+      <listitem>
+      <para>
+          Filename of the most recently archived WAL file. It fills the rest of the message and is not null-terminated.
+      </para>
+      </listitem>
+      </varlistentry>
+      </variablelist>
+      </para>
+      </listitem>
+      </varlistentry>
       </variablelist>
      </para>
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6f7e3bd9..ee5a4a1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -85,7 +85,7 @@ int			min_wal_size = 5;		/* 80 MB */
 int			wal_keep_segments = 0;
 int			XLOGbuffers = -1;
 int			XLogArchiveTimeout = 0;
-bool		XLogArchiveMode = false;
+int			XLogArchiveMode = ARCHIVE_MODE_OFF;
 char	   *XLogArchiveCommand = NULL;
 bool		EnableHotStandby = false;
 bool		fullPageWrites = true;
@@ -139,6 +139,25 @@ const struct config_enum_entry sync_method_options[] = {
 	{NULL, 0, false}
 };
 
+
+/*
+ * Although only "on", "off", "shared", and "always" are documented,
+ * we accept all the likely variants of "on" and "off".
+ */
+const struct config_enum_entry archive_mode_options[] = {
+	{"shared", ARCHIVE_MODE_SHARED, false},
+	{"always", ARCHIVE_MODE_ALWAYS, false},
+	{"on", ARCHIVE_MODE_ON, false},
+	{"off", ARCHIVE_MODE_OFF, false},
+	{"true", ARCHIVE_MODE_ON, true},
+	{"false", ARCHIVE_MODE_OFF, true},
+	{"yes", ARCHIVE_MODE_ON, true},
+	{"no", ARCHIVE_MODE_OFF, true},
+	{"1", ARCHIVE_MODE_ON, true},
+	{"0", ARCHIVE_MODE_OFF, true},
+	{NULL, 0, false}
+};
+
 /*
  * Statistics for current checkpoint are collected in this global struct.
  * Because only the checkpointer or a stand-alone backend can perform
@@ -766,7 +785,7 @@ static MemoryContext walDebugCxt = NULL;
 #endif
 
 static void readRecoveryCommandFile(void);
-static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
+static void exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog);
 static bool recoveryStopsBefore(XLogReaderState *record);
 static bool recoveryStopsAfter(XLogReaderState *record);
 static void recoveryPausesHere(void);
@@ -6037,6 +6056,12 @@ StartupXLOG(void)
 
 	if (ArchiveRecoveryRequested)
 	{
+		/* archive_mode=on is not allowed during archive recovery. */
+		if (XLogArchiveMode == ARCHIVE_MODE_ON)
+			ereport(ERROR,
+					(errmsg("archive_mode='on' cannot be used in archive recovery"),
+					 errhint("Use 'shared' or 'always' mode instead.")));
+
 		if (StandbyModeRequested)
 			ereport(LOG,
 					(errmsg("entering standby mode")));
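The sketch below shows, in standalone form, how the archive_mode strings map onto the new enum. It includes the hidden boolean aliases from archive_mode_options[] and the recovery-time rejection of 'on'. parse_archive_mode(), archive_mode_valid_in_recovery() and equal_ci() are hypothetical helpers. The case-insensitive match is meant to mirror how GUC enum values are looked up.

```c
#include <ctype.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum ArchiveMode
{
	ARCHIVE_MODE_OFF = 0,
	ARCHIVE_MODE_ON,
	ARCHIVE_MODE_SHARED,
	ARCHIVE_MODE_ALWAYS
} ArchiveMode;

/* Same entries as archive_mode_options[]; hidden aliases are undocumented */
static const struct
{
	const char *name;
	ArchiveMode val;
	bool		hidden;
}			mode_options[] = {
	{"shared", ARCHIVE_MODE_SHARED, false},
	{"always", ARCHIVE_MODE_ALWAYS, false},
	{"on", ARCHIVE_MODE_ON, false},
	{"off", ARCHIVE_MODE_OFF, false},
	{"true", ARCHIVE_MODE_ON, true},
	{"false", ARCHIVE_MODE_OFF, true},
	{"yes", ARCHIVE_MODE_ON, true},
	{"no", ARCHIVE_MODE_OFF, true},
	{"1", ARCHIVE_MODE_ON, true},
	{"0", ARCHIVE_MODE_OFF, true},
	{NULL, ARCHIVE_MODE_OFF, false}
};

/* ASCII case-insensitive string equality */
static bool
equal_ci(const char *a, const char *b)
{
	for (; *a && *b; a++, b++)
	{
		if (tolower((unsigned char) *a) != tolower((unsigned char) *b))
			return false;
	}
	return *a == *b;
}

/* Returns false for an unknown value, leaving *mode untouched */
static bool
parse_archive_mode(const char *value, ArchiveMode *mode)
{
	for (size_t i = 0; mode_options[i].name != NULL; i++)
	{
		if (equal_ci(value, mode_options[i].name))
		{
			*mode = mode_options[i].val;
			return true;
		}
	}
	return false;
}

/* StartupXLOG() in the patch refuses archive_mode=on in archive recovery */
static bool
archive_mode_valid_in_recovery(ArchiveMode mode)
{
	return mode != ARCHIVE_MODE_ON;
}
```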
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 1e6073a..13bee0c 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -2471,6 +2471,50 @@ pgstat_fetch_global(void)
 }
 
 
+/*
+ * ---------
+ * pgstat_use_stale_snapshot() -
+ *
+ *	Take a "snapshot" of the current stats into backend-private memory.
+ *	pgstat_fetch_*() functions can then be used to interrogate the stats.
+ *
+ *	The first call to a pgstat_fetch_*() function in a transaction takes a
+ *	snapshot implicitly, so this is normally not required. But this can be
+ *	used if you don't want to wait for fresh stats, as the pgstat_fetch_*()
+ *	functions do.
+ * ---------
+ */
+void
+pgstat_use_stale_snapshot(void)
+{
+	pgstat_clear_snapshot();
+
+	/*
+	 * For all the current callers, shallow stats are enough.
+	 *
+	 * XXX: There is no way to request global stats only; we'll get stats
+	 * for all databases.
+	 */
+	pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
+}
+
+/*
+ * ---------
+ * pgstat_request_update() -
+ *
+ *	Ask the stats collector to refresh the stats file. Normally,
+ *	pgstat_fetch_*() will do this automatically, but this can be used together
+ *	with pgstat_use_stale_snapshot() to poll for updated stats
+ *	asynchronously.
+ * ---------
+ */
+void
+pgstat_request_update(TimestampTz cur_ts, TimestampTz min_ts)
+{
+	pgstat_send_inquiry(cur_ts, min_ts, InvalidOid);
+}
+
+
 /* ------------------------------------------------------------
  * Functions for management of the shared-memory PgBackendStatus array
  * ------------------------------------------------------------
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a9f20ac..72fe4fd 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -828,9 +828,9 @@ PostmasterMain(int argc, char *argv[])
 		write_stderr("%s: max_wal_senders must be less than max_connections\n", progname);
 		ExitPostmaster(1);
 	}
-	if (XLogArchiveMode && wal_level == WAL_LEVEL_MINIMAL)
+	if (XLogArchiveMode > ARCHIVE_MODE_OFF && wal_level == WAL_LEVEL_MINIMAL)
 		ereport(ERROR,
-				(errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\", \"hot_standby\", or \"logical\"")));
+				(errmsg("WAL archival (archive_mode=on/always/shared) requires wal_level \"archive\", \"hot_standby\", or \"logical\"")));
 	if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL)
 		ereport(ERROR,
 				(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\", \"hot_standby\", or \"logical\"")));
@@ -1645,13 +1645,21 @@ ServerLoop(void)
 				start_autovac_launcher = false; /* signal processed */
 		}
 
-		/* If we have lost the archiver, try to start a new one */
-		if (XLogArchivingActive() && PgArchPID == 0 && pmState == PM_RUN)
-			PgArchPID = pgarch_start();
-
-		/* If we have lost the stats collector, try to start a new one */
-		if (PgStatPID == 0 && pmState == PM_RUN)
-			PgStatPID = pgstat_start();
+		/*
+		 * If we have lost the archiver, try to start a new one.
+		 *
+		 * If archive_mode is 'always', we also try to start a new archiver
+		 * during recovery.
+		 */
+		if (PgArchPID == 0 && wal_level >= WAL_LEVEL_ARCHIVE)
+		{
+			if ((pmState == PM_RUN && XLogArchiveMode > ARCHIVE_MODE_OFF) ||
+				((pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY) &&
+				 XLogArchiveMode == ARCHIVE_MODE_ALWAYS))
+			{
+				PgArchPID = pgarch_start();
+			}
+		}
 
 		/* If we need to signal the autovacuum launcher, do so now */
 		if (avlauncher_needs_signal)
@@ -4807,6 +4815,17 @@ sigusr1_handler(SIGNAL_ARGS)
 		Assert(BgWriterPID == 0);
 		BgWriterPID = StartBackgroundWriter();
 
+		/*
+		 * Start the archiver if we're responsible for (re-)archiving received
+		 * files.
+		 */
+		Assert(PgArchPID == 0);
+		if (wal_level >= WAL_LEVEL_ARCHIVE &&
+			XLogArchiveMode == ARCHIVE_MODE_ALWAYS)
+		{
+			PgArchPID = pgarch_start();
+		}
+
 		pmState = PM_RECOVERY;
 	}
 	if (CheckPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY) &&
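The archiver start conditions that the patch adds to ServerLoop() and sigusr1_handler() reduce to the predicate sketched below. PMState here lists only the states the patch tests, with made-up values that do not match postmaster.c. wal_level_ok stands in for wal_level >= WAL_LEVEL_ARCHIVE.

```c
#include <stdbool.h>

/* Hypothetical subset of the postmaster states, not the real enum values */
typedef enum PMState
{
	PM_STARTUP,
	PM_RECOVERY,
	PM_HOT_STANDBY,
	PM_RUN
} PMState;

typedef enum ArchiveMode
{
	ARCHIVE_MODE_OFF = 0,
	ARCHIVE_MODE_ON,
	ARCHIVE_MODE_SHARED,
	ARCHIVE_MODE_ALWAYS
} ArchiveMode;

static bool
archiver_should_run(PMState state, ArchiveMode mode, bool wal_level_ok)
{
	if (!wal_level_ok)
		return false;
	/* normal running: any mode other than 'off' archives our own WAL */
	if (state == PM_RUN)
		return mode > ARCHIVE_MODE_OFF;
	/* recovery: only 'always' archives the WAL the standby receives */
	if (state == PM_RECOVERY || state == PM_HOT_STANDBY)
		return mode == ARCHIVE_MODE_ALWAYS;
	return false;
}
```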
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 9c7710f..e53ffeb 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -52,8 +52,11 @@
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/pgarch.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
@@ -107,6 +110,9 @@ static struct
 	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
 }	LogstreamResult;
 
+/* Name of the last WAL file the primary reported as archived */
+static char primary_last_archived[MAX_XFN_CHARS + 1];
+
 static StringInfoData reply_message;
 static StringInfoData incoming_message;
 
@@ -141,6 +147,7 @@ static void XLogWalRcvFlush(bool dying);
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
+static void ProcessArchivalReport(void);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -526,21 +533,12 @@ WalReceiverMain(void)
 		 */
 		if (recvFile >= 0)
 		{
-			char		xlogfname[MAXFNAMELEN];
-
 			XLogWalRcvFlush(false);
 			if (close(recvFile) != 0)
 				ereport(PANIC,
 						(errcode_for_file_access(),
 						 errmsg("could not close log segment %s: %m",
 								XLogFileNameP(recvFileTLI, recvSegNo))));
-
-			/*
-			 * Create .done file forcibly to prevent the streamed segment from
-			 * being archived later.
-			 */
-			XLogFileName(xlogfname, recvFileTLI, recvSegNo);
-			XLogArchiveForceDone(xlogfname);
 		}
 		recvFile = -1;
 
@@ -846,6 +844,26 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 					XLogWalRcvSendReply(true, false);
 				break;
 			}
+		case 'a':				/* Archival report */
+			{
+				/* the content of the message is a filename */
+				if (len >= sizeof(primary_last_archived))
+					ereport(ERROR,
+							(errcode(ERRCODE_PROTOCOL_VIOLATION),
+							 errmsg_internal("invalid archival report message with length %d",
+											 (int) len)));
+				memcpy(primary_last_archived, buf, len);
+				primary_last_archived[len] = '\0';
+				if (strspn(primary_last_archived, VALID_XFN_CHARS) != len)
+				{
+					primary_last_archived[0] = '\0';
+					ereport(ERROR,
+							(errcode(ERRCODE_PROTOCOL_VIOLATION),
+							 errmsg_internal("unexpected character in primary's last archived filename")));
+				}
+				ProcessArchivalReport();
+				break;
+			}
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -867,39 +885,18 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 	{
 		int			segbytes;
 
-		if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
+		if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
 		{
 			bool		use_existent;
 
 			/*
-			 * fsync() and close current file before we switch to next one. We
-			 * would otherwise have to reopen this file to fsync it later
+			 * We always close the current file right after writing its last
+			 * byte, so it should not still be open here.
 			 */
 			if (recvFile >= 0)
-			{
-				char		xlogfname[MAXFNAMELEN];
-
-				XLogWalRcvFlush(false);
-
-				/*
-				 * XLOG segment files will be re-read by recovery in startup
-				 * process soon, so we don't advise the OS to release cache
-				 * pages associated with the file like XLogFileClose() does.
-				 */
-				if (close(recvFile) != 0)
-					ereport(PANIC,
-							(errcode_for_file_access(),
-							 errmsg("could not close log segment %s: %m",
-									XLogFileNameP(recvFileTLI, recvSegNo))));
-
-				/*
-				 * Create .done file forcibly to prevent the streamed segment
-				 * from being archived later.
-				 */
-				XLogFileName(xlogfname, recvFileTLI, recvSegNo);
-				XLogArchiveForceDone(xlogfname);
-			}
-			recvFile = -1;
+				ereport(ERROR,
+						(errmsg("unexpected WAL receive location %s",
+								XLogFileNameP(recvFileTLI, recvSegNo))));
 
 			/* Create/use new log file */
 			XLByteToSeg(recptr, recvSegNo);
@@ -954,6 +951,51 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 		buf += byteswritten;
 
 		LogstreamResult.Write = recptr;
+
+		/*
+		 * If we just wrote the last byte to this segment, fsync() and close
+		 * current file before we switch to next one. We would otherwise have
+		 * to reopen this file to fsync it later.
+		 */
+		if (recvOff == XLOG_SEG_SIZE)
+		{
+			char		xlogfname[MAXFNAMELEN];
+
+			XLogWalRcvFlush(false);
+
+			/*
+			 * XLOG segment files will be re-read by recovery in startup
+			 * process soon, so we don't advise the OS to release cache
+			 * pages associated with the file like XLogFileClose() does.
+			 */
+			if (close(recvFile) != 0)
+				ereport(PANIC,
+						(errcode_for_file_access(),
+						 errmsg("could not close log segment %s: %m",
+								XLogFileNameP(recvFileTLI, recvSegNo))));
+			recvFile = -1;
+
+			/*
+			 * Now that this segment is complete, do we need to archive it?
+			 *
+			 * In 'always' mode, we clearly need to archive this.
+			 *
+			 * In 'shared' mode, we might need to, if we get promoted before
+			 * the master has archived this file, so create a .ready file. It
+			 * will be replaced with .done later, if we get acknowledgement
+			 * from the primary that this has already been archived.
+			 *
+			 * In 'on' mode, we're only responsible for WAL we've generated
+			 * ourselves.
+			 */
+			if (XLogArchiveMode == ARCHIVE_MODE_ALWAYS ||
+				XLogArchiveMode == ARCHIVE_MODE_SHARED)
+			{
+				XLogFileName(xlogfname, recvFileTLI, recvSegNo);
+
+				XLogArchiveCheckDone(xlogfname);
+			}
+		}
 	}
 }
 
@@ -1215,3 +1257,61 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 		pfree(receipttime);
 	}
 }
+
+/*
+ * Create .done files, based on the primary's last archival report.
+ */
+static void
+ProcessArchivalReport(void)
+{
+	DIR		   *xldir;
+	struct dirent *xlde;
+
+	elog(DEBUG2, "received archival report from master: %s",
+		 primary_last_archived);
+
+	if (XLogArchiveMode != ARCHIVE_MODE_SHARED)
+		return;
+
+	/* Check that the filename the primary reported looks valid */
+	if (strlen(primary_last_archived) < 24 ||
+		strspn(primary_last_archived, "0123456789ABCDEF") != 24)
+		return;
+
+	xldir = AllocateDir(XLOGDIR);
+	if (xldir == NULL)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open transaction log directory \"%s\": %m",
+						XLOGDIR)));
+
+	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+	{
+		/*
+		 * Mark every segment up to and including the primary's last
+		 * archived segment as done. We ignore the timeline part of the XLOG
+		 * segment identifiers in this comparison, so a segment from a parent
+		 * timeline is also marked done once the primary has archived a
+		 * segment with the same or a later segment number. Tracking each
+		 * timeline separately would be a whole lot more complicated.
+		 *
+		 * We use the alphanumeric sorting property of the filenames to decide
+		 * which ones are at or before the primary's last archived segment.
+		 */
+		if (strlen(xlde->d_name) == 24 &&
+			strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
+			strcmp(xlde->d_name + 8, primary_last_archived + 8) <= 0)
+		{
+			XLogArchiveForceDone(xlde->d_name);
+		}
+	}
+
+	FreeDir(xldir);
+
+	/*
+	 * Remember this location in pgstat as well. This makes it visible in
+	 * pg_stat_archiver, and allows the location to be relayed to cascaded
+	 * standbys.
+	 */
+	pgstat_send_archiver(primary_last_archived, false);
+}
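The filename test in ProcessArchivalReport() relies on WAL segment names being fixed-width, zero-padded uppercase hex. Because of that, strcmp() from offset 8, which skips the 8-digit timeline ID, orders segments by log and segment number. The sketch below reproduces that test in standalone form. is_wal_segment_name() and covered_by_report() are hypothetical helper names.

```c
#include <stdbool.h>
#include <string.h>

/* A plain WAL segment name is exactly 24 uppercase hex digits */
static bool
is_wal_segment_name(const char *name)
{
	return strlen(name) == 24 && strspn(name, "0123456789ABCDEF") == 24;
}

/*
 * Would a segment be marked .done for this report? Offset 8 skips the
 * timeline ID, so only the log/segment part of the names is compared.
 */
static bool
covered_by_report(const char *segname, const char *primary_last_archived)
{
	/* reports that don't start with a segment name (e.g. .history) are ignored */
	if (strlen(primary_last_archived) < 24 ||
		strspn(primary_last_archived, "0123456789ABCDEF") != 24)
		return false;
	return is_wal_segment_name(segname) &&
		strcmp(segname + 8, primary_last_archived + 8) <= 0;
}
```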
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4a20569..b4d4a90 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -55,6 +55,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "nodes/replnodes.h"
+#include "pgstat.h"
 #include "replication/basebackup.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
@@ -91,6 +92,21 @@
  */
 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
 
+/*
+ * How often (in ms) to report the last archived WAL file to the client.
+ */
+#define 	ARCHIVAL_REPORT_INTERVAL	10000
+/*
+ * After asking the stats collector for fresh stats, how often (in ms) to
+ * poll for the result.
+ *
+ * This is similar to PGSTAT_RETRY_DELAY and PGSTAT_INQ_INTERVAL, but we're
+ * much more relaxed in WAL sender, as we're not in any rush to get the latest
+ * status to the client. We also just use a single value, and send a new
+ * request after each poll.
+ */
+#define 	ARCHIVAL_REQUEST_INTERVAL	1000
+
 /* Array of WalSnds in shared memory */
 WalSndCtlData *WalSndCtl = NULL;
 
@@ -153,6 +169,19 @@ static StringInfoData tmpbuf;
  */
 static TimestampTz last_reply_timestamp = 0;
 
+/*
+ * Last archived file, as read from pgstat. last_archival_report_timestamp
+ * holds the timestamp of the stats snapshot it was read from.
+ */
+static char last_archived_file[MAX_XFN_CHARS + 1] = "";
+static TimestampTz last_archival_report_timestamp = 0;
+
+/*
+ * Have we requested fresh stats from the stats collector? And when?
+ */
+static bool	archival_status_requested = false;
+static TimestampTz last_archival_request_timestamp = 0;
+
 /* Have we sent a heartbeat message asking for reply, since last reply? */
 static bool waiting_for_ping_response = false;
 
@@ -209,6 +238,8 @@ static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
 static void WalSndKeepalive(bool requestReply);
 static void WalSndKeepaliveIfNecessary(TimestampTz now);
+static void WalSndArchivalReport(void);
+static void WalSndArchivalReportIfNecessary(TimestampTz now);
 static void WalSndCheckTimeOut(TimestampTz now);
 static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
@@ -1693,46 +1724,72 @@ ProcessStandbyHSFeedbackMessage(void)
 
 /*
  * Compute how long send/receive loops should sleep.
- *
- * If wal_sender_timeout is enabled we want to wake up in time to send
- * keepalives and to abort the connection if wal_sender_timeout has been
- * reached.
  */
 static long
 WalSndComputeSleeptime(TimestampTz now)
 {
-	long		sleeptime = 10000;		/* 10 s */
+	TimestampTz wakeup_time;
+	long		sleeptime;
+	long		sec_to_timeout;
+	int			microsec_to_timeout;
+	TimestampTz w;
 
+	/*
+	 * If we have no other reason to wake up, wake up every 10 seconds,
+	 * just in case we miss something.
+	 */
+	wakeup_time = TimestampTzPlusMilliseconds(now, 10000);
+
+	/*
+	 * If wal_sender_timeout is enabled we want to wake up in time to send
+	 * keepalives and to abort the connection if wal_sender_timeout has been
+	 * reached.
+	 */
 	if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
 	{
-		TimestampTz wakeup_time;
-		long		sec_to_timeout;
-		int			microsec_to_timeout;
-
 		/*
 		 * At the latest stop sleeping once wal_sender_timeout has been
 		 * reached.
-		 */
-		wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
-												  wal_sender_timeout);
-
-		/*
+		 *
 		 * If no ping has been sent yet, wakeup when it's time to do so.
 		 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
 		 * the timeout passed without a response.
 		 */
-		if (!waiting_for_ping_response)
-			wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
-													  wal_sender_timeout / 2);
-
-		/* Compute relative time until wakeup. */
-		TimestampDifference(now, wakeup_time,
-							&sec_to_timeout, &microsec_to_timeout);
+		if (waiting_for_ping_response)
+			w = TimestampTzPlusMilliseconds(last_reply_timestamp,
+											wal_sender_timeout);
+		else
+			w = TimestampTzPlusMilliseconds(last_reply_timestamp,
+											wal_sender_timeout / 2);
+		if (w < wakeup_time)
+			wakeup_time = w;
+	}
 
-		sleeptime = sec_to_timeout * 1000 +
-			microsec_to_timeout / 1000;
+	/* If archiving is enabled, wake up in time to send a status report */
+	if (XLogArchivingActive())
+	{
+		/*
+		 * If we requested an update from pgstat, poll every
+		 * ARCHIVAL_REQUEST_INTERVAL for the result. Otherwise wait until it's
+		 * time to send a new report.
+		 */
+		if (archival_status_requested)
+			w = TimestampTzPlusMilliseconds(last_archival_request_timestamp,
+											ARCHIVAL_REQUEST_INTERVAL);
+		else
+			w = TimestampTzPlusMilliseconds(last_archival_report_timestamp,
+											ARCHIVAL_REPORT_INTERVAL);
+		if (w < wakeup_time)
+			wakeup_time = w;
 	}
 
+	/* Compute relative time until wakeup. */
+	TimestampDifference(now, wakeup_time,
+						&sec_to_timeout, &microsec_to_timeout);
+
+	sleeptime = sec_to_timeout * 1000 +
+		microsec_to_timeout / 1000;
+
 	return sleeptime;
 }
 
@@ -2879,6 +2936,11 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	TimestampTz ping_time;
 
 	/*
+	 * Send an archival status message, if necessary.
+	 */
+	WalSndArchivalReportIfNecessary(now);
+
+	/*
 	 * Don't send keepalive messages if timeouts are globally disabled or
 	 * we're doing something not partaking in timeouts.
 	 */
@@ -2907,6 +2969,84 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 }
 
 /*
+ * Send an archival report message to the standby.
+ */
+static void
+WalSndArchivalReport(void)
+{
+	elog(DEBUG2, "sending archival report: %s", last_archived_file);
+
+	/* construct the message... */
+	resetStringInfo(&output_message);
+	pq_sendbyte(&output_message, 'a');
+	pq_sendbytes(&output_message, last_archived_file, strlen(last_archived_file));
+
+	/* ... and send it wrapped in CopyData */
+	pq_putmessage_noblock('d', output_message.data, output_message.len);
+}
+
+static void
+WalSndArchivalReportIfNecessary(TimestampTz now)
+{
+	TimestampTz report_time;
+
+	/*
+	 * If we had already asked pgstat for an update, wait until it's had
+	 * some time to update the stats file before we retry.
+	 */
+	if (archival_status_requested)
+	{
+		TimestampTz next_retry;
+
+		next_retry =
+			TimestampTzPlusMilliseconds(last_archival_request_timestamp,
+										ARCHIVAL_REQUEST_INTERVAL);
+		if (now < next_retry)
+			return;
+	}
+
+	/*
+	 * If more than ARCHIVAL_REPORT_INTERVAL has elapsed since we got the
+	 * archival status from pgstat, poll.
+	 */
+	report_time = TimestampTzPlusMilliseconds(last_archival_report_timestamp,
+											  ARCHIVAL_REPORT_INTERVAL);
+	if (now >= report_time)
+	{
+		PgStat_ArchiverStats *archiver_stats;
+		PgStat_GlobalStats *global_stats;
+		TimestampTz min_ts;
+
+		pgstat_use_stale_snapshot();
+		archiver_stats = pgstat_fetch_stat_archiver();
+		global_stats = pgstat_fetch_global();
+
+		last_archival_report_timestamp = global_stats->stats_timestamp;
+
+		if (strcmp(last_archived_file, archiver_stats->last_archived_wal) != 0)
+		{
+			strlcpy(last_archived_file, archiver_stats->last_archived_wal,
+					sizeof(last_archived_file));
+			WalSndArchivalReport();
+		}
+
+		/* If this wasn't fresh enough, request an update */
+		min_ts = TimestampTzPlusMilliseconds(now, -ARCHIVAL_REPORT_INTERVAL);
+		if (last_archival_report_timestamp > min_ts)
+			archival_status_requested = false;
+		else
+		{
+			/* Not fresh enough. Request an update */
+			pgstat_request_update(now, min_ts);
+			last_archival_request_timestamp = now;
+			archival_status_requested = true;
+		}
+
+		pgstat_clear_snapshot();
+	}
+}
+
+/*
  * This isn't currently used for anything. Monitoring tools might be
  * interested in the future, and we'll need something like this in the
  * future for synchronous replication.
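The reworked WalSndComputeSleeptime() takes the earliest of several deadlines instead of computing a single one. The sketch below reproduces that logic in standalone form, in milliseconds. The SleepInputs struct and the earlier() helper are hypothetical stand-ins for the walsender's static variables and GUCs.

```c
#include <stdbool.h>
#include <stdint.h>

#define REPORT_INTERVAL_MS	10000	/* mirrors ARCHIVAL_REPORT_INTERVAL */
#define REQUEST_INTERVAL_MS	1000	/* mirrors ARCHIVAL_REQUEST_INTERVAL */

/* Hypothetical stand-in for the walsender state; all times in ms */
typedef struct SleepInputs
{
	int64_t		now;
	int			wal_sender_timeout;	/* 0 disables the timeout */
	int64_t		last_reply_ts;		/* 0 if no reply received yet */
	bool		waiting_for_ping;
	bool		archiving_active;	/* XLogArchivingActive() */
	bool		status_requested;	/* archival_status_requested */
	int64_t		last_request_ts;
	int64_t		last_report_ts;
} SleepInputs;

static int64_t
earlier(int64_t a, int64_t b)
{
	return a < b ? a : b;
}

/* Milliseconds until the earliest deadline; 0 if it has already passed */
static long
compute_sleeptime(const SleepInputs *in)
{
	int64_t		wakeup = in->now + 10000;	/* wake at least every 10 s */

	if (in->wal_sender_timeout > 0 && in->last_reply_ts > 0)
	{
		/* full timeout if a ping is outstanding, else time to send one */
		int			ms = in->waiting_for_ping ? in->wal_sender_timeout
			: in->wal_sender_timeout / 2;

		wakeup = earlier(wakeup, in->last_reply_ts + ms);
	}

	if (in->archiving_active)
		wakeup = earlier(wakeup, in->status_requested
						 ? in->last_request_ts + REQUEST_INTERVAL_MS
						 : in->last_report_ts + REPORT_INTERVAL_MS);

	/* like TimestampDifference(), clamp a past deadline to zero */
	return wakeup > in->now ? (long) (wakeup - in->now) : 0;
}
```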
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 5f71ded..97aca46 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -396,6 +396,7 @@ static const struct config_enum_entry row_security_options[] = {
  * Options for enum values stored in other modules
  */
 extern const struct config_enum_entry wal_level_options[];
+extern const struct config_enum_entry archive_mode_options[];
 extern const struct config_enum_entry sync_method_options[];
 extern const struct config_enum_entry dynamic_shared_memory_options[];
 
@@ -1530,16 +1531,6 @@ static struct config_bool ConfigureNamesBool[] =
 	},
 
 	{
-		{"archive_mode", PGC_POSTMASTER, WAL_ARCHIVING,
-			gettext_noop("Allows archiving of WAL files using archive_command."),
-			NULL
-		},
-		&XLogArchiveMode,
-		false,
-		NULL, NULL, NULL
-	},
-
-	{
 		{"hot_standby", PGC_POSTMASTER, REPLICATION_STANDBY,
 			gettext_noop("Allows connections and queries during recovery."),
 			NULL
@@ -3552,6 +3543,16 @@ static struct config_enum ConfigureNamesEnum[] =
 	},
 
 	{
+		{"archive_mode", PGC_POSTMASTER, WAL_ARCHIVING,
+			gettext_noop("Allows archiving of WAL files using archive_command."),
+			NULL
+		},
+		&XLogArchiveMode,
+		ARCHIVE_MODE_OFF, archive_mode_options,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"trace_recovery_messages", PGC_SIGHUP, DEVELOPER_OPTIONS,
 			gettext_noop("Enables logging of recovery-related debugging information."),
 			gettext_noop("Each level includes all the levels that follow it. The later"
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 110983f..90371d7 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -206,7 +206,7 @@
 
 # - Archiving -
 
-#archive_mode = off		# allows archiving to be done
+#archive_mode = off		# allows archiving to be done; off, on, shared, or always
 				# (change requires restart)
 #archive_command = ''		# command to use to archive a logfile segment
 				# placeholders: %p = path of file to archive
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index f08b676..8556bb8 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -96,7 +96,6 @@ extern int	wal_keep_segments;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
-extern bool XLogArchiveMode;
 extern char *XLogArchiveCommand;
 extern bool EnableHotStandby;
 extern bool fullPageWrites;
@@ -106,6 +105,16 @@ extern bool log_checkpoints;
 
 extern int	CheckPointSegments;
 
+/* Archive modes */
+typedef enum ArchiveMode
+{
+	ARCHIVE_MODE_OFF = 0,	/* disabled */
+	ARCHIVE_MODE_ON,		/* enabled while server is running normally */
+	ARCHIVE_MODE_SHARED,	/* archive is shared with master */
+	ARCHIVE_MODE_ALWAYS		/* enabled always (even during recovery) */
+} ArchiveMode;
+extern int	XLogArchiveMode;
+
 /* WAL levels */
 typedef enum WalLevel
 {
@@ -116,7 +125,8 @@ typedef enum WalLevel
 } WalLevel;
 extern int	wal_level;
 
-#define XLogArchivingActive()	(XLogArchiveMode && wal_level >= WAL_LEVEL_ARCHIVE)
+#define XLogArchivingActive() \
+	(XLogArchiveMode > ARCHIVE_MODE_OFF && wal_level >= WAL_LEVEL_ARCHIVE)
 #define XLogArchiveCommandSet() (XLogArchiveCommand[0] != '\0')
 
 /*
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e3fe06e..b95a701 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -910,6 +910,8 @@ extern void pgstat_report_stat(bool force);
 extern void pgstat_vacuum_stat(void);
 extern void pgstat_drop_database(Oid databaseid);
 
+extern void pgstat_request_update(TimestampTz cur_ts, TimestampTz min_ts);
+extern void pgstat_use_stale_snapshot(void);
 extern void pgstat_clear_snapshot(void);
 extern void pgstat_reset_counters(void);
 extern void pgstat_reset_shared_counters(const char *);
-- 
2.1.4

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)