Hello,

This problem still occurs on the master.
I rebased this to the current master.

At Mon, 3 Apr 2017 08:38:47 +0900, Michael Paquier <michael.paqu...@gmail.com> 
wrote in <cab7npqt8dqk_ce29yq0ckaq7htldyuhndfv6dele4pkyr3s...@mail.gmail.com>
> On Mon, Apr 3, 2017 at 7:19 AM, Venkata B Nagothi <nag1...@gmail.com> wrote:
> > As we are already past the commitfest, I am not sure, what should i change
> > the patch status to ?
> 
> The commit fest finishes on the 7th of April. Even with the deadline
> passed, there is nothing preventing to work on bug fixes. So this item
> ought to be moved to the next CF with the same category.

The steps to reproduce the problem follow.

- Apply the second patch (0002-) attached and recompile. It
  effectively reproduces the problematic state of the database.

- M(aster): initdb the master with wal_keep_segments = 0
            (default), log_min_messages = debug2
- M: Create a physical repslot.
- S(tandby): Setup a standby database.
- S: Edit recovery.conf to use the replication slot above then
     start it.
- S: touch /tmp/hoge
- M: Run pgbench ...
- S: After a while, the standby stops.
  > LOG:  #################### STOP THE SERVER

- M: Stop pgbench.
- M: Do 'checkpoint;' twice.
- S: rm /tmp/hoge
- S: Fails to catch up with the following error.

  > FATAL:  could not receive data from WAL stream: ERROR:  requested WAL 
segment 00000001000000000000002B has already been removed


The first patch (0001-) fixes this problem. It prevents the
problematic state of WAL segments by holding back the restart LSN
of a physical replication slot while the next segment starts with
a continuation record.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From 3813599b74299f1da8d0567ed90542c5f35ed48b Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 1 Feb 2017 16:07:22 +0900
Subject: [PATCH 1/2] Retard restart LSN of a slot when a segment starts with a
 contrecord.

A physical-replication standby can stop just at the boundary of WAL
segments. restart_lsn of a slot on the master can be assumed to be the
same location. In that case the last segment on the master will be removed
after some checkpoints. If the last record of the last replicated segment
continues into the next segment, the continuation of the record exists only
on the master. The standby then cannot start because the split record is
not available in full from any single source.

This patch detains restart_lsn in the last segment when the first page
of the next segment is a continuation record.
---
 src/backend/replication/walsender.c | 105 +++++++++++++++++++++++++++++++++---
 1 file changed, 98 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 03e1cf4..30c80af 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -217,6 +217,13 @@ static struct
 	WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
 }			LagTracker;
 
+/*
+ * This variable corresponds to restart_lsn in pg_replication_slots for a
+ * physical slot. This has a valid value only when it differs from the current
+ * flush pointer.
+ */
+static XLogRecPtr	   restartLSN = InvalidXLogRecPtr;
+
 /* Signal handlers */
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
@@ -251,7 +258,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool noutfoundok);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -546,6 +553,9 @@ StartReplication(StartReplicationCmd *cmd)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 (errmsg("cannot use a logical replication slot for physical replication"))));
+
+		/* Restore restartLSN from replication slot */
+		restartLSN = MyReplicationSlot->data.restart_lsn;
 	}
 
 	/*
@@ -561,6 +571,10 @@ StartReplication(StartReplicationCmd *cmd)
 	else
 		FlushPtr = GetFlushRecPtr();
 
+	/* Set InvalidXLogRecPtr if catching up */
+	if (restartLSN == FlushPtr)
+		restartLSN = InvalidXLogRecPtr;
+
 	if (cmd->timeline != 0)
 	{
 		XLogRecPtr	switchpoint;
@@ -770,7 +784,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* now actually read the data, we know it's there */
-	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false);
 
 	return count;
 }
