On Wed, Nov 9, 2016 at 11:32 PM, Kuntal Ghosh
<kuntalghosh.2...@gmail.com> wrote:
> On Fri, Nov 4, 2016 at 1:32 PM, Michael Paquier
> <michael.paqu...@gmail.com> wrote:
>> Thank you for the new patch. This will be hopefully the last round of
>> reviews, we are getting close to something that has an acceptable
>> shape.
>
> Thanks a lot for reviewing the patch. Based on your review, I've attached the
> updated patch along with few comments.

Thanks for the new version. pg_xlogdump is really helpful now for debugging.

>> I haven't performed any tests with the patch, and that's all I have
>> regarding the code. With that done we should be in good shape
>> code-speaking I think...
>
> I've done a fair amount of testing which includes regression tests
> and manual creation of inconsistencies in the page. I've also found the
> reason behind inconsistency in brin revmap page.
> Brin revmap page doesn't have standard page layout. Besides, It doesn't update
> pd_upper and pd_lower in its operations as well. But, during WAL
> insertions, it uses
> REGBUF_STANDARD to register a reference in the WAL record. Hence, when we
> restore image before consistency check, RestoreBlockImage fills the space
> between pd_upper and pd_lower(page hole) with zero. I've posted this as a
> separate thread.
> https://www.postgresql.org/message-id/flat/CAGz5QCJ%3D00UQjScSEFbV%3D0qO5ShTZB9WWz_Fm7%2BWd83zPs9Geg%40mail.gmail.com#CAGz5QCJ=00UQjScSEFbV=0qo5shtzb9wwz_fm7+wd83zps9...@mail.gmail.com

Nice to have spotted the inconsistency. This tool is going to be useful.
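
To make the page-hole effect described in the quoted text concrete, here
is a minimal standalone sketch. It is not PostgreSQL code: ToyPageHeader,
the toy_* functions and the 64-byte page size are invented for this
example. Only the idea is taken from the discussion above: when an image
registered as "standard" is restored, the bytes between pd_lower and
pd_upper come back as zeroes.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define TOY_BLCKSZ 64			/* hypothetical page size, for illustration */

/* Hypothetical, simplified page header: only the two offsets matter here. */
typedef struct ToyPageHeader
{
	uint16_t	pd_lower;		/* end of the line pointer area */
	uint16_t	pd_upper;		/* start of the tuple area */
} ToyPageHeader;

/* Initialize an empty page whose hole spans everything after the header. */
static void
toy_init_page(char *page)
{
	ToyPageHeader hdr = {sizeof(ToyPageHeader), TOY_BLCKSZ};

	memset(page, 0, TOY_BLCKSZ);
	memcpy(page, &hdr, sizeof(hdr));
}

/*
 * Mimic restoring an image that was logged as a "standard" page: the hole
 * between pd_lower and pd_upper is not kept and is filled with zeroes.
 */
static void
toy_restore_standard_image(const char *logged_page, char *restored)
{
	ToyPageHeader hdr;

	memcpy(&hdr, logged_page, sizeof(hdr));
	assert(hdr.pd_lower <= hdr.pd_upper && hdr.pd_upper <= TOY_BLCKSZ);

	memcpy(restored, logged_page, TOY_BLCKSZ);
	memset(restored + hdr.pd_lower, 0, hdr.pd_upper - hdr.pd_lower);
}

/* Return 1 if the restored image matches the live page, 0 otherwise. */
static int
toy_pages_match(const char *page)
{
	char		restored[TOY_BLCKSZ];

	toy_restore_standard_image(page, restored);
	return memcmp(page, restored, TOY_BLCKSZ) == 0;
}
```

A page that stores data inside the hole without moving pd_lower or
pd_upper stops matching its own restored image, which is the kind of
mismatch the consistency check reports for the revmap page.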

I have spent a couple of hours playing with the patch, and worked on
it a bit more with a couple of minor changes:
- In gindesc.c, the if blocks are more readable with brackets.
- Addition of a comment when info is set, to mention that this is done
at the beginning of the routine so that callers of XLogInsert() can pass
the flag for consistency checks.
- apply_image should be reset in ResetDecoder().
- The BRIN problem is here:
2016-11-10 12:24:10 JST [65776]: [23-1] db=,user=,app=,client= FATAL:
Inconsistent page found, rel 1663/16385/30625, forknum 0, blkno 1
2016-11-10 12:24:10 JST [65776]: [24-1] db=,user=,app=,client=
CONTEXT:  xlog redo at 0/9BD31148 for BRIN/UPDATE+INIT: heapBlk 100
pagesPerRange 1 old offnum 11, new offnum 1
2016-11-10 12:24:10 JST [65776]: [25-1] db=,user=,app=,client=
WARNING:  buffer refcount leak: [4540] (rel=base/16385/30625,
blockNum=1, flags=0x93800000, refcount=1 1)
TRAP: FailedAssertion("!(RefCountErrors == 0)", File: "bufmgr.c", Line: 2506)
Now the buffer refcount leak is not normal! The safest thing to do
here is to release the buffer once a copy of it has been taken, so
the leak goes away when FATAL is used to report the inconsistency.
- When checking for XLR_CHECK_CONSISTENCY, better to add a != 0 to get
a boolean out of it.
- guc_malloc() should be done as late as possible. This simplifies the
code and avoids the memory leak that your patch has when there is an
error. (I have finally put my finger on what was bothering me here.)
- In btree_mask, the lookup of BTP_DELETED can be greatly simplified.
- I also wondered about putting assign_wal_consistency and
check_wal_consistency in a separate file, say xlogparams.c, but
concluded that the current code is fine as is, even if it adds rmgr.h
to the list of headers in guc.c.
- Some comment blocks are longer than 72~80 characters.
- Typos!
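
Two of the points above (releasing the buffer as soon as a copy has been
taken, and using != 0 to get a boolean out of a flag test) can be
sketched in standalone form. This is only an illustration under made-up
names: ToyBuffer, toy_pin(), toy_unpin(), toy_mask() and
TOY_CHECK_CONSISTENCY are hypothetical and are not the bufmgr API or the
real flag value. The mask here hides only the first 8 bytes, standing in
for the page LSN.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define TOY_BLCKSZ 32				/* hypothetical page size */
#define TOY_LSN_BYTES 8				/* bytes hidden by the toy mask */
#define TOY_CHECK_CONSISTENCY 0x02	/* hypothetical flag bit */

/* Hypothetical shared buffer with a pin count. */
typedef struct ToyBuffer
{
	char		page[TOY_BLCKSZ];
	int			refcount;
} ToyBuffer;

static void
toy_pin(ToyBuffer *buf)
{
	buf->refcount++;
}

static void
toy_unpin(ToyBuffer *buf)
{
	assert(buf->refcount > 0);
	buf->refcount--;
}

/* Overwrite the area that may legitimately differ between the two pages. */
static void
toy_mask(char *page)
{
	memset(page, 0xFF, TOY_LSN_BYTES);
}

/* The explicit != 0 yields a clean boolean whatever the flag bit is. */
static bool
toy_check_requested(uint8_t info)
{
	return (info & TOY_CHECK_CONSISTENCY) != 0;
}

/*
 * Compare the live page with the image taken from WAL. The pin is dropped
 * right after the copy, so no reference is held when the caller reports
 * an inconsistency.
 */
static bool
toy_check_consistency(ToyBuffer *buf, const char *wal_image)
{
	char		new_masked[TOY_BLCKSZ];
	char		old_masked[TOY_BLCKSZ];

	toy_pin(buf);
	memcpy(new_masked, buf->page, TOY_BLCKSZ);
	toy_unpin(buf);				/* the copy is enough from here on */

	memcpy(old_masked, wal_image, TOY_BLCKSZ);
	toy_mask(new_masked);
	toy_mask(old_masked);

	return memcmp(new_masked, old_masked, TOY_BLCKSZ) == 0;
}
```

Because the comparison runs only on private copies, the caller can raise
an error on a false result without leaving a pinned buffer behind.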

With the patch for BRIN applied, I am able to get installcheck-world
working with wal_consistency = all and a standby doing the consistency
checks behind. Adding wal_consistency = all in PostgresNode.pm, the
recovery tests are passing. I have switched this patch to "Ready for
Committer". Thanks for completing this effort begun 3 years ago!
-- 
Michael
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index adab2f8..57660d3 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2476,6 +2476,35 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-wal-consistency" xreflabel="wal_consistency">
+      <term><varname>wal_consistency</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>wal_consistency</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        This parameter is used to check the consistency of WAL records, i.e.,
+        whether the WAL records are inserted and applied correctly. When
+        <varname>wal_consistency</varname> is enabled for a WAL record, it
+        stores a full-page image along with the record. When a full-page image
+        arrives during redo, it compares against the current page to check whether
+        both are consistent.
+       </para>
+
+       <para>
+        By default, this setting does not contain any value. To check
+        all records written to the write-ahead log, set this parameter to
+        <literal>all</literal>. To check only some records, specify a
+        comma-separated list of resource managers. The resource managers
+        which are currently supported are <literal>heap2</>, <literal>heap</>,
+        <literal>btree</>, <literal>gin</>, <literal>gist</>,
+        <literal>spgist</>, <literal>sequence</> and <literal>brin</>. Only
+        superusers can change this setting.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-wal-buffers" xreflabel="wal_buffers">
       <term><varname>wal_buffers</varname> (<type>integer</type>)
       <indexterm>
diff --git a/src/backend/access/brin/brin_xlog.c b/src/backend/access/brin/brin_xlog.c
index 5a6b728..2af524d 100644
--- a/src/backend/access/brin/brin_xlog.c
+++ b/src/backend/access/brin/brin_xlog.c
@@ -14,6 +14,7 @@
 #include "access/brin_pageops.h"
 #include "access/brin_xlog.h"
 #include "access/xlogutils.h"
+#include "storage/bufmask.h"
 
 
 /*
@@ -279,3 +280,38 @@ brin_redo(XLogReaderState *record)
                        elog(PANIC, "brin_redo: unknown op code %u", info);
        }
 }
+
+/*
+ * Mask a BRIN page before doing consistency checks.
+ */
+void
+brin_mask(char *page, BlockNumber blkno)
+{
+       Page    page_norm = (Page) page;
+       OffsetNumber offnum,
+                               maxoff;
+
+       mask_page_lsn(page_norm);
+
+       mask_page_hint_bits(page_norm);
+
+       if (BRIN_IS_REGULAR_PAGE(page_norm))
+       {
+               mask_unused_space(page_norm);
+
+               maxoff = PageGetMaxOffsetNumber(page_norm);
+               for (offnum = FirstOffsetNumber;
+                        offnum <= maxoff;
+                        offnum = OffsetNumberNext(offnum))
+               {
+                       ItemId          itemId = PageGetItemId(page_norm, offnum);
+
+                       if (ItemIdIsUsed(itemId))
+                               itemId->lp_flags = LP_UNUSED;
+               }
+       }
+
+       /*
+        * If necessary, handle the case of meta and revmap pages here.
+        */
+}
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index a40f168..f8604db 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -15,6 +15,7 @@
 
 #include "access/gin_private.h"
 #include "access/xlogutils.h"
+#include "storage/bufmask.h"
 #include "utils/memutils.h"
 
 static MemoryContext opCtx;            /* working memory for operations */
@@ -758,3 +759,31 @@ gin_xlog_cleanup(void)
        MemoryContextDelete(opCtx);
        opCtx = NULL;
 }
+
+/*
+ * Mask a GIN page before running consistency checks on it.
+ */
+void
+gin_mask(char *page, BlockNumber blkno)
+{
+       Page    page_norm = (Page) page;
+       GinPageOpaque opaque;
+
+       mask_page_lsn(page_norm);
+       opaque = GinPageGetOpaque(page_norm);
+
+       /* GIN metapage doesn't use pd_lower/pd_upper. Other page types do. */
+       if (opaque->flags != GIN_META)
+       {
+               mask_page_hint_bits(page_norm);
+
+               /*
+                * For GIN_DELETED page, the page is initialized to empty.
+                * Hence mask everything.
+                */
+               if (opaque->flags & GIN_DELETED)
+                       memset(page_norm, MASK_MARKER, BLCKSZ);
+               else
+                       mask_unused_space(page_norm);
+       }
+}
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 5853d76..f7abb9c 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -16,6 +16,7 @@
 #include "access/gist_private.h"
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
+#include "storage/bufmask.h"
 #include "utils/memutils.h"
 
 static MemoryContext opCtx;            /* working memory for operations */
@@ -343,6 +344,52 @@ gist_xlog_cleanup(void)
 }
 
 /*
+ * Mask a Gist page before running consistency checks on it.
+ */
+void
+gist_mask(char *page, BlockNumber blkno)
+{
+       Page    page_norm = (Page) page;
+       OffsetNumber offnum,
+                               maxoff;
+
+       mask_page_lsn(page_norm);
+
+       mask_page_hint_bits(page_norm);
+       mask_unused_space(page_norm);
+
+       /* Mask NSN */
+       GistPageSetNSN(page_norm, PG_UINT64_MAX);
+
+       /*
+        * We update F_FOLLOW_RIGHT flag on the left child after writing WAL
+        * record.  Hence, mask this flag.
+        */
+       GistMarkFollowRight(page_norm);
+
+       if (GistPageIsLeaf(page_norm))
+       {
+               /*
+                * For gist leaf pages, mask some line pointer bits, particularly
+                * those marked as used on a master and unused on a standby.
+                */
+               maxoff = PageGetMaxOffsetNumber(page_norm);
+               for (offnum = FirstOffsetNumber;
+                        offnum <= maxoff;
+                        offnum = OffsetNumberNext(offnum))
+               {
+                       ItemId          itemId = PageGetItemId(page_norm, offnum);
+
+                       if (ItemIdIsUsed(itemId))
+                               itemId->lp_flags = LP_UNUSED;
+               }
+       }
+
+       /* In Gist redo, we never mark a page as garbage. Hence, mask it. */
+       GistClearPageHasGarbage(page_norm);
+}
+
+/*
  * Write WAL record of a page split.
  */
 XLogRecPtr
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b019bc1..c5fe761 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -57,6 +57,7 @@
 #include "catalog/namespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "storage/bufmask.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
@@ -9131,3 +9132,61 @@ heap_sync(Relation rel)
                heap_close(toastrel, AccessShareLock);
        }
 }
+
+/*
+ * Mask a heap page before performing consistency checks on it.
+ */
+void
+heap_mask(char *page, BlockNumber blkno)
+{
+       Page    page_norm = (Page) page;
+       OffsetNumber off;
+
+       mask_page_lsn(page_norm);
+
+       mask_page_hint_bits(page_norm);
+       mask_unused_space(page_norm);
+
+       for (off = 1; off <= PageGetMaxOffsetNumber(page_norm); off++)
+       {
+               ItemId  iid = PageGetItemId(page, off);
+               char   *page_item;
+
+               page_item = (char *) (page_norm + ItemIdGetOffset(iid));
+
+               /*
+                * Ignore hint bits and command ID.
+                */
+               if (ItemIdIsNormal(iid))
+               {
+                       HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
+
+                       page_htup->t_infomask =
+                               HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID |
+                               HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID;
+                       page_htup->t_infomask |= HEAP_XACT_MASK;
+                       page_htup->t_choice.t_heap.t_field3.t_cid = PG_UINT32_MAX;
+
+                       /*
+                        * For a speculative tuple, the content of t_ctid is conflicting
+                        * between the backup page and current page. Hence, we set it
+                        * to the current block number and current offset.
+                        */
+                       if (HeapTupleHeaderIsSpeculative(page_htup))
+                               ItemPointerSet(&page_htup->t_ctid, blkno, off);
+               }
+
+               /*
+                * Ignore any padding bytes after the tuple, when the length of
+                * the item is not MAXALIGNed.
+                */
+               if (ItemIdHasStorage(iid))
+               {
+                       int len = ItemIdGetLength(iid);
+                       int padlen = MAXALIGN(len) - len;
+
+                       if (padlen > 0)
+                               memset(page_item + len, MASK_MARKER, padlen);
+               }
+       }
+}
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index c536e22..bd1e353 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -19,6 +19,7 @@
 #include "access/transam.h"
 #include "access/xlog.h"
 #include "access/xlogutils.h"
+#include "storage/bufmask.h"
 #include "storage/procarray.h"
 #include "miscadmin.h"
 
@@ -1028,3 +1029,56 @@ btree_redo(XLogReaderState *record)
                        elog(PANIC, "btree_redo: unknown op code %u", info);
        }
 }
+
+/*
+ * Mask a btree page before performing consistency checks on it.
+ */
+void
+btree_mask(char *page, BlockNumber blkno)
+{
+       Page    page_norm = (Page) page;
+       OffsetNumber off;
+       OffsetNumber maxoff;
+       BTPageOpaque maskopaq;
+
+       mask_page_lsn(page_norm);
+
+       mask_page_hint_bits(page_norm);
+       mask_unused_space(page_norm);
+
+       maskopaq = (BTPageOpaque) PageGetSpecialPointer(page_norm);
+
+       /*
+        * Mask everything on a DELETED page.
+        */
+       if ((maskopaq->btpo_flags & BTP_DELETED) != 0)
+       {
+               /* Page content, between standard page header and opaque struct */
+               memset(page_norm + SizeOfPageHeaderData, MASK_MARKER,
+                          BLCKSZ - SizeOfPageHeaderData);
+
+               /* pd_lower and upper */
+               memset(&((PageHeader) page_norm)->pd_lower, MASK_MARKER,
+                          sizeof(uint16));
+               memset(&((PageHeader) page_norm)->pd_upper, MASK_MARKER,
+                          sizeof(uint16));
+       }
+       else
+       {
+               /*
+                * Mask some line pointer bits, particularly those marked as
+                * used on a master and unused on a standby.
+                */
+               maxoff = PageGetMaxOffsetNumber(page_norm);
+               for (off = 1; off <= maxoff; off++)
+               {
+                       ItemId iid = PageGetItemId(page_norm, off);
+
+                       if (ItemIdIsUsed(iid))
+                               iid->lp_flags = LP_UNUSED;
+               }
+       }
+
+       maskopaq->btpo_flags |= BTP_SPLIT_END | BTP_HAS_GARBAGE;
+       maskopaq->btpo_cycleid = 0;
+}
diff --git a/src/backend/access/rmgrdesc/gindesc.c b/src/backend/access/rmgrdesc/gindesc.c
index db832a5..75d0e09 100644
--- a/src/backend/access/rmgrdesc/gindesc.c
+++ b/src/backend/access/rmgrdesc/gindesc.c
@@ -113,7 +113,12 @@ gin_desc(StringInfo buf, XLogReaderState *record)
                                        (ginxlogRecompressDataLeaf *) payload;
 
                                        if (XLogRecHasBlockImage(record, 0))
-                                               appendStringInfoString(buf, " (full page image)");
+                                       {
+                                               if (XLogRecBlockImageApply(record, 0))
+                                                       appendStringInfoString(buf, " (full page image, apply)");
+                                               else
+                                                       appendStringInfoString(buf, " (full page image)");
+                                       }
                                        else
                                                desc_recompress_leaf(buf, insertData);
                                }
@@ -147,7 +152,12 @@ gin_desc(StringInfo buf, XLogReaderState *record)
                                ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) rec;
 
                                if (XLogRecHasBlockImage(record, 0))
-                                       appendStringInfoString(buf, " (full page image)");
+                               {
+                                       if (XLogRecBlockImageApply(record, 0))
+                                               appendStringInfoString(buf, " (full page image, apply)");
+                                       else
+                                               appendStringInfoString(buf, " (full page image)");
+                               }
                                else
                                        desc_recompress_leaf(buf, &xlrec->data);
                        }
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index e016cdb..f66f73a 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -18,6 +18,7 @@
 #include "access/transam.h"
 #include "access/xlog.h"
 #include "access/xlogutils.h"
+#include "storage/bufmask.h"
 #include "storage/standby.h"
 #include "utils/memutils.h"
 
@@ -1023,3 +1024,19 @@ spg_xlog_cleanup(void)
        MemoryContextDelete(opCtx);
        opCtx = NULL;
 }
+
+/*
+ * Mask a SpGist page
+ */
+void
+spg_mask(char *page, BlockNumber blkno)
+{
+       Page    page_norm = (Page) page;
+
+       mask_page_lsn(page_norm);
+
+       mask_page_hint_bits(page_norm);
+
+       if (!SpGistPageIsMeta(page_norm))
+               mask_unused_space(page_norm);
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 9bb1362..eae7524 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -30,8 +30,8 @@
 #include "utils/relmapper.h"
 
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
-       { name, redo, desc, identify, startup, cleanup },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+       { name, redo, desc, identify, startup, cleanup, mask },
 
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6cec027..a8355659 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -95,6 +95,8 @@ bool          EnableHotStandby = false;
 bool           fullPageWrites = true;
 bool           wal_log_hints = false;
 bool           wal_compression = false;
+char      *wal_consistency_string = NULL;
+bool      *wal_consistency = NULL;
 bool           log_checkpoints = false;
 int                    sync_method = DEFAULT_SYNC_METHOD;
 int                    wal_level = WAL_LEVEL_MINIMAL;
@@ -245,6 +247,10 @@ bool               InArchiveRecovery = false;
 /* Was the last xlog file restored from archive, or local? */
 static bool restoredFromArchive = false;
 
+/* Buffers dedicated to consistency checks of size BLCKSZ */
+static char *new_page_masked = NULL;
+static char *old_page_masked = NULL;
+
 /* options taken from recovery.conf for archive recovery */
 char      *recoveryRestoreCommand = NULL;
 static char *recoveryEndCommand = NULL;
@@ -867,6 +873,7 @@ static char *GetXLogBuffer(XLogRecPtr ptr);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
 static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
+static void checkConsistency(XLogReaderState *record);
 
 static void WALInsertLockAcquire(void);
 static void WALInsertLockAcquireExclusive(void);
@@ -1262,6 +1269,99 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 }
 
 /*
+ * Checks whether the current buffer page and backup page stored in the
+ * WAL record are consistent or not. Before comparing the two pages, a
+ * masking is applied to the pages to ignore certain areas like hint bits,
+ * unused space between pd_lower and pd_upper among other things. This
+ * function should be called once WAL replay has been completed for a
+ * given record.
+ */
+static void
+checkConsistency(XLogReaderState *record)
+{
+       RmgrId          rmid = XLogRecGetRmid(record);
+       RelFileNode     rnode;
+       ForkNumber      forknum;
+       BlockNumber     blkno;
+       int             block_id;
+
+       /* records with no backup blocks have no need for consistency checks */
+       if (!XLogRecHasAnyBlockRefs(record))
+               return;
+
+       /*
+        * Leave if no masking function is defined. This can happen for
+        * resource managers that generate only full-page writes; comparing
+        * an image to itself has no meaning in that case.
+        */
+       if (RmgrTable[rmid].rm_mask == NULL)
+               return;
+
+       Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0);
+
+       for (block_id = 0; block_id <= record->max_block_id; block_id++)
+       {
+               Buffer  buf;
+               Page    new_page;
+
+               if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
+               {
+                       /* Caller specified a bogus block_id. Do nothing. */
+                       continue;
+               }
+
+               Assert(XLogRecHasBlockImage(record, block_id));
+
+               /*
+                * If we've just restored the block from backup image, skip
+                * consistency check.
+                */
+               if (XLogRecBlockImageApply(record, block_id))
+                       continue;
+
+               /*
+                * Read the contents from the current buffer and store it in a
+                * temporary page.
+                */
+               buf = XLogReadBufferExtended(rnode, forknum, blkno,
+                                                                                 RBM_NORMAL);
+               if (!BufferIsValid(buf))
+                       continue;
+
+               new_page = BufferGetPage(buf);
+
+               /*
+                * Read the contents from the backup copy, stored in WAL record
+                * and store it in a temporary page. There is no need to allocate
+                * a new page here, a local buffer is fine to hold its contents and
+                * a mask can be directly applied on it.
+                */
+               if (!RestoreBlockImage(record, block_id, old_page_masked))
+                       elog(ERROR, "failed to restore block image");
+
+               /*
+                * Take a copy of the new page where WAL has been applied to have
+                * a comparison base before masking it...
+                */
+               memcpy(new_page_masked, new_page, BLCKSZ);
+
+               /* No need for this page anymore now that a copy is in */
+               ReleaseBuffer(buf);
+
+               /* ... And mask both the new and old pages */
+               RmgrTable[rmid].rm_mask(new_page_masked, blkno);
+               RmgrTable[rmid].rm_mask(old_page_masked, blkno);
+
+               /* Time to compare the old and new contents */
+               if (memcmp(new_page_masked, old_page_masked, BLCKSZ) != 0)
+                       elog(FATAL,
+                                "Inconsistent page found, rel %u/%u/%u, forknum %u, blkno %u",
+                                rnode.spcNode, rnode.dbNode, rnode.relNode,
+                                forknum, blkno);
+       }
+}
+
+/*
  * Subroutine of XLogInsertRecord.  Copies a WAL record to an already-reserved
  * area in the WAL.
  */
@@ -6149,6 +6249,13 @@ StartupXLOG(void)
                   errdetail("Failed while allocating an XLog reading processor.")));
        xlogreader->system_identifier = ControlFile->system_identifier;
 
+       /*
+        * Allocate pages dedicated to WAL consistency checks, those had better
+        * be aligned.
+        */
+       new_page_masked = (char *) palloc(BLCKSZ);
+       old_page_masked = (char *) palloc(BLCKSZ);
+
        if (read_backup_label(&checkPointLoc, &backupEndRequired,
                                                  &backupFromStandby))
        {
@@ -6949,6 +7056,15 @@ StartupXLOG(void)
                                /* Now apply the WAL record itself */
                                RmgrTable[record->xl_rmid].rm_redo(xlogreader);
 
+                               /*
+                                * After redo, check whether the backup pages associated with
+                                * the WAL record are consistent with the existing pages. This
+                                * check is done only if consistency check is enabled for this
+                                * record.
+                                */
+                               if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
+                                       checkConsistency(xlogreader);
+
                                /* Pop the error context stack */
                                error_context_stack = errcallback.previous;
 
@@ -7479,6 +7595,12 @@ StartupXLOG(void)
        }
        XLogReaderFree(xlogreader);
 
+       /* Clean up buffers dedicated to WAL consistency checks */
+       if (old_page_masked)
+               pfree(old_page_masked);
+       if (new_page_masked)
+               pfree(new_page_masked);
+
        /*
         * If any of the critical GUCs have changed, log them before we allow
         * backends to write WAL.
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 3cd273b..c635844 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -414,10 +414,12 @@ XLogInsert(RmgrId rmid, uint8 info)
                elog(ERROR, "XLogBeginInsert was not called");
 
        /*
-        * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
-        * reserved for use by me.
+        * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
+        * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
         */
-       if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
+       if ((info & ~(XLR_RMGR_INFO_MASK |
+                                 XLR_SPECIAL_REL_UPDATE |
+                                 XLR_CHECK_CONSISTENCY)) != 0)
                elog(PANIC, "invalid xlog info mask %02X", info);
 
        TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
@@ -498,6 +500,15 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
        hdr_rdt.data = hdr_scratch;
 
        /*
+        * Enforce consistency checks for this record if the user asked for
+        * them. Do this at the beginning of this routine so that callers of
+        * XLogInsert() can also pass XLR_CHECK_CONSISTENCY directly for a
+        * record.
+        */
+       if (wal_consistency[rmid])
+               info |= XLR_CHECK_CONSISTENCY;
+
+       /*
         * Make an rdata chain containing all the data portions of all block
         * references. This includes the data for full-page images. Also append
         * the headers for the block references in the scratch buffer.
@@ -513,6 +524,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                XLogRecordBlockCompressHeader cbimg = {0};
                bool            samerel;
                bool            is_compressed = false;
+               bool            include_image;
 
                if (!regbuf->in_use)
                        continue;
@@ -556,7 +568,14 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
                        bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
 
-               if (needs_backup)
+               /*
+                * If needs_backup is true or wal consistency check is enabled for
+                * current resource manager, log a full-page write for the current
+                * block.
+                */
+               include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
+
+               if (include_image)
                {
                        Page            page = regbuf->page;
                        uint16          compressed_len;
@@ -618,6 +637,15 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
                        bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
+                       /*
+                        * If WAL consistency is enabled for the resource manager of
+                        * this WAL record, a full-page image is included in the record
+                        * for the block modified. During redo, the full-page image is
+                        * replayed only if BKPIMAGE_APPLY is set.
+                        */
+                       if (needs_backup)
+                               bimg.bimg_info |= BKPIMAGE_APPLY;
+
                        if (is_compressed)
                        {
                                bimg.length = compressed_len;
@@ -680,7 +708,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                /* Ok, copy the header to the scratch buffer */
                memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
                scratch += SizeOfXLogRecordBlockHeader;
-               if (needs_backup)
+               if (include_image)
                {
                        memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
                        scratch += SizeOfXLogRecordBlockImageHeader;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 56d4c66..4be6373 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -997,6 +997,7 @@ ResetDecoder(XLogReaderState *state)
                state->blocks[block_id].in_use = false;
                state->blocks[block_id].has_image = false;
                state->blocks[block_id].has_data = false;
+               state->blocks[block_id].apply_image = false;
        }
        state->max_block_id = -1;
 }
@@ -1089,6 +1090,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 
                        blk = &state->blocks[block_id];
                        blk->in_use = true;
+                       blk->apply_image = false;
 
                        COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
                        blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
@@ -1120,6 +1122,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
                                COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
                                COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
                                COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
+
+                               blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
+
                                if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
                                {
                                        if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
@@ -1243,6 +1248,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 
                if (!blk->in_use)
                        continue;
+
+               Assert(blk->has_image || !blk->apply_image);
+
                if (blk->has_image)
                {
                        blk->bkp_image = ptr;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 51a8e8d..651faf2 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -275,9 +275,9 @@ XLogCheckInvalidPages(void)
  * will complain if we don't have the lock.  In hot standby mode it's
  * definitely necessary.)
  *
- * Note: when a backup block is available in XLOG, we restore it
- * unconditionally, even if the page in the database appears newer.  This is
- * to protect ourselves against database pages that were partially or
+ * Note: when a backup block is available in XLOG with the BKPIMAGE_APPLY
+ * flag set, we restore it, even if the page in the database appears newer.
+ * This is to protect ourselves against database pages that were partially or
  * incorrectly written during a crash.  We assume that the XLOG data must be
  * good because it has passed a CRC check, while the database page might not
  * be.  This will force us to replay all subsequent modifications of the page
@@ -310,9 +310,11 @@ XLogInitBufferForRedo(XLogReaderState *record, uint8 block_id)
  * XLogReadBufferForRedoExtended
  *             Like XLogReadBufferForRedo, but with extra options.
  *
- * In RBM_ZERO_* modes, if the page doesn't exist, the relation is extended
- * with all-zeroes pages up to the referenced block number.  In
- * RBM_ZERO_AND_LOCK and RBM_ZERO_AND_CLEANUP_LOCK modes, the return value
+ * In RBM_ZERO_* modes, if the page doesn't exist or BKPIMAGE_APPLY flag
+ * is not set for the backup block, the relation is extended with all-zeroes
+ * pages up to the referenced block number.
+ *
+ * In RBM_ZERO_AND_LOCK and RBM_ZERO_AND_CLEANUP_LOCK modes, the return value
  * is always BLK_NEEDS_REDO.
  *
  * (The RBM_ZERO_AND_CLEANUP_LOCK mode is redundant with the get_cleanup_lock
@@ -352,9 +354,10 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
        if (!willinit && zeromode)
                elog(PANIC, "block to be initialized in redo routine must be marked with WILL_INIT flag in the WAL record");
 
-       /* If it's a full-page image, restore it. */
-       if (XLogRecHasBlockImage(record, block_id))
+       /* If it has a full-page image and it should be restored, do it. */
+       if (XLogRecBlockImageApply(record, block_id))
        {
+               Assert(XLogRecHasBlockImage(record, block_id));
                *buf = XLogReadBufferExtended(rnode, forknum, blkno,
                   get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK);
                page = BufferGetPage(*buf);
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index fc3a8ee..864d6a9 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -31,6 +31,7 @@
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "storage/bufmask.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/smgr.h"
@@ -1646,3 +1647,14 @@ ResetSequenceCaches(void)
 
        last_used_seq = NULL;
 }
+
+/*
+ * Mask a Sequence page before performing consistency checks on it.
+ */
+void
+seq_mask(char *page, BlockNumber blkno)
+{
+       mask_page_lsn(page);
+
+       mask_unused_space(page);
+}
diff --git a/src/backend/storage/buffer/Makefile b/src/backend/storage/buffer/Makefile
index 2c10fba..8630dca 100644
--- a/src/backend/storage/buffer/Makefile
+++ b/src/backend/storage/buffer/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/storage/buffer
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = buf_table.o buf_init.o bufmgr.o freelist.o localbuf.o
+OBJS = buf_table.o buf_init.o bufmask.o bufmgr.o freelist.o localbuf.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/buffer/bufmask.c b/src/backend/storage/buffer/bufmask.c
new file mode 100644
index 0000000..0e062ac
--- /dev/null
+++ b/src/backend/storage/buffer/bufmask.c
@@ -0,0 +1,86 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmask.c
+ *       Routines for buffer masking. Used to mask certain bits
+ *       in a page which can be different when the WAL is generated
+ *       and when the WAL is applied.
+ *
+ * Portions Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * Contains common routines required for masking a page.
+ *
+ * IDENTIFICATION
+ *       src/backend/storage/buffer/bufmask.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/bufmask.h"
+
+/*
+ * mask_page_lsn
+ *
+ * In consistency checks, the LSNs of the two pages being compared will
+ * likely differ because of concurrent operations between the time the WAL
+ * is generated and the time it is applied.
+ */
+void
+mask_page_lsn(Page page)
+{
+       PageHeader phdr = (PageHeader) page;
+
+       PageXLogRecPtrSet(phdr->pd_lsn, PG_UINT64_MAX);
+}
+
+/*
+ * mask_page_hint_bits
+ *
+ * Mask hint bits in PageHeader.
+ */
+void
+mask_page_hint_bits(Page page)
+{
+       PageHeader phdr = (PageHeader) page;
+
+       /* Ignore prune_xid (it's like a hint-bit) */
+       phdr->pd_prune_xid = PG_UINT32_MAX;
+
+       /* Ignore PD_PAGE_FULL and PD_HAS_FREE_LINES flags, they are just hints */
+       phdr->pd_flags |= PD_PAGE_FULL | PD_HAS_FREE_LINES;
+
+       /*
+        * Also mask the all-visible flag.
+        *
+        * XXX: It is unfortunate that we have to do this. If the flag is set
+        * incorrectly, that's serious, and we would like to catch it. If the
+        * flag is cleared incorrectly, that's serious too. But redo of
+        * HEAP_CLEAN records doesn't currently set the flag, even though it is
+        * set in the master, so we must silence the failures that this causes.
+        */
+       phdr->pd_flags |= PD_ALL_VISIBLE;
+}
+
+/*
+ * mask_unused_space
+ *
+ * Mask the unused space of a page between pd_lower and pd_upper.
+ */
+void
+mask_unused_space(Page page)
+{
+       int pd_lower = ((PageHeader) page)->pd_lower;
+       int pd_upper = ((PageHeader) page)->pd_upper;
+       int pd_special = ((PageHeader) page)->pd_special;
+
+       /* Sanity check */
+       if (pd_lower > pd_upper || pd_special < pd_upper ||
+               pd_lower < SizeOfPageHeaderData || pd_special > BLCKSZ)
+       {
+               elog(ERROR, "invalid page pd_lower %d pd_upper %d pd_special %d",
+                        pd_lower, pd_upper, pd_special);
+       }
+
+       memset(page + pd_lower, MASK_MARKER, pd_upper - pd_lower);
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 3c695c1..915d24c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -28,9 +28,11 @@
 
 #include "access/commit_ts.h"
 #include "access/gin.h"
+#include "access/rmgr.h"
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
+#include "access/xlog_internal.h"
 #include "catalog/namespace.h"
 #include "commands/async.h"
 #include "commands/prepare.h"
@@ -145,6 +147,9 @@ static bool call_enum_check_hook(struct config_enum * conf, int *newval,
 static bool check_log_destination(char **newval, void **extra, GucSource source);
 static void assign_log_destination(const char *newval, void *extra);
 
+static bool check_wal_consistency(char **newval, void **extra, GucSource source);
+static void assign_wal_consistency(const char *newval, void *extra);
+
 #ifdef HAVE_SYSLOG
 static int     syslog_facility = LOG_LOCAL0;
 #else
@@ -3254,6 +3259,16 @@ static struct config_string ConfigureNamesString[] =
        },
 
        {
+               {"wal_consistency", PGC_SUSET, WAL_SETTINGS,
+                       gettext_noop("Sets the WAL resource managers for which WAL consistency checks are done."),
+                        NULL,
+                       GUC_LIST_INPUT
+               },
+               &wal_consistency_string,
+               "",
+               check_wal_consistency, assign_wal_consistency, NULL
+       },
+       {
                {"log_destination", PGC_SIGHUP, LOGGING_WHERE,
                        gettext_noop("Sets the destination for server log output."),
                        gettext_noop("Valid values are combinations of \"stderr\", "
@@ -9867,6 +9882,121 @@ call_enum_check_hook(struct config_enum * conf, int *newval, void **extra,
  */
 
 static bool
+check_wal_consistency(char **newval, void **extra, GucSource source)
+{
+       char       *rawstring;
+       List       *elemlist;
+       ListCell   *l;
+       bool            newwalconsistency[RM_MAX_ID + 1];
+       bool            isRmgrId = false;       /* Does this guc include any
+                                                                        * individual resource manager? */
+       bool            isAll = false;          /* Does this guc include 'all' keyword? */
+
+       /* Initialize the array */
+       MemSet(newwalconsistency, 0, (RM_MAX_ID + 1) * sizeof(bool));
+
+       /* Need a modifiable copy of string */
+       rawstring = pstrdup(*newval);
+
+       /* Parse string into list of identifiers */
+       if (!SplitIdentifierString(rawstring, ',', &elemlist))
+       {
+               /* syntax error in list */
+               GUC_check_errdetail("List syntax is invalid.");
+               pfree(rawstring);
+               list_free(elemlist);
+               return false;
+       }
+
+       foreach(l, elemlist)
+       {
+               char       *tok = (char *) lfirst(l);
+               bool            found = false;
+               int                     i;
+
+               /* Check if the token matches with any individual resource manager */
+               for (i = 0; i <= RM_MAX_ID; i++)
+               {
+                       if (pg_strcasecmp(tok, RmgrTable[i].rm_name) == 0)
+                       {
+                               /*
+                                * Found a match. Now check whether a mask function is
+                                * defined for this resource manager. We'll enable this
+                                * feature only for the resource managers for which a
+                                * masking function is defined.
+                                */
+                               if (RmgrTable[i].rm_mask != NULL)
+                               {
+                                       newwalconsistency[i] = true;
+                                       found = true;
+                                       isRmgrId = true;
+                                       break;
+                               }
+                               else
+                               {
+                                       GUC_check_errdetail("Unrecognized key word: \"%s\".", tok);
+                                       pfree(rawstring);
+                                       list_free(elemlist);
+                                       return false;
+                               }
+                       }
+               }
+
+               /* If a valid resource manager is found, check for the next one. */
+               if (found)
+                       continue;
+
+               /* Definitely not an individual resource manager. Check for 'all'. */
+               if (pg_strcasecmp(tok, "all") == 0)
+               {
+                       /*
+                        * This feature is enabled only for the resource managers where
+                        * a masking function is defined.
+                        */
+                       for (i = 0; i <= RM_MAX_ID; i++)
+                       {
+                               if (RmgrTable[i].rm_mask != NULL)
+                               {
+                                       newwalconsistency[i] = true;
+                               }
+                       }
+                       isAll = true;
+               }
+               else
+               {
+                       GUC_check_errdetail("Unrecognized key word: \"%s\".", tok);
+                       pfree(rawstring);
+                       list_free(elemlist);
+                       return false;
+               }
+       }
+
+       pfree(rawstring);
+       list_free(elemlist);
+
+       /*
+        * Parameter should contain either 'all' or a combination of resource
+        * managers.
+        */
+       if (isAll && isRmgrId)
+       {
+               GUC_check_errdetail("Invalid value combination");
+               return false;
+       }
+
+       /* assign new value */
+       *extra = guc_malloc(ERROR, (RM_MAX_ID + 1) * sizeof(bool));
+       memcpy(*extra, newwalconsistency, (RM_MAX_ID + 1) * sizeof(bool));
+       return true;
+}
+
+static void
+assign_wal_consistency(const char *newval, void *extra)
+{
+       wal_consistency = (bool *) extra;
+}
+
+static bool
 check_log_destination(char **newval, void **extra, GucSource source)
 {
        char       *rawstring;
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 7c2daa5..ca734fe 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -191,6 +191,10 @@
                                        #   open_sync
 #full_page_writes = on                 # recover from partial page writes
 #wal_compression = off                 # enable compression of full-page writes
+#wal_consistency = ''                  # Valid values are combinations of
+                                       # heap2, heap, btree, gin, gist,
+                                       # sequence, spgist and brin. It can also
+                                       # be set to 'all' to enable all the values
 #wal_log_hints = off                   # also do full page writes of non-critical updates
                                        # (change requires restart)
 #wal_buffers = -1                      # min 32kB, -1 sets based on shared_buffers
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 23ac4e7..a170d01 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -29,7 +29,7 @@
  * RmgrNames is an array of resource manager names, to make error messages
  * a bit nicer.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
   name,
 
 static const char *RmgrNames[RM_MAX_ID + 1] = {
diff --git a/src/bin/pg_xlogdump/pg_xlogdump.c b/src/bin/pg_xlogdump/pg_xlogdump.c
index d070312..48a3d48 100644
--- a/src/bin/pg_xlogdump/pg_xlogdump.c
+++ b/src/bin/pg_xlogdump/pg_xlogdump.c
@@ -465,7 +465,12 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
                                           rnode.spcNode, rnode.dbNode, rnode.relNode,
                                           blk);
                        if (XLogRecHasBlockImage(record, block_id))
-                               printf(" FPW");
+                       {
+                               if (XLogRecBlockImageApply(record, block_id))
+                                       printf(" FPW (apply)");
+                               else
+                                       printf(" FPW");
+                       }
                }
                putchar('\n');
        }
@@ -489,7 +494,10 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
                                if (record->blocks[block_id].bimg_info &
                                        BKPIMAGE_IS_COMPRESSED)
                                {
-                                       printf(" (FPW); hole: offset: %u, length: %u, compression saved: %u\n",
+                                       printf(" (FPW)%s; hole: offset: %u, length: %u, "
+                                               "compression saved: %u\n",
+                                                  XLogRecBlockImageApply(record, block_id) ?
+                                                               " apply" : "",
                                                   record->blocks[block_id].hole_offset,
                                                   record->blocks[block_id].hole_length,
                                                   BLCKSZ -
@@ -498,7 +506,9 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
                                }
                                else
                                {
-                                       printf(" (FPW); hole: offset: %u, length: %u\n",
+                                       printf(" (FPW)%s; hole: offset: %u, length: %u\n",
+                                                  XLogRecBlockImageApply(record, block_id) ?
+                                                               " apply" : "",
                                                   record->blocks[block_id].hole_offset,
                                                   record->blocks[block_id].hole_length);
                                }
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index 8fe20ce..5d19a4a 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -32,7 +32,7 @@
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
 
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
        { name, desc, identify},
 
 const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
diff --git a/src/include/access/brin_xlog.h b/src/include/access/brin_xlog.h
index f614805..68192a7 100644
--- a/src/include/access/brin_xlog.h
+++ b/src/include/access/brin_xlog.h
@@ -128,5 +128,6 @@ typedef struct xl_brin_revmap_extend
 extern void brin_redo(XLogReaderState *record);
 extern void brin_desc(StringInfo buf, XLogReaderState *record);
 extern const char *brin_identify(uint8 info);
+extern void brin_mask(char *page, BlockNumber blkno);
 
 #endif   /* BRIN_XLOG_H */
diff --git a/src/include/access/gin.h b/src/include/access/gin.h
index e5b2e10..8ec0eeb 100644
--- a/src/include/access/gin.h
+++ b/src/include/access/gin.h
@@ -79,5 +79,6 @@ extern void gin_desc(StringInfo buf, XLogReaderState *record);
 extern const char *gin_identify(uint8 info);
 extern void gin_xlog_startup(void);
 extern void gin_xlog_cleanup(void);
+extern void gin_mask(char *page, BlockNumber blkno);
 
 #endif   /* GIN_H */
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 78e87a6..3f8e7b7 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -460,6 +460,7 @@ extern void gist_desc(StringInfo buf, XLogReaderState *record);
 extern const char *gist_identify(uint8 info);
 extern void gist_xlog_startup(void);
 extern void gist_xlog_cleanup(void);
+extern void gist_mask(char *page, BlockNumber blkno);
 
 extern XLogRecPtr gistXLogUpdate(Buffer buffer,
                           OffsetNumber *todelete, int ntodelete,
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 06a8242..5cd3022 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -373,6 +373,7 @@ extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 extern void heap_redo(XLogReaderState *record);
 extern void heap_desc(StringInfo buf, XLogReaderState *record);
 extern const char *heap_identify(uint8 info);
+extern void heap_mask(char *page, BlockNumber blkno);
 extern void heap2_redo(XLogReaderState *record);
 extern void heap2_desc(StringInfo buf, XLogReaderState *record);
 extern const char *heap2_identify(uint8 info);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index c580f51..006922a 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -775,5 +775,6 @@ extern void _bt_leafbuild(BTSpool *btspool, BTSpool *spool2);
 extern void btree_redo(XLogReaderState *record);
 extern void btree_desc(StringInfo buf, XLogReaderState *record);
 extern const char *btree_identify(uint8 info);
+extern void btree_mask(char *page, BlockNumber blkno);
 
 #endif   /* NBTREE_H */
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index ff7fe62..64b92ff 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
  * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
  * file format.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
        symname,
 
 typedef enum RmgrIds
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index a7a0ae2..89182e2 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -25,25 +25,25 @@
  */
 
 /* symbol name, textual name, redo, desc, identify, startup, cleanup */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
diff --git a/src/include/access/spgist.h b/src/include/access/spgist.h
index a953a5a..fd6b9f5 100644
--- a/src/include/access/spgist.h
+++ b/src/include/access/spgist.h
@@ -220,5 +220,6 @@ extern void spg_desc(StringInfo buf, XLogReaderState *record);
 extern const char *spg_identify(uint8 info);
 extern void spg_xlog_startup(void);
 extern void spg_xlog_cleanup(void);
+extern void spg_mask(char *page, BlockNumber blkno);
 
 #endif   /* SPGIST_H */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index c9f332c..295bf09 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -105,6 +105,8 @@ extern bool EnableHotStandby;
 extern bool fullPageWrites;
 extern bool wal_log_hints;
 extern bool wal_compression;
+extern bool *wal_consistency;
+extern char *wal_consistency_string;
 extern bool log_checkpoints;
 
 extern int     CheckPointSegments;
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index ceb0462..57756b8 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -266,6 +266,10 @@ typedef enum
  * "VACUUM". rm_desc can then be called to obtain additional detail for the
  * record, if available (e.g. the last block).
  *
+ * rm_mask takes as input a page associated with the resource manager's
+ * records and performs masking actions on it for consistency check
+ * comparisons. The input must be an already allocated page of size BLCKSZ.
+ *
  * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h).
  */
 typedef struct RmgrData
@@ -276,6 +280,7 @@ typedef struct RmgrData
        const char *(*rm_identify) (uint8 info);
        void            (*rm_startup) (void);
        void            (*rm_cleanup) (void);
+       void            (*rm_mask) (char *page, BlockNumber blkno);
 } RmgrData;
 
 extern const RmgrData RmgrTable[];
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index deaa7f5..697a4ef 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -52,6 +52,7 @@ typedef struct
 
        /* Information on full-page image, if any */
        bool            has_image;
+       bool            apply_image; /* Restore image during WAL replay */
        char       *bkp_image;
        uint16          hole_offset;
        uint16          hole_length;
@@ -205,6 +206,8 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
        ((decoder)->blocks[block_id].in_use)
 #define XLogRecHasBlockImage(decoder, block_id) \
        ((decoder)->blocks[block_id].has_image)
+#define XLogRecBlockImageApply(decoder, block_id) \
+       ((decoder)->blocks[block_id].apply_image)
 
 extern bool RestoreBlockImage(XLogReaderState *recoder, uint8 block_id, char *dst);
 extern char *XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len);
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 3dfcb49..972d99d 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -56,8 +56,8 @@ typedef struct XLogRecord
 
 /*
  * The high 4 bits in xl_info may be used freely by rmgr. The
- * XLR_SPECIAL_REL_UPDATE bit can be passed by XLogInsert caller. The rest
- * are set internally by XLogInsert.
+ * XLR_SPECIAL_REL_UPDATE and XLR_CHECK_CONSISTENCY bits can be passed by
+ * XLogInsert caller. The rest are set internally by XLogInsert.
  */
 #define XLR_INFO_MASK                  0x0F
 #define XLR_RMGR_INFO_MASK             0xF0
@@ -71,6 +71,15 @@ typedef struct XLogRecord
 #define XLR_SPECIAL_REL_UPDATE 0x01
 
 /*
+ * Enforces consistency checks of replayed WAL at recovery. If enabled,
+ * each record will log a full-page write for each block modified by the
+ * record and will reuse it afterwards for consistency checks. The caller
+ * of XLogInsert can use this value if necessary. Note that if
+ * wal_consistency is enabled, this is set unconditionally.
+ */
+#define XLR_CHECK_CONSISTENCY  0x02
+
+/*
  * Header info for block data appended to an XLOG record.
  *
  * 'data_length' is the length of the rmgr-specific payload data associated
@@ -137,6 +146,8 @@ typedef struct XLogRecordBlockImageHeader
 /* Information stored in bimg_info */
 #define BKPIMAGE_HAS_HOLE              0x01    /* page image has "hole" */
 #define BKPIMAGE_IS_COMPRESSED         0x02            /* page image is compressed */
+#define BKPIMAGE_APPLY         0x04            /* page image should be restored
+                                                                                * during replay */
 
 /*
  * Extra header information used when page image has "hole" and
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 392a626..6fd4130 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -82,5 +82,6 @@ extern void ResetSequenceCaches(void);
 extern void seq_redo(XLogReaderState *rptr);
 extern void seq_desc(StringInfo buf, XLogReaderState *rptr);
 extern const char *seq_identify(uint8 info);
+extern void seq_mask(char *page, BlockNumber blkno);
 
 #endif   /* SEQUENCE_H */
diff --git a/src/include/storage/bufmask.h b/src/include/storage/bufmask.h
new file mode 100644
index 0000000..ab1a93c
--- /dev/null
+++ b/src/include/storage/bufmask.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmask.h
+ *       Definitions for buffer masking routines, used to mask certain bits
+ *       in a page which can be different when the WAL is generated
+ *       and when the WAL is applied. So, we mask those bits before any
+ *       page comparison to make them consistent.
+ *
+ * Portions Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * src/include/storage/bufmask.h
+ */
+
+#ifndef BUFMASK_H
+#define BUFMASK_H
+
+#include "postgres.h"
+#include "storage/block.h"
+#include "storage/bufmgr.h"
+
+/* Marker used to mask pages consistently */
+#define MASK_MARKER            0xFF
+
+extern void mask_page_lsn(Page page);
+extern void mask_page_hint_bits(Page page);
+extern void mask_unused_space(Page page);
+#endif
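
For anyone reading the patch without the tree at hand, here is a minimal
standalone sketch of how the masking routines are meant to be used: mask
copies of both pages, then compare them byte for byte. It is not the code
from the patch and does not use PostgreSQL headers. ToyPageHeader,
TOY_BLCKSZ and all toy_* functions are hypothetical names made up for this
illustration; only the 0xFF marker value is taken from MASK_MARKER above.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define TOY_BLCKSZ      8192    /* hypothetical page size */
#define TOY_MASK_MARKER 0xFF    /* same value as MASK_MARKER in the patch */

/* Hypothetical, simplified header; NOT PostgreSQL's PageHeaderData. */
typedef struct ToyPageHeader
{
    uint64_t lsn;       /* may differ between WAL generation and replay */
    uint16_t lower;     /* start of unused space */
    uint16_t upper;     /* end of unused space */
    uint32_t reserved;  /* explicit filler so the struct has no padding */
} ToyPageHeader;

/* Overwrite the LSN with a fixed value so it never causes a mismatch. */
static void
toy_mask_lsn(unsigned char *page)
{
    ToyPageHeader hdr;

    memcpy(&hdr, page, sizeof(hdr));
    hdr.lsn = UINT64_MAX;
    memcpy(page, &hdr, sizeof(hdr));
}

/* Fill the hole between lower and upper; returns 0 on a bogus header. */
static int
toy_mask_unused_space(unsigned char *page)
{
    ToyPageHeader hdr;

    memcpy(&hdr, page, sizeof(hdr));
    if ((size_t) hdr.lower < sizeof(hdr) || hdr.lower > hdr.upper ||
        hdr.upper > TOY_BLCKSZ)
        return 0;
    memset(page + hdr.lower, TOY_MASK_MARKER,
           (size_t) (hdr.upper - hdr.lower));
    return 1;
}

/* Mask private copies of both pages, then compare them byte for byte. */
static int
toy_pages_consistent(const unsigned char *a, const unsigned char *b)
{
    unsigned char ca[TOY_BLCKSZ];
    unsigned char cb[TOY_BLCKSZ];

    assert(a != NULL && b != NULL);
    memcpy(ca, a, TOY_BLCKSZ);
    memcpy(cb, b, TOY_BLCKSZ);
    toy_mask_lsn(ca);
    toy_mask_lsn(cb);
    if (!toy_mask_unused_space(ca) || !toy_mask_unused_space(cb))
        return 0;
    return memcmp(ca, cb, TOY_BLCKSZ) == 0;
}

/* Returns 1 if masked-only differences pass and a data difference fails. */
int
toy_demo(void)
{
    static unsigned char primary[TOY_BLCKSZ];
    static unsigned char replayed[TOY_BLCKSZ];
    ToyPageHeader hdr = {.lsn = 100, .lower = 64, .upper = 4096, .reserved = 0};

    memset(primary, 0, TOY_BLCKSZ);
    memcpy(primary, &hdr, sizeof(hdr));
    memcpy(replayed, primary, TOY_BLCKSZ);

    hdr.lsn = 200;                  /* different LSN on the replayed copy */
    memcpy(replayed, &hdr, sizeof(hdr));
    replayed[1000] = 0x5A;          /* garbage inside the hole */

    if (!toy_pages_consistent(primary, replayed))
        return 0;                   /* maskable differences must be ignored */

    replayed[5000] = 0x5A;          /* difference in the real data area */
    return !toy_pages_consistent(primary, replayed);
}
```

Calling toy_demo() from a small main() is enough to try it. The masking is
done on copies so that the pages being compared are left untouched, which
is an assumption of this sketch rather than something the excerpt above
shows.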