On 17.01.2013 18:42, Andres Freund wrote:
On 2013-01-17 18:33:42 +0200, Heikki Linnakangas wrote:
On 17.01.2013 17:42, Andres Freund wrote:
Ok, the attached patch seems to fix a) and b). c) above is bogus, as
explained in a comment in the patch.  I also noticed that the TLI check
didn't mark the last source as failed.

This looks fragile:

                        /*
                         * We only end up here without a message when 
XLogPageRead() failed
                         * - in that case we already logged something.
                         * In StandbyMode that only happens if we have been 
triggered, so
                         * we shouldn't loop anymore in that case.
                         */
                        if (errormsg == NULL)
                                break;

I don't like relying on the presence of an error message to control logic
like that. Should we throw in an explicit CheckForStandbyTrigger() check in
the condition of that loop?

I agree, I wasn't too happy about that either. But in some sense its
only propagating state from XLogReadPage which already has dealt with
the error and decided its ok.
Its the solution closest to what happened in the old implementation,
thats why I thought it would be halfway to acceptable.

Adding the CheckForStandbyTrigger() in the condition would mean
promotion would happen before all the available records are processed
and it would increase the amount of stat()s tremendously.
So I don't really like that either.

I was thinking of the attached. As long as we check for CheckForStandbyTrigger() after the "record == NULL" check, we won't perform extra stat() calls on successful reads, only when we're polling after reaching the end of valid WAL. That seems acceptable. If we want to avoid even that, we could move the static 'triggered' variable from CheckForStandbyTrigger() out of that function, and check that in the loop.

- Heikki
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 70cfabc..ac2b26b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -3180,7 +3180,8 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
  * try to read a record just after the last one previously read.
  *
  * If no valid record is available, returns NULL, or fails if emode is PANIC.
- * (emode must be either PANIC, LOG)
+ * (emode must be either PANIC, LOG). In standby mode, retries until a valid
+ * record is available.
  *
  * The record is copied into readRecordBuf, so that on successful return,
  * the returned record pointer always points there.
@@ -3209,12 +3210,6 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 		EndRecPtr = xlogreader->EndRecPtr;
 		if (record == NULL)
 		{
-			/* not all failures fill errormsg; report those that do */
-			if (errormsg && errormsg[0] != '\0')
-				ereport(emode_for_corrupt_record(emode,
-												 RecPtr ? RecPtr : EndRecPtr),
-						(errmsg_internal("%s", errormsg) /* already translated */));
-
 			lastSourceFailed = true;
 
 			if (readFile >= 0)
@@ -3222,7 +3217,20 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 				close(readFile);
 				readFile = -1;
 			}
-			break;
+
+			/*
+			 * We only end up here without a message when XLogPageRead() failed
+			 * - in that case we already logged something.
+			 * In StandbyMode that only happens if we have been triggered, so
+			 * we shouldn't loop anymore in that case.
+			 */
+			if (errormsg)
+				ereport(emode_for_corrupt_record(emode,
+												 RecPtr ? RecPtr : EndRecPtr),
+						(errmsg_internal("%s", errormsg) /* already translated */));
+
+			/* Give up, or retry if we're in standby mode. */
+			continue;
 		}
 
 		/*
@@ -3234,6 +3242,8 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 			XLogSegNo segno;
 			int32 offset;
 
+			lastSourceFailed = true;
+
 			XLByteToSeg(xlogreader->latestPagePtr, segno);
 			offset = xlogreader->latestPagePtr % XLogSegSize;
 			XLogFileName(fname, xlogreader->readPageTLI, segno);
@@ -3243,9 +3253,10 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 							xlogreader->latestPageTLI,
 							fname,
 							offset)));
-			return false;
+			record = NULL;
+			continue;
 		}
-	} while (StandbyMode && record == NULL);
+	} while (StandbyMode && record == NULL && !CheckForStandbyTrigger());
 
 	return record;
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index ff871a3..75164f6 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -222,11 +222,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogRecord);
 
 	if (readOff < 0)
-	{
-		if (state->errormsg_buf[0] != '\0')
-			*errormsg = state->errormsg_buf;
-		return NULL;
-	}
+		goto err;
 
 	/*
 	 * ReadPageInternal always returns at least the page header, so we can
@@ -246,8 +242,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	{
 		report_invalid_record(state, "invalid record offset at %X/%X",
 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-		*errormsg = state->errormsg_buf;
-		return NULL;
+		goto err;
 	}
 
 	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
@@ -255,8 +250,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	{
 		report_invalid_record(state, "contrecord is requested by %X/%X",
 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-		*errormsg = state->errormsg_buf;
-		return NULL;
+		goto err;
 	}
 
 	/* ReadPageInternal has verified the page header */
@@ -270,11 +264,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 							   targetPagePtr,
 						  Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
 	if (readOff < 0)
-	{
-		if (state->errormsg_buf[0] != '\0')
-			*errormsg = state->errormsg_buf;
-		return NULL;
-	}
+		goto err;
 
 	/*
 	 * Read the record length.
@@ -300,11 +290,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	{
 		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
 								   randAccess))
-		{
-			if (state->errormsg_buf[0] != '\0')
-				*errormsg = state->errormsg_buf;
-			return NULL;
-		}
+			goto err;
 		gotheader = true;
 	}
 	else
@@ -314,8 +300,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 		{
 			report_invalid_record(state, "invalid record length at %X/%X",
 								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-			*errormsg = state->errormsg_buf;
-			return NULL;
+			goto err;
 		}
 		gotheader = false;
 	}
@@ -330,8 +315,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 		report_invalid_record(state, "record length %u at %X/%X too long",
 							  total_len,
 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-		*errormsg = state->errormsg_buf;
-		return NULL;
+		goto err;
 	}
 
 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to