@@ -1738,7 +1752,7 @@ static void
 ProcessStandbyReplyMessage(void)
 {
 	XLogRecPtr	writePtr,
-				flushPtr,
+				flushPtr, oldFlushPtr,
 				applyPtr;
 	bool		replyRequested;
 	TimeOffset	writeLag,
@@ -1798,6 +1812,7 @@ ProcessStandbyReplyMessage(void)
 		WalSnd	   *walsnd = MyWalSnd;
 
 		SpinLockAcquire(&walsnd->mutex);
+		oldFlushPtr = walsnd->flush;
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
 		walsnd->apply = applyPtr;
@@ -1821,7 +1836,75 @@ ProcessStandbyReplyMessage(void)
 		if (SlotIsLogical(MyReplicationSlot))
 			LogicalConfirmReceivedLocation(flushPtr);
 		else
-			PhysicalConfirmReceivedLocation(flushPtr);
+		{
+			/*
+			 * Recovery on standby requires that a continuation record is
+			 * available from single WAL source. For the reason, physical
+			 * replication slot should stay in the first segment of the
+			 * multiple segments that a continued record is spanning
+			 * over. Since we look pages and don't look into individual record
+			 * here, restartLSN may stay a bit too behind but it doesn't
+			 * matter.
+			 *
+			 * Since the objective is avoiding to remove required segments,
+			 * checking at the beginning of every segment is enough. But once
+			 * restartLSN goes behind, check every page for quick restoration.
+			 *
+			 * restartLSN has a valid value only when it is behind flushPtr.
+			 */
+			if (oldFlushPtr != InvalidXLogRecPtr &&
+				(restartLSN == InvalidXLogRecPtr ?
+				 oldFlushPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE :
+				 restartLSN / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ))
+			{
+				XLogRecPtr rp;
+
+				if (restartLSN == InvalidXLogRecPtr)
+					restartLSN = oldFlushPtr;
+
+				rp = restartLSN - (restartLSN % XLOG_BLCKSZ);
+
+				/*
+				 * We may have let the record at flushPtr be sent, so it's
+				 * worth looking
+				 */
+				while (rp <= flushPtr)
+				{
+					XLogPageHeaderData header;
+
+					/*
+					 * If the page header is not available for now, don't move
+					 * restartLSN forward. We can read it by the next chance.
+					 */
+					if(sentPtr - rp >= sizeof(XLogPageHeaderData))
+					{
+						bool found;
+						/*
+						 * Fetch the page header of the next page. Move
+						 * restartLSN forward only if it is not a continuation
+						 * page.
+						 */
+						found = XLogRead((char *)&header, rp,
+											 sizeof(XLogPageHeaderData), true);
+						if (found &&
+							(header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+							restartLSN = rp;
+					}
+					rp += XLOG_BLCKSZ;
+				}
+
+				/*
+				 * If restartLSN is on the same page with flushPtr, it means
+				 * that we are catching up.
+				 */
+				if (restartLSN / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ)
+					restartLSN = InvalidXLogRecPtr;
+			}
+
+			/* restartLSN == InvalidXLogRecPtr means catching up */
+			PhysicalConfirmReceivedLocation(restartLSN != InvalidXLogRecPtr ?
+											restartLSN : flushPtr);
+		}
 	}
 }
 
@@ -2288,6 +2371,7 @@ WalSndKill(int code, Datum arg)
 
 /*
  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * Returns false if the segment file is not found when notfoundok is true.
  *
  * XXX probably this should be improved to suck data directly from the
  * WAL buffers when possible.
@@ -2297,8 +2381,8 @@ WalSndKill(int code, Datum arg)
  * always be one descriptor left open until the process ends, but never
  * more than one.
  */
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -2375,10 +2459,15 @@ retry:
 				 * removed or recycled.
 				 */
 				if (errno == ENOENT)
+				{
+					if (notfoundok)
+						return false;
+
 					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("requested WAL segment %s has already been removed",
 									XLogFileNameP(curFileTimeLine, sendSegNo))));
+				}
 				else
 					ereport(ERROR,
 							(errcode_for_file_access(),
@@ -2460,6 +2549,8 @@ retry:
 			goto retry;
 		}
 	}
+
+	return true;
 }
 
 /*
@@ -2694,7 +2785,7 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+	XLogRead(&output_message.data[output_message.len], startptr, nbytes, false);
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
-- 
2.9.2

>From c6931b512ca37991a068a632d00fe0e4a6df46bb Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 28 Aug 2017 18:46:49 +0900
Subject: [PATCH 2/2] Debug assistant code.

This patch reliably reproduces the problematic situation.
---
 src/backend/replication/walreceiver.c | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ea9d21a..9dbf9e1 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -986,6 +986,29 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 			recvFile = XLogFileInit(recvSegNo, &use_existent, true);
 			recvFileTLI = ThisTimeLineID;
 			recvOff = 0;
+
+			if ((recptr & 0xffffffL) == 0)
+			{
+				XLogPageHeader ph = (XLogPageHeader) buf;
+				Assert(nbytes >= sizeof(SizeOfXLogShortPHD));
+
+				elog(LOG, "############# CHECK AT %lX : %d",
+					 recptr, (ph->xlp_info & XLP_FIRST_IS_CONTRECORD) != 0);
+				if (ph->xlp_info & XLP_FIRST_IS_CONTRECORD)
+				{
+					struct stat sbuf;
+					if (stat("/tmp/hoge", &sbuf) == 0)
+					{
+						elog(LOG, "#################### STOP THE SERVER");
+						system("pg_ctl stop -m f -W");
+						while (1)
+						{
+							ProcessWalRcvInterrupts();
+							sleep(1);
+						}
+					}
+				}
+			}
 		}
 
 		/* Calculate the start offset of the received logs */
-- 
2.9.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to