Hi,

I've added the patch to the CF app. The testing details follow:

1. On the master, I've enabled the following configuration parameters:

* wal_level = replica
* max_wal_senders = 3
* wal_keep_segments = 4000
* hot_standby = on
* wal_consistency_mask = 511  /* enable all consistency-check bits */

2. On the slave, I've enabled the following configuration parameters:

* standby_mode = on
* wal_consistency_mask = 511 /* enable all consistency-check bits */
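
For reference, the mask value is just the OR of the per-rmgr bits, in the
order the patch checks them. A minimal sketch of the arithmetic, assuming
that bit ordering (HEAP2=0, HEAP=1, BTREE=2, HASH=3, GIN=4, GIST=5, SEQ=6,
SPGIST=7, BRIN=8):

#include <stdio.h>

int
main(void)
{
	/* bit positions follow the rmids[] array in the patch */
	int		heap = 1 << 1;			/* HEAP */
	int		hash = 1 << 3;			/* HASH */
	int		all = (1 << 9) - 1;		/* all nine resource managers */

	printf("HEAP|HASH = %d\n", heap | hash);	/* prints 10 */
	printf("all rmgrs = %d\n", all);			/* prints 511 */
	return 0;
}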

3. Then I ran gmake installcheck on the master. I didn't get any
warnings about WAL inconsistency on the slave.

I've made the following changes in the attached patch:

1. For BRIN pages, I've masked the unused space and the PD_PAGE_FULL and
PD_HAS_FREE_LINES flags.
2. For Btree pages, I've masked the BTP_HALF_DEAD, BTP_SPLIT_END,
BTP_HAS_GARBAGE and BTP_INCOMPLETE_SPLIT flags.
3. For a GIN_DELETED page, I've masked the entire page, since the page is
always reinitialized during recovery.
4. For the speculative heap-tuple insert operation, the t_ctid value was
inconsistent between the backup page and the replayed page. So, I've reset
the t_ctid value (in the backup page) to the current block number and
offset number (see the sketch below). Suggestions welcome!
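
The relevant masking logic, roughly as it appears in mask_heap_page() in
the attached patch (iid, blkno and off come from the per-item loop there):

/*
 * A speculative tuple carries an insertion token in t_ctid instead of its
 * own TID, so the backup page and the replayed page disagree. Reset
 * t_ctid to a deterministic value before comparing.
 */
if (ItemIdIsNormal(iid))
{
	HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, iid);

	if (HeapTupleHeaderIsSpeculative(htup))
		ItemPointerSet(&htup->t_ctid, blkno, off);
}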


What needs to be done:
1. Add support for other Resource Managers.
2. Modify masking techniques for existing Resource Managers (if required).
3. Change the GUC parameter to accept a list of rmgr names (see the
sketch after this list).
4. Modify the technique for identifying rmgr names for which the
feature should be enabled.
5. Generalize the page type identification technique.
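
For item 3, a rough, untested sketch of what the check hook for a
list-valued GUC (say, wal_consistency_checking = 'heap,btree') could look
like; it assumes access to RmgrTable and rm_name via xlog_internal.h:

static bool
check_wal_consistency(char **newval, void **extra, GucSource source)
{
	char	   *rawstring = pstrdup(*newval);
	List	   *elemlist;
	ListCell   *l;

	/* SplitIdentifierString modifies rawstring in place */
	if (!SplitIdentifierString(rawstring, ',', &elemlist))
	{
		GUC_check_errdetail("List syntax is invalid.");
		pfree(rawstring);
		list_free(elemlist);
		return false;
	}

	foreach(l, elemlist)
	{
		char	   *tok = (char *) lfirst(l);
		bool		found = false;
		int			rmid;

		/* Match the token against the built-in resource manager names */
		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
		{
			if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0)
			{
				found = true;
				break;
			}
		}
		if (!found)
		{
			GUC_check_errdetail("Unrecognized resource manager name \"%s\".",
								tok);
			pfree(rawstring);
			list_free(elemlist);
			return false;
		}
	}

	pfree(rawstring);
	list_free(elemlist);
	return true;
}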


On Wed, Aug 24, 2016 at 2:14 PM, Simon Riggs <si...@2ndquadrant.com> wrote:
> On 22 August 2016 at 16:56, Simon Riggs <si...@2ndquadrant.com> wrote:
>> On 22 August 2016 at 13:44, Kuntal Ghosh <kuntalghosh.2...@gmail.com> wrote:
>>
>>> Please let me know your thoughts on this.
>>
>> Do the regression tests pass with this option enabled?
>
> Hi,
>
> I'd like to be a reviewer on this. Please can you add this onto the CF
> app so we can track the review?
>
> Please supply details of the testing and test coverage.
>
> Thanks
>
> --
> Simon Riggs                http://www.2ndQuadrant.com/
> PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



-- 
Thanks & Regards,
Kuntal Ghosh
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f13f9c1..7b64167 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -25,6 +25,7 @@
 #include "access/commit_ts.h"
 #include "access/multixact.h"
 #include "access/rewriteheap.h"
+#include "access/rmgr.h"
 #include "access/subtrans.h"
 #include "access/timeline.h"
 #include "access/transam.h"
@@ -52,7 +53,9 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/barrier.h"
+#include "storage/bufmask.h"
 #include "storage/bufmgr.h"
+#include "storage/bufpage.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/large_object.h"
@@ -94,6 +97,7 @@ bool		EnableHotStandby = false;
 bool		fullPageWrites = true;
 bool		wal_log_hints = false;
 bool		wal_compression = false;
+int		wal_consistency_mask = 0;
 bool		log_checkpoints = false;
 int			sync_method = DEFAULT_SYNC_METHOD;
 int			wal_level = WAL_LEVEL_MINIMAL;
@@ -867,6 +871,9 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+void checkWALConsistency(XLogReaderState *xlogreader);
+void checkWALConsistencyForBlock(XLogReaderState *record, uint8 block_id);
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -6868,6 +6875,12 @@ StartupXLOG(void)
 				/* Now apply the WAL record itself */
 				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
 
+				/*
+				 * Check whether each page modified by the WAL record is
+				 * consistent with its full-page image.
+				 */
+				checkWALConsistency(xlogreader);
+
 				/* Pop the error context stack */
 				error_context_stack = errcallback.previous;
 
@@ -11626,3 +11639,161 @@ XLogRequestWalReceiverReply(void)
 {
 	doRequestWalReceiverReply = true;
 }
+
+/*
+ * Check whether the pages associated with a WAL record are consistent
+ * with their full-page images taken on the master.
+ */
+void
+checkWALConsistency(XLogReaderState *xlogreader)
+{
+	RmgrIds		rmid = (RmgrIds) XLogRecGetRmid(xlogreader);
+	int			block_id;
+	int			enableWALConsistencyMask = 1;
+	RmgrIds		rmids[] = {RM_HEAP2_ID, RM_HEAP_ID, RM_BTREE_ID, RM_HASH_ID,
+						   RM_GIN_ID, RM_GIST_ID, RM_SEQ_ID, RM_SPGIST_ID,
+						   RM_BRIN_ID};
+	int			size = sizeof(rmids) / sizeof(rmids[0]);
+	int			i;
+
+	for (i = 0; i < size; i++)
+	{
+		if (rmids[i] == rmid && (wal_consistency_mask & enableWALConsistencyMask))
+		{
+			for (block_id = 0; block_id <= xlogreader->max_block_id; block_id++)
+				checkWALConsistencyForBlock(xlogreader, block_id);
+			break;
+		}
+		/*
+		 * Enable checking for the next bit
+		 */
+		enableWALConsistencyMask <<= 1;
+	}
+}
+
+/*
+ * Check the consistency of a single block referenced by a WAL record by
+ * comparing its full-page image with the page replayed locally.
+ */
+void
+checkWALConsistencyForBlock(XLogReaderState *record, uint8 block_id)
+{
+	Buffer		buf;
+	char	   *ptr;
+	DecodedBkpBlock *bkpb;
+	char		tmp[BLCKSZ];
+	char		uncmp[BLCKSZ];	/* scratch buffer for decompression */
+	RelFileNode rnode;
+	ForkNumber	forknum;
+	BlockNumber blkno;
+	Page		page;
+	char	   *norm_new_page;
+	char	   *norm_old_page;
+	char		old_buf[BLCKSZ * 2];
+	char		new_buf[BLCKSZ * 2];
+	int			i;
+	int			j = 0;
+	bool		inconsistent = false;
+
+	if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
+	{
+		/* Caller specified a bogus block_id. Don't do anything. */
+		return;
+	}
+	buf = XLogReadBufferExtended(rnode, forknum, blkno, RBM_WAL_CHECK);
+	if (!BufferIsValid(buf))
+		return;					/* the block may have been dropped meanwhile */
+	page = BufferGetPage(buf);
+
+	bkpb = &record->blocks[block_id];
+	if (bkpb->bkp_image != NULL)
+		ptr = bkpb->bkp_image;
+	else
+	{
+		elog(WARNING,
+			 "no page image found in WAL for record %X/%X, rel %u/%u/%u, "
+			 "forknum %u, blkno %u",
+			 (uint32) (record->ReadRecPtr >> 32), (uint32) record->ReadRecPtr,
+			 rnode.spcNode, rnode.dbNode, rnode.relNode,
+			 forknum, blkno);
+		ReleaseBuffer(buf);
+		return;
+	}
+
+	if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
+	{
+		/*
+		 * If the backup block image is compressed, decompress it into a
+		 * separate buffer, so that the hole re-expansion below never
+		 * copies between overlapping regions.
+		 */
+		if (pglz_decompress(ptr, bkpb->bimg_len, uncmp,
+							BLCKSZ - bkpb->hole_length) < 0)
+		{
+			elog(ERROR, "invalid compressed image at %X/%X, block %d",
+				 (uint32) (record->ReadRecPtr >> 32),
+				 (uint32) record->ReadRecPtr,
+				 block_id);
+		}
+		ptr = uncmp;
+	}
+	/*
+	 * Reconstruct the full page image, zero-filling the "hole" if the
+	 * backup image has one.
+	 */
+	if (bkpb->hole_length == 0)
+	{
+		memcpy(tmp, ptr, BLCKSZ);
+	}
+	else
+	{
+		memcpy(tmp, ptr, bkpb->hole_offset);
+		/* must zero-fill the hole */
+		MemSet(tmp + bkpb->hole_offset, 0, bkpb->hole_length);
+		memcpy(tmp + (bkpb->hole_offset + bkpb->hole_length),
+			ptr + bkpb->hole_offset,
+			BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+	}
+	ptr = tmp;
+
+	/* Mask pages */
+	norm_new_page = mask_page((Page)ptr, blkno);
+	norm_old_page = mask_page((Page)page, blkno);
+	/*
+	 * Convert the pages to be compared into hex format to facilitate
+	 * their comparison and make potential diffs more readable while
+	 * debugging.
+	 */
+	for (i = 0; i < BLCKSZ; i++)
+	{
+		const char *digits = "0123456789ABCDEF";
+		uint8 byte_new = (uint8) norm_new_page[i];
+		uint8 byte_old = (uint8) norm_old_page[i];
+
+		new_buf[j] = digits[byte_new >> 4];
+		old_buf[j] = digits[byte_old >> 4];
+		/*
+		 * Compare the pages nibble by nibble; stop at the first mismatch
+		 * and flag the pages as inconsistent.
+		 */
+		if (new_buf[j] != old_buf[j])
+		{
+			inconsistent = true;
+			break;
+		}
+		j++;
+		new_buf[j] = digits[byte_new & 0x0F];
+		old_buf[j] = digits[byte_old & 0x0F];
+		if (new_buf[j] != old_buf[j])
+		{
+			inconsistent = true;
+			break;
+		}
+		j++;
+	}
+
+	/* Time to compare the old and new contents */
+	if (inconsistent)
+		elog(WARNING,
+			 "inconsistent page (at byte %u) found for record %X/%X, rel %u/%u/%u, "
+			 "forknum %u, blkno %u", i,
+			 (uint32) (record->ReadRecPtr >> 32), (uint32) record->ReadRecPtr,
+			 rnode.spcNode, rnode.dbNode, rnode.relNode,
+			 forknum, blkno);
+	else
+		elog(DEBUG1,
+			 "consistent page found for record %X/%X, rel %u/%u/%u, "
+			 "forknum %u, blkno %u",
+			 (uint32) (record->ReadRecPtr >> 32), (uint32) record->ReadRecPtr,
+			 rnode.spcNode, rnode.dbNode, rnode.relNode,
+			 forknum, blkno);
+
+	pfree(norm_new_page);
+	pfree(norm_old_page);
+	ReleaseBuffer(buf);
+}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index c37003a..af4df2a 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -513,7 +513,12 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		XLogRecordBlockCompressHeader cbimg = {0};
 		bool		samerel;
 		bool		is_compressed = false;
-
+		int			enableWALConsistencyMask = 1;
+		RmgrIds		rmids[] = {RM_HEAP2_ID, RM_HEAP_ID, RM_BTREE_ID,
+							   RM_HASH_ID, RM_GIN_ID, RM_GIST_ID,
+							   RM_SEQ_ID, RM_SPGIST_ID, RM_BRIN_ID};
+		int			size = sizeof(rmids) / sizeof(rmids[0]);
+		int			i;
+		bool		needs_image_backup; /* original value of needs_backup,
+										 * before it is possibly forced to
+										 * true below */
+
 		if (!regbuf->in_use)
 			continue;
 
@@ -556,6 +561,24 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
 			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
 
+		/*
+		 * If the WAL consistency check is enabled for the current rmid,
+		 * force a full-page image for this block.
+		 */
+		needs_image_backup = needs_backup;
+		for (i = 0; i < size; i++)
+		{
+			if (rmids[i] == rmid && (wal_consistency_mask & enableWALConsistencyMask))
+			{
+				needs_backup = true;
+				break;
+			}
+			/*
+			 * Enable checking for the next bit
+			 */
+			enableWALConsistencyMask <<= 1;
+		}
+
 		if (needs_backup)
 		{
 			Page		page = regbuf->page;
@@ -618,6 +641,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
 			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
+			if (needs_image_backup)
+				bimg.bimg_info |= BKPIMAGE_IS_REQUIRED;
+
 			if (is_compressed)
 			{
 				bimg.length = compressed_len;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index dcf747c..5e53df3 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1077,11 +1077,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 			}
 			datatotal += blk->data_len;
 
+			blk->require_image = false;
 			if (blk->has_image)
 			{
 				COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
 				COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
+				/*
+				 * If the image is required for any purpose other than the
+				 * WAL consistency check, set the require_image flag.
+				 */
+				if (blk->bimg_info & BKPIMAGE_IS_REQUIRED)
+					blk->require_image = true;
 				if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
 				{
 					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
@@ -1222,6 +1229,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 			memcpy(blk->data, ptr, blk->data_len);
 			ptr += blk->data_len;
 		}
+		/*
+		 * Set has_image only if the image was inserted into the WAL record
+		 * for some purpose other than the WAL consistency check;
+		 * require_image remembers exactly that.
+		 */
+		blk->has_image = blk->require_image;
 	}
 
 	/* and finally, the main data */
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index c98f981..eaf2d8b 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -49,16 +49,6 @@
 #define SEQ_LOG_VALS	32
 
 /*
- * The "special area" of a sequence's buffer page looks like this.
- */
-#define SEQ_MAGIC	  0x1717
-
-typedef struct sequence_magic
-{
-	uint32		magic;
-} sequence_magic;
-
-/*
  * We store a SeqTable item for every sequence we have touched in the current
  * session.  This is needed to hold onto nextval/currval state.  (We can't
  * rely on the relcache, since it's only, well, a cache, and may decide to
@@ -329,7 +319,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 {
 	Buffer		buf;
 	Page		page;
-	sequence_magic *sm;
+	SequencePageOpaqueData *sm;
 	OffsetNumber offnum;
 
 	/* Initialize first page of relation with special magic number */
@@ -339,9 +329,9 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 
 	page = BufferGetPage(buf);
 
-	PageInit(page, BufferGetPageSize(buf), sizeof(sequence_magic));
-	sm = (sequence_magic *) PageGetSpecialPointer(page);
-	sm->magic = SEQ_MAGIC;
+	PageInit(page, BufferGetPageSize(buf), sizeof(SequencePageOpaqueData));
+	sm = (SequencePageOpaqueData *) PageGetSpecialPointer(page);
+	sm->seq_page_id = SEQ_MAGIC;
 
 	/* Now insert sequence tuple */
 
@@ -1109,18 +1099,18 @@ read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
 {
 	Page		page;
 	ItemId		lp;
-	sequence_magic *sm;
+	SequencePageOpaqueData *sm;
 	Form_pg_sequence seq;
 
 	*buf = ReadBuffer(rel, 0);
 	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
 
 	page = BufferGetPage(*buf);
-	sm = (sequence_magic *) PageGetSpecialPointer(page);
+	sm = (SequencePageOpaqueData *) PageGetSpecialPointer(page);
 
-	if (sm->magic != SEQ_MAGIC)
+	if (sm->seq_page_id != SEQ_MAGIC)
 		elog(ERROR, "bad magic number in sequence \"%s\": %08X",
-			 RelationGetRelationName(rel), sm->magic);
+			 RelationGetRelationName(rel), sm->seq_page_id);
 
 	lp = PageGetItemId(page, FirstOffsetNumber);
 	Assert(ItemIdIsNormal(lp));
@@ -1585,7 +1575,7 @@ seq_redo(XLogReaderState *record)
 	char	   *item;
 	Size		itemsz;
 	xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
-	sequence_magic *sm;
+	SequencePageOpaqueData *sm;
 
 	if (info != XLOG_SEQ_LOG)
 		elog(PANIC, "seq_redo: unknown op code %u", info);
@@ -1604,9 +1594,9 @@ seq_redo(XLogReaderState *record)
 	 */
 	localpage = (Page) palloc(BufferGetPageSize(buffer));
 
-	PageInit(localpage, BufferGetPageSize(buffer), sizeof(sequence_magic));
-	sm = (sequence_magic *) PageGetSpecialPointer(localpage);
-	sm->magic = SEQ_MAGIC;
+	PageInit(localpage, BufferGetPageSize(buffer), sizeof(SequencePageOpaqueData));
+	sm = (SequencePageOpaqueData *) PageGetSpecialPointer(localpage);
+	sm->seq_page_id = SEQ_MAGIC;
 
 	item = (char *) xlrec + sizeof(xl_seq_rec);
 	itemsz = XLogRecGetDataLen(record) - sizeof(xl_seq_rec);
diff --git a/src/backend/storage/buffer/Makefile b/src/backend/storage/buffer/Makefile
index 2c10fba..8630dca 100644
--- a/src/backend/storage/buffer/Makefile
+++ b/src/backend/storage/buffer/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/storage/buffer
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = buf_table.o buf_init.o bufmgr.o freelist.o localbuf.o
+OBJS = buf_table.o buf_init.o bufmask.o bufmgr.o freelist.o localbuf.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/buffer/bufmask.c b/src/backend/storage/buffer/bufmask.c
new file mode 100644
index 0000000..d42e3f7
--- /dev/null
+++ b/src/backend/storage/buffer/bufmask.c
@@ -0,0 +1,415 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmask.c
+ *	  Routines for buffer masking, used to ensure that buffers used for
+ *	  comparison across nodes are in a consistent state.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * Most pages cannot be compared directly, because some parts of the
+ * page are not expected to be byte-by-byte identical. For example,
+ * hint bits or unused space in the page. The strategy is to normalize
+ * all pages by creating a mask of those bits that are not expected to
+ * match.
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/buffer/bufmask.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/brin_page.h"
+#include "access/nbtree.h"
+#include "access/gist.h"
+#include "access/gin_private.h"
+#include "access/hash.h"
+#include "access/htup_details.h"
+#include "access/spgist_private.h"
+#include "commands/sequence.h"
+#include "storage/bufmask.h"
+#include "storage/bufmgr.h"
+
+/* Marker used to mask pages consistently */
+#define MASK_MARKER		0xFF
+
+static void mask_unused_space(Page page);
+static void mask_page_lsn(Page page);
+static void mask_heap_page(Page page, BlockNumber blkno);
+static void mask_spgist_page(Page page);
+static void mask_gist_page(Page page);
+static void mask_gin_page(Page page, BlockNumber blkno);
+static void mask_sequence_page(Page page);
+static void mask_btree_page(Page page);
+static void mask_hash_page(Page page);
+static void mask_brin_page(Page page);
+
+/*
+ * Mask the unused space of a page between pd_lower and pd_upper.
+ */
+static void
+mask_unused_space(Page page)
+{
+	int pd_lower = ((PageHeader) page)->pd_lower;
+	int	pd_upper = ((PageHeader) page)->pd_upper;
+	int pd_special = ((PageHeader) page)->pd_special;
+
+	/* Sanity check */
+	if (pd_lower > pd_upper || pd_special < pd_upper ||
+		pd_lower < SizeOfPageHeaderData || pd_special > BLCKSZ)
+	{
+		elog(ERROR, "invalid page at %X/%08X\n",
+			 ((PageHeader) page)->pd_lsn.xlogid,
+			 ((PageHeader) page)->pd_lsn.xrecoff);
+	}
+
+	memset(page + pd_lower, MASK_MARKER, pd_upper - pd_lower);
+}
+
+/*
+ * Mask the page LSN.
+ */
+static void
+mask_page_lsn(Page page)
+{
+	PageHeader	phdr = (PageHeader) page;
+
+	PageXLogRecPtrSet(phdr->pd_lsn, UINT64CONST(0xFFFFFFFFFFFFFFFF));
+}
+
+/*
+ * Mask a heap page
+ */
+static void
+mask_heap_page(Page page, BlockNumber blkno)
+{
+	OffsetNumber off;
+	PageHeader phdr = (PageHeader) page;
+
+	mask_unused_space(page);
+
+	/* Ignore prune_xid (it's like a hint-bit) */
+	phdr->pd_prune_xid = 0xFFFFFFFF;
+
+	/* Ignore PD_PAGE_FULL and PD_HAS_FREE_LINES flags, they are just hints */
+	phdr->pd_flags |= PD_PAGE_FULL | PD_HAS_FREE_LINES;
+
+	/*
+	 * Also mask the all-visible flag.
+	 *
+	 * XXX: It is unfortunate that we have to do this. If the flag is set
+	 * incorrectly, that's serious, and we would like to catch it. If the flag
+	 * is cleared incorrectly, that's serious too. But redo of HEAP_CLEAN
+	 * records don't currently set the flag, even though it is set in the
+	 * master, so we must silence the failures that this causes.
+	 */
+	phdr->pd_flags |= PD_ALL_VISIBLE;
+
+	for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
+	{
+		ItemId	iid = PageGetItemId(page, off);
+		char   *page_item;
+
+		page_item = (char *) (page + ItemIdGetOffset(iid));
+
+		/*
+		 * Ignore hint bits and command ID.
+		 */
+		if (ItemIdIsNormal(iid))
+		{
+			HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
+
+			page_htup->t_infomask =
+				HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID |
+				HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID;
+			page_htup->t_infomask |= HEAP_XACT_MASK;
+			page_htup->t_choice.t_heap.t_field3.t_cid = 0xFFFFFFFF;
+
+			/*
+			 * For a speculative tuple, the content of t_ctid differs
+			 * between the backup page and the current page. Hence, set it
+			 * to the tuple's own block number and offset. XXX: needs
+			 * review; suggestions welcome.
+			 */
+			if (HeapTupleHeaderIsSpeculative(page_htup))
+			{
+				ItemPointerSet(&page_htup->t_ctid, blkno, off);
+			}
+		}
+
+		/*
+		 * Ignore any padding bytes after the tuple, when the length of
+		 * the item is not MAXALIGNed.
+		 */
+		if (ItemIdHasStorage(iid))
+		{
+			int len = ItemIdGetLength(iid);
+			int padlen = MAXALIGN(len) - len;
+
+			if (padlen > 0)
+				memset(page_item + len, MASK_MARKER, padlen);
+		}
+	}
+}
+
+/*
+ * Mask a SpGist page
+ */
+static void
+mask_spgist_page(Page page)
+{
+	mask_unused_space(page);
+}
+
+/*
+ * Mask a GIST page
+ */
+static void
+mask_gist_page(Page page)
+{
+	mask_unused_space(page);
+
+	/* Mask NSN */
+	GistPageSetNSN(page, UINT64CONST(0xFFFFFFFFFFFFFFFF));
+
+	/* Mask the flag bits of a GiST page */
+	GistPageSetDeleted(page);
+	GistMarkTuplesDeleted(page);
+	GistMarkPageHasGarbage(page);
+	GistMarkFollowRight(page);
+}
+
+/*
+ * Mask a Gin page
+ */
+static void
+mask_gin_page(Page page, BlockNumber blkno)
+{
+	GinPageOpaque opaque = GinPageGetOpaque(page);
+
+	/* GIN metapage doesn't use pd_lower/pd_upper. Other page types do. */
+	if (blkno != 0)
+	{
+		/*
+		 * For GIN_DELETED page, the page is initialized to empty.
+		 * Hence mask everything.
+		 */
+		if (opaque->flags & GIN_DELETED)
+			memset(page, MASK_MARKER, BLCKSZ);
+		else
+			mask_unused_space(page);
+	}
+}
+
+/*
+ * Mask a sequence page
+ */
+static void
+mask_sequence_page(Page page)
+{
+	/*
+	 * FIXME: currently, we just ignore sequence records altogether. nextval
+	 * records a different value in the WAL record than it writes to the
+	 * buffer. Ideally we would only mask out the value in the tuple.
+	 */
+	memset(page, MASK_MARKER, BLCKSZ);
+}
+
+/*
+ * Mask a btree page
+ */
+static void
+mask_btree_page(Page page)
+{
+	OffsetNumber off;
+	OffsetNumber maxoff;
+	BTPageOpaque maskopaq = (BTPageOpaque)
+			(((char *) page) + ((PageHeader) page)->pd_special);
+
+	/*
+	 * Mark unused space before any processing. This is important as it
+	 * uses pd_lower and pd_upper that may be masked on this page
+	 * afterwards if it is a deleted page.
+	 */
+	mask_unused_space(page);
+
+	/*
+	 * Mask everything on a DELETED page.
+	 */
+	if (((BTPageOpaque) PageGetSpecialPointer(page))->btpo_flags & BTP_DELETED)
+	{
+		/* Page content, between standard page header and opaque struct */
+		memset(page + SizeOfPageHeaderData, MASK_MARKER,
+			   BLCKSZ - MAXALIGN(sizeof(BTPageOpaqueData)));
+
+		/* pd_lower and upper */
+		memset(&((PageHeader) page)->pd_lower, MASK_MARKER, sizeof(uint16));
+		memset(&((PageHeader) page)->pd_upper, MASK_MARKER, sizeof(uint16));
+	}
+	else
+	{
+		/*
+		 * Mask some line pointer bits, particularly those marked as
+		 * used on a master and unused on a standby.
+		 * XXX: This could be refined.
+		 */
+		maxoff = PageGetMaxOffsetNumber(page);
+		for (off = 1; off <= maxoff; off++)
+		{
+			ItemId iid = PageGetItemId(page, off);
+
+			if (ItemIdIsUsed(iid))
+				iid->lp_flags = LP_UNUSED;
+		}
+	}
+
+	/*
+	 * Mask the BTP_HALF_DEAD, BTP_SPLIT_END, BTP_HAS_GARBAGE and
+	 * BTP_INCOMPLETE_SPLIT flags.
+	 */
+	maskopaq->btpo_flags |= BTP_HALF_DEAD | BTP_SPLIT_END |
+		BTP_HAS_GARBAGE | BTP_INCOMPLETE_SPLIT;
+}
+
+/*
+ * Mask a hash page
+ */
+static void
+mask_hash_page(Page page)
+{
+	OffsetNumber off;
+	OffsetNumber maxoff;
+	HashPageOpaque opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+	/*
+	 * Mark unused space before any processing. This is important as it
+	 * uses pd_lower and pd_upper that may be masked on this page
+	 * afterwards if it is a deleted page.
+	 */
+	mask_unused_space(page);
+
+	/*
+	 * Mask everything on a UNUSED page.
+	 */
+	if (opaque->hasho_flag & LH_UNUSED_PAGE)
+	{
+		/* Page content, between standard page header and opaque struct */
+		memset(page + SizeOfPageHeaderData, MASK_MARKER,
+			   BLCKSZ - MAXALIGN(sizeof(HashPageOpaqueData)));
+
+		/* pd_lower and upper */
+		memset(&((PageHeader) page)->pd_lower, MASK_MARKER, sizeof(uint16));
+		memset(&((PageHeader) page)->pd_upper, MASK_MARKER, sizeof(uint16));
+	}
+	else if ((opaque->hasho_flag & LH_META_PAGE) == 0)
+	{
+		/*
+		 * For pages other than the metapage, mask some line pointer bits,
+		 * particularly those marked as used on a master and unused on a
+		 * standby.
+		 * XXX: This could be refined.
+		 */
+		maxoff = PageGetMaxOffsetNumber(page);
+		for (off = 1; off <= maxoff; off++)
+		{
+			ItemId iid = PageGetItemId(page, off);
+
+			if (ItemIdIsUsed(iid))
+				iid->lp_flags = LP_UNUSED;
+		}
+	}
+}
+
+/*
+ * Mask a BRIN page
+ */
+static void
+mask_brin_page(Page page)
+{
+	PageHeader phdr = (PageHeader) page;
+
+	/* Ignore PD_PAGE_FULL and PD_HAS_FREE_LINES flags, they are just hints */
+	phdr->pd_flags |= PD_PAGE_FULL | PD_HAS_FREE_LINES;
+
+	mask_unused_space(page);
+}
+
+/*
+ * mask_page
+ *
+ * Mask a given page. First determine what kind of page it is, then
+ * normalize it. The normalized page is palloc'd, so the caller is
+ * responsible for freeing it. Tracking blkno is needed for GIN pages,
+ * as their metapage does not use pd_lower and pd_upper.
+ * The caller is assumed to hold a proper lock on the page being masked.
+ */
+char *
+mask_page(const char *page, BlockNumber blkno)
+{
+	Page	page_norm;
+	uint16	tail;
+
+	page_norm = (Page) palloc(BLCKSZ);
+	memcpy(page_norm, page, BLCKSZ);
+	/*
+	 * Mask the page LSN: the full-page image is taken before the LSN is
+	 * updated, so the LSNs of the two pages always differ.
+	 */
+	mask_page_lsn(page_norm);
+	/*
+	 * Look at the size of the special area, and the last two bytes in
+	 * it, to detect what kind of a page it is. Then call the appropriate
+	 * masking function.
+	 */
+	memcpy(&tail, &page[BLCKSZ - 2], 2);
+	if (PageGetSpecialSize(page) == 0)
+	{
+		/* Case of a normal relation, it has an empty special area */
+		mask_heap_page(page_norm, blkno);
+	}
+	else if (PageGetSpecialSize(page) == MAXALIGN(sizeof(GISTPageOpaqueData)) &&
+			 tail == GIST_PAGE_ID)
+	{
+		/* Gist page */
+		mask_gist_page(page_norm);
+	}
+	else if (PageGetSpecialSize(page) == MAXALIGN(sizeof(BTPageOpaqueData)) &&
+			 tail <= MAX_BT_CYCLE_ID)
+	{
+		/* btree page */
+		mask_btree_page(page_norm);
+	}
+	else if (PageGetSpecialSize(page) == MAXALIGN(sizeof(SpGistPageOpaqueData)) &&
+			 tail == SPGIST_PAGE_ID)
+	{
+		/* SpGist page */
+		mask_spgist_page(page_norm);
+	}
+	else if (PageGetSpecialSize(page) == MAXALIGN(sizeof(HashPageOpaqueData)) &&
+			 tail == HASHO_PAGE_ID)
+	{
+		/* hash page */
+		mask_hash_page(page_norm);
+	}
+	else if (BRIN_IS_META_PAGE(page) || BRIN_IS_REVMAP_PAGE(page) ||
+			 BRIN_IS_REGULAR_PAGE(page))
+	{
+		mask_brin_page(page_norm);
+	}
+	else if (PageGetSpecialSize(page) == MAXALIGN(sizeof(GinPageOpaqueData)) ||
+			 PageGetSpecialSize(page) == MAXALIGN(sizeof(SequencePageOpaqueData)))
+	{
+		/*
+		 * The page found here is used either by a GIN index or a sequence.
+		 * GIN index pages do not have a proper identifier, so check whether
+		 * the page is used by a sequence; if it is not, the page belongs to
+		 * a GIN index. It is still possible that a GIN page holds exactly
+		 * the same value as SEQ_MAGIC in that area, but this is unlikely.
+		 */
+		if (((SequencePageOpaqueData *) PageGetSpecialPointer(page))->seq_page_id == SEQ_MAGIC)
+			mask_sequence_page(page_norm);
+		else
+			mask_gin_page(page_norm, blkno);
+	}
+	else
+	{
+		/* should not reach here */
+		Assert(false);
+	}
+
+	/* Return normalized page */
+	return (char *) page_norm;
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6ac5184..645a807 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1800,6 +1800,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"wal_consistency_mask", PGC_POSTMASTER, WAL_SETTINGS,
+			gettext_noop("Mask to enable WAL consistency for HEAP_INSERT/HEAP_INSERT2."),
+			NULL
+		},
+		&wal_consistency_mask,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY,
 			gettext_noop("Sets the maximum wait time to receive data from the primary."),
 			NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 6d0666c..e7e21ed 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -191,6 +191,17 @@
 					#   open_sync
 #full_page_writes = on			# recover from partial page writes
 #wal_compression = off			# enable compression of full-page writes
+#wal_consistency_mask = 0		# enable WAL consistency check for different operations
+					# bit 0 - HEAP2
+					# bit 1 - HEAP
+					# bit 2 - BTREE
+					# bit 3 - HASH
+					# bit 4 - GIN
+					# bit 5 - GIST
+					# bit 6 - SEQ
+					# bit 7 - SPGIST
+					# bit 8 - BRIN
+					# Multiple bits can be combined; e.g. to enable
+					# HEAP and HASH, set the value to 10
 #wal_log_hints = off			# also do full page writes of non-critical updates
 					# (change requires restart)
 #wal_buffers = -1			# min 32kB, -1 sets based on shared_buffers
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 14b7f7f..1fc5f6e 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -104,6 +104,7 @@ extern bool EnableHotStandby;
 extern bool fullPageWrites;
 extern bool wal_log_hints;
 extern bool wal_compression;
+extern int wal_consistency_mask;
 extern bool log_checkpoints;
 
 extern int	CheckPointSegments;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index deaa7f5..287143b 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -52,6 +52,8 @@ typedef struct
 
 	/* Information on full-page image, if any */
 	bool		has_image;
+	bool		require_image;	/* original value of has_image; has_image
+								 * is forced to true when the WAL
+								 * consistency check is enabled */
 	char	   *bkp_image;
 	uint16		hole_offset;
 	uint16		hole_length;
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 3dfcb49..34e28c0 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -137,7 +137,7 @@ typedef struct XLogRecordBlockImageHeader
 /* Information stored in bimg_info */
 #define BKPIMAGE_HAS_HOLE		0x01	/* page image has "hole" */
 #define BKPIMAGE_IS_COMPRESSED		0x02		/* page image is compressed */
-
+#define BKPIMAGE_IS_REQUIRED		0x04	/* page image must be restored during replay */
 /*
  * Extra header information used when page image has "hole" and
  * is compressed.
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 6af60d8..a7a0e16 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -20,6 +20,19 @@
 #include "nodes/parsenodes.h"
 #include "storage/relfilenode.h"
 
+/*
+ * Page opaque data in a sequence page
+ */
+typedef struct SequencePageOpaqueData
+{
+	uint32 seq_page_id;
+} SequencePageOpaqueData;
+
+/*
+ * This page ID is for the convenience of being able to identify if a page
+ * is being used by a sequence.
+ */
+#define SEQ_MAGIC		0x1717
 
 typedef struct FormData_pg_sequence
 {
diff --git a/src/include/storage/bufmask.h b/src/include/storage/bufmask.h
new file mode 100644
index 0000000..1dd5a67
--- /dev/null
+++ b/src/include/storage/bufmask.h
@@ -0,0 +1,21 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmask.h
+ *       Buffer masking definitions.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/bufmask.h
+ */
+
+#ifndef BUFMASK_H
+#define BUFMASK_H
+
+#include "postgres.h"
+#include "storage/block.h"
+
+/* Entry point for page masking */
+extern char *mask_page(const char *page, BlockNumber blkno);
+
+#endif
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 3d5dea7..6621a39 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -44,8 +44,9 @@ typedef enum
 	RBM_ZERO_AND_CLEANUP_LOCK,	/* Like RBM_ZERO_AND_LOCK, but locks the page
 								 * in "cleanup" mode */
 	RBM_ZERO_ON_ERROR,			/* Read, but return an all-zeros page on error */
-	RBM_NORMAL_NO_LOG			/* Don't log page as invalid during WAL
+	RBM_NORMAL_NO_LOG,			/* Don't log page as invalid during WAL
 								 * replay; otherwise same as RBM_NORMAL */
+	RBM_WAL_CHECK				/* Normal read, but don't complain if the
+								 * page is new; used by the WAL
+								 * consistency check */
 } ReadBufferMode;
 
 /* forward declared, to avoid having to expose buf_internals.h here */