Hi, I've attached a patch that checks whether the current page is consistent with its full-page write (FPW) after WAL is applied to it. This is how the patch works:
1. When a WAL record is inserted, an FPW is taken for that operation, and a flag is kept to indicate whether that page actually needs to be restored during replay.
2. During recovery, when a redo operation is performed, we compare the FPW contained in the WAL record with the current page in the buffer. For this purpose, I've used Michael's patch, with minor changes, to check whether two pages are actually equal or not.
3. I've also added a GUC variable (wal_consistency_mask) to indicate the operations (HEAP, BTREE, HASH, GIN, etc.) for which this feature (always-FPW plus consistency check) is to be enabled.

How to use the patch:
1. Apply the patch.
2. In the postgresql.conf file, set the wal_consistency_mask variable accordingly (an illustrative example follows below, after my signature). For debug messages, set log_min_messages = debug1.

Michael's patch: https://www.postgresql.org/message-id/CAB7nPqR4vxdKijP%2BDu82vOcOnGMvutq-gfqiU2dsH4bsM77hYg%40mail.gmail.com
Reference thread: https://www.postgresql.org/message-id/flat/CAB7nPqR4vxdKijP%2BDu82vOcOnGMvutq-gfqiU2dsH4bsM77hYg%40mail.gmail.com#cab7npqr4vxdkijp+du82vocongmvutq-gfqiu2dsh4bsm77...@mail.gmail.com

Please let me know your thoughts on this.

--
Thanks & Regards,
Kuntal Ghosh
EnterpriseDB: http://www.enterprisedb.com
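As an illustration (a sketch only; the value 6 is an example, and the bit assignments follow the postgresql.conf.sample hunk in the patch below), enabling the check for the HEAP and BTREE resource managers means setting bit 1 and bit 2, i.e. 2 + 4 = 6:

    wal_consistency_mask = 6      # bit 1 (HEAP) + bit 2 (BTREE)
    log_min_messages = debug1     # also log the "Consistent page found ..." messages

With these settings, any mismatch detected during redo is reported as a WARNING ("Inconsistent page (at byte ...) found for record ..."), while matching pages are logged at DEBUG1.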
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f13f9c1..9380079 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -25,6 +25,7 @@
 #include "access/commit_ts.h"
 #include "access/multixact.h"
 #include "access/rewriteheap.h"
+#include "access/rmgr.h"
 #include "access/subtrans.h"
 #include "access/timeline.h"
 #include "access/transam.h"
@@ -52,7 +53,9 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/barrier.h"
+#include "storage/bufmask.h"
 #include "storage/bufmgr.h"
+#include "storage/bufpage.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/large_object.h"
@@ -94,6 +97,7 @@
 bool        EnableHotStandby = false;
 bool        fullPageWrites = true;
 bool        wal_log_hints = false;
 bool        wal_compression = false;
+int         wal_consistency_mask = 0;
 bool        log_checkpoints = false;
 int         sync_method = DEFAULT_SYNC_METHOD;
 int         wal_level = WAL_LEVEL_MINIMAL;
@@ -867,6 +871,9 @@
 static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
+void checkWALConsistency(XLogReaderState *xlogreader);
+void checkWALConsistencyForBlock(XLogReaderState *record, uint8 block_id);
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks. This is a low-level routine; to construct the WAL record header
@@ -6868,6 +6875,12 @@ StartupXLOG(void)
                /* Now apply the WAL record itself */
                RmgrTable[record->xl_rmid].rm_redo(xlogreader);

+               /*
+                * Check whether the page associated with the WAL record is
+                * consistent with the existing page.
+                */
+               checkWALConsistency(xlogreader);
+
                /* Pop the error context stack */
                error_context_stack = errcallback.previous;
@@ -11626,3 +11639,160 @@ XLogRequestWalReceiverReply(void)
 {
     doRequestWalReceiverReply = true;
 }
+
+/*
+ * Check whether the page associated with the WAL record is consistent with
+ * the existing page.
+ */
+void
+checkWALConsistency(XLogReaderState *xlogreader)
+{
+   RmgrIds     rmid = (RmgrIds) XLogRecGetRmid(xlogreader);
+   int         block_id;
+   int         enableWALConsistencyMask = 1;
+   RmgrIds     rmids[] = {RM_HEAP2_ID, RM_HEAP_ID, RM_BTREE_ID, RM_HASH_ID,
+                          RM_GIN_ID, RM_GIST_ID, RM_SEQ_ID, RM_SPGIST_ID,
+                          RM_BRIN_ID};
+   int         size = sizeof(rmids) / sizeof(rmids[0]);
+   int         i;
+
+   for (i = 0; i < size; i++)
+   {
+       if (rmids[i] == rmid && (wal_consistency_mask & enableWALConsistencyMask))
+       {
+           for (block_id = 0; block_id <= xlogreader->max_block_id; block_id++)
+               checkWALConsistencyForBlock(xlogreader, block_id);
+           break;
+       }
+
+       /* Enable checking for the next bit */
+       enableWALConsistencyMask <<= 1;
+   }
+}
+
+void
+checkWALConsistencyForBlock(XLogReaderState *record, uint8 block_id)
+{
+   Buffer      buf;
+   char       *ptr;
+   DecodedBkpBlock *bkpb;
+   char        tmp[BLCKSZ];
+   char        tmp_decomp[BLCKSZ]; /* separate buffer for decompression, so
+                                    * the hole expansion below never copies
+                                    * between overlapping regions */
+   RelFileNode rnode;
+   ForkNumber  forknum;
+   BlockNumber blkno;
+   Page        page;
+   char       *norm_new_page;
+   char       *norm_old_page;
+   char        old_buf[BLCKSZ * 2];
+   char        new_buf[BLCKSZ * 2];
+   int         i;
+   int         j = 0;
+   bool        inconsistent = false;
+
+   if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
+   {
+       /* Caller specified a bogus block_id. Don't do anything. */
+       return;
+   }
+
+   buf = XLogReadBufferExtended(rnode, forknum, blkno,
+                                RBM_WAL_CHECK);
+   page = BufferGetPage(buf);
+
+   bkpb = &record->blocks[block_id];
+   if (bkpb->bkp_image != NULL)
+       ptr = bkpb->bkp_image;
+   else
+   {
+       elog(WARNING,
+            "No page found in WAL for record %X/%X, rel %u/%u/%u, "
+            "forknum %u, blkno %u",
+            (uint32) (record->ReadRecPtr >> 32), (uint32) record->ReadRecPtr,
+            rnode.spcNode, rnode.dbNode, rnode.relNode,
+            forknum, blkno);
+       ReleaseBuffer(buf);     /* don't leak the buffer pin on early exit */
+       return;
+   }
+
+   if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
+   {
+       /* If a backup block image is compressed, decompress it */
+       if (pglz_decompress(ptr, bkpb->bimg_len, tmp_decomp,
+                           BLCKSZ - bkpb->hole_length) < 0)
+       {
+           elog(ERROR, "invalid compressed image at %X/%X, block %d",
+                (uint32) (record->ReadRecPtr >> 32),
+                (uint32) record->ReadRecPtr,
+                block_id);
+       }
+       ptr = tmp_decomp;
+   }
+
+   /*
+    * Copy the image into a temporary buffer, zero-filling the block "hole"
+    * if the image has one.
+    */
+   if (bkpb->hole_length == 0)
+   {
+       memcpy(tmp, ptr, BLCKSZ);
+   }
+   else
+   {
+       memcpy(tmp, ptr, bkpb->hole_offset);
+       /* must zero-fill the hole */
+       MemSet(tmp + bkpb->hole_offset, 0, bkpb->hole_length);
+       memcpy(tmp + (bkpb->hole_offset + bkpb->hole_length),
+              ptr + bkpb->hole_offset,
+              BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+   }
+   ptr = tmp;
+
+   /* Mask pages */
+   norm_new_page = mask_page((Page) ptr, blkno);
+   norm_old_page = mask_page((Page) page, blkno);
+
+   /*
+    * Convert the pages to be compared into hex format to facilitate their
+    * comparison and make potential diffs more readable while debugging.
+    */
+   for (i = 0; i < BLCKSZ; i++)
+   {
+       const char *digits = "0123456789ABCDEF";
+       uint8       byte_new = (uint8) norm_new_page[i];
+       uint8       byte_old = (uint8) norm_old_page[i];
+
+       new_buf[j] = digits[byte_new >> 4];
+       old_buf[j] = digits[byte_old >> 4];
+
+       /* Compare nibble by nibble, stopping at the first difference */
+       if (new_buf[j] != old_buf[j])
+       {
+           inconsistent = true;
+           break;
+       }
+       j++;
+
+       new_buf[j] = digits[byte_new & 0x0F];
+       old_buf[j] = digits[byte_old & 0x0F];
+       if (new_buf[j] != old_buf[j])
+       {
+           inconsistent = true;
+           break;
+       }
+       j++;
+   }
+
+   /* Time to report the result of comparing the old and new contents */
+   if (inconsistent)
+       elog(WARNING,
+            "Inconsistent page (at byte %u) found for record %X/%X, rel %u/%u/%u, "
+            "forknum %u, blkno %u", i,
+            (uint32) (record->ReadRecPtr >> 32), (uint32) record->ReadRecPtr,
+            rnode.spcNode, rnode.dbNode, rnode.relNode,
+            forknum, blkno);
+   else
+       elog(DEBUG1,
+            "Consistent page found for record %X/%X, rel %u/%u/%u, "
+            "forknum %u, blkno %u",
+            (uint32) (record->ReadRecPtr >> 32), (uint32) record->ReadRecPtr,
+            rnode.spcNode, rnode.dbNode, rnode.relNode,
+            forknum, blkno);
+
+   pfree(norm_new_page);
+   pfree(norm_old_page);
+   ReleaseBuffer(buf);
+}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index c37003a..5ff41a3 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -513,7 +513,12 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
        XLogRecordBlockCompressHeader cbimg = {0};
        bool        samerel;
        bool        is_compressed = false;
-
+       int         enableWALConsistencyMask = 1;
+       RmgrIds     rmids[] = {RM_HEAP2_ID, RM_HEAP_ID, RM_BTREE_ID, RM_HASH_ID, RM_GIN_ID, RM_GIST_ID, RM_SEQ_ID, RM_SPGIST_ID, RM_BRIN_ID};
+       int         size = sizeof(rmids) / sizeof(rmids[0]);
+       int         i;
+       bool        needs_image_backup; /* since we always set needs_backup to
+                                        * true, this remembers its original value */
        if (!regbuf->in_use)
            continue;
@@ -556,6 +561,24 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
        if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
            bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

+       /*
+        * If the WAL consistency check is enabled for the current rmid,
+        * force a full-page write for the current block.
+        */
+       needs_image_backup = needs_backup;
+       for (i = 0; i < size; i++)
+       {
+           if (rmids[i] == rmid && (wal_consistency_mask & enableWALConsistencyMask))
+           {
+               needs_backup = true;
+               break;
+           }
+
+           /* Enable checking for the next bit */
+           enableWALConsistencyMask <<= 1;
+       }
+
        if (needs_backup)
        {
            Page        page = regbuf->page;
@@ -618,6 +641,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
            bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

+           if (needs_image_backup)
+               bimg.bimg_info |= BKPIMAGE_IS_REQUIRED;
+
            if (is_compressed)
            {
                bimg.length = compressed_len;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index dcf747c..5e53df3 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1077,11 +1077,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
            }
            datatotal += blk->data_len;

+           blk->require_image = false;
            if (blk->has_image)
            {
                COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
                COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
                COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
+               /*
+                * If the image is required for any purpose other than the
+                * WAL consistency check, set the require_image flag.
+                */
+               if (blk->bimg_info & BKPIMAGE_IS_REQUIRED)
+                   blk->require_image = true;
                if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
                {
                    if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
@@ -1222,6 +1229,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
            memcpy(blk->data, ptr, blk->data_len);
            ptr += blk->data_len;
        }
+       /*
+        * If the image was inserted into the WAL record only for the WAL
+        * consistency check, clear has_image; keep it set otherwise.
+        */
+       blk->has_image = blk->require_image;
    }

    /* and finally, the main data */
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index c98f981..eaf2d8b 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -49,16 +49,6 @@
 #define SEQ_LOG_VALS   32

 /*
- * The "special area" of a sequence's buffer page looks like this.
- */
-#define SEQ_MAGIC      0x1717
-
-typedef struct sequence_magic
-{
-   uint32      magic;
-} sequence_magic;
-
-/*
  * We store a SeqTable item for every sequence we have touched in the current
  * session. This is needed to hold onto nextval/currval state.  (We can't
  * rely on the relcache, since it's only, well, a cache, and may decide to
@@ -329,7 +319,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)
 {
    Buffer      buf;
    Page        page;
-   sequence_magic *sm;
+   SequencePageOpaqueData *sm;
    OffsetNumber offnum;

    /* Initialize first page of relation with special magic number */
@@ -339,9 +329,9 @@ fill_seq_with_data(Relation rel, HeapTuple tuple)

    page = BufferGetPage(buf);

-   PageInit(page, BufferGetPageSize(buf), sizeof(sequence_magic));
-   sm = (sequence_magic *) PageGetSpecialPointer(page);
-   sm->magic = SEQ_MAGIC;
+   PageInit(page, BufferGetPageSize(buf), sizeof(SequencePageOpaqueData));
+   sm = (SequencePageOpaqueData *) PageGetSpecialPointer(page);
+   sm->seq_page_id = SEQ_MAGIC;

    /* Now insert sequence tuple */
@@ -1109,18 +1099,18 @@ read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
 {
    Page        page;
    ItemId      lp;
-   sequence_magic *sm;
+   SequencePageOpaqueData *sm;
    Form_pg_sequence seq;

    *buf = ReadBuffer(rel, 0);
    LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);

    page = BufferGetPage(*buf);
-   sm = (sequence_magic *) PageGetSpecialPointer(page);
+   sm = (SequencePageOpaqueData *) PageGetSpecialPointer(page);

-   if (sm->magic != SEQ_MAGIC)
+   if (sm->seq_page_id != SEQ_MAGIC)
        elog(ERROR, "bad magic number in sequence \"%s\": %08X",
-            RelationGetRelationName(rel), sm->magic);
+            RelationGetRelationName(rel), sm->seq_page_id);

    lp = PageGetItemId(page, FirstOffsetNumber);
    Assert(ItemIdIsNormal(lp));
@@ -1585,7 +1575,7 @@ seq_redo(XLogReaderState *record)
    char       *item;
    Size        itemsz;
    xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(record);
-   sequence_magic *sm;
+   SequencePageOpaqueData *sm;

    if (info != XLOG_SEQ_LOG)
        elog(PANIC, "seq_redo: unknown op code %u", info);
@@ -1604,9 +1594,9 @@ seq_redo(XLogReaderState *record)
     */
    localpage = (Page) palloc(BufferGetPageSize(buffer));

-   PageInit(localpage, BufferGetPageSize(buffer), sizeof(sequence_magic));
-   sm = (sequence_magic *) PageGetSpecialPointer(localpage);
-   sm->magic = SEQ_MAGIC;
+   PageInit(localpage, BufferGetPageSize(buffer), sizeof(SequencePageOpaqueData));
+   sm = (SequencePageOpaqueData *) PageGetSpecialPointer(localpage);
+   sm->seq_page_id = SEQ_MAGIC;

    item = (char *) xlrec + sizeof(xl_seq_rec);
    itemsz = XLogRecGetDataLen(record) - sizeof(xl_seq_rec);
diff --git a/src/backend/storage/buffer/Makefile b/src/backend/storage/buffer/Makefile
index 2c10fba..8630dca 100644
--- a/src/backend/storage/buffer/Makefile
+++ b/src/backend/storage/buffer/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/storage/buffer
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global

-OBJS = buf_table.o buf_init.o bufmgr.o freelist.o localbuf.o
+OBJS = buf_table.o buf_init.o bufmask.o bufmgr.o freelist.o localbuf.o

 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/buffer/bufmask.c b/src/backend/storage/buffer/bufmask.c
new file mode 100644
index 0000000..99c0e15
--- /dev/null
+++ b/src/backend/storage/buffer/bufmask.c
@@ -0,0 +1,372 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmask.c
+ *   Routines for buffer masking, used to ensure that buffers used for
+ *   comparison across nodes are in a consistent state.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * Most pages cannot be compared directly, because some parts of the
+ * page are not expected to be byte-by-byte identical, for example
+ * hint bits or unused space in the page. The strategy is to normalize
+ * all pages by creating a mask of those bits that are not expected to
+ * match.
+ *
+ * IDENTIFICATION
+ *   src/backend/storage/buffer/bufmask.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/nbtree.h"
+#include "access/gist.h"
+#include "access/gin_private.h"
+#include "access/hash.h"
+#include "access/htup_details.h"
+#include "access/spgist_private.h"
+#include "commands/sequence.h"
+#include "storage/bufmask.h"
+#include "storage/bufmgr.h"
+
+/* Marker used to mask pages consistently */
+#define MASK_MARKER    0xFF
+
+static void mask_unused_space(Page page);
+static void mask_page_lsn(Page page);
+static void mask_heap_page(Page page);
+static void mask_spgist_page(Page page);
+static void mask_gist_page(Page page);
+static void mask_gin_page(Page page, BlockNumber blkno);
+static void mask_sequence_page(Page page);
+static void mask_btree_page(Page page);
+static void mask_hash_page(Page page);
+
+/*
+ * Mask the unused space of a page between pd_lower and pd_upper.
+ */
+static void
+mask_unused_space(Page page)
+{
+   int         pd_lower = ((PageHeader) page)->pd_lower;
+   int         pd_upper = ((PageHeader) page)->pd_upper;
+   int         pd_special = ((PageHeader) page)->pd_special;
+
+   /* Sanity check */
+   if (pd_lower > pd_upper || pd_special < pd_upper ||
+       pd_lower < SizeOfPageHeaderData || pd_special > BLCKSZ)
+   {
+       elog(ERROR, "invalid page at %X/%08X\n",
+            ((PageHeader) page)->pd_lsn.xlogid,
+            ((PageHeader) page)->pd_lsn.xrecoff);
+   }
+
+   memset(page + pd_lower, MASK_MARKER, pd_upper - pd_lower);
+}
+
+/*
+ * Mask the page LSN.
+ */
+static void
+mask_page_lsn(Page page)
+{
+   PageHeader  phdr = (PageHeader) page;
+
+   PageXLogRecPtrSet(phdr->pd_lsn, 0);
+}
+
+/*
+ * Mask a heap page
+ */
+static void
+mask_heap_page(Page page)
+{
+   OffsetNumber off;
+   PageHeader  phdr = (PageHeader) page;
+
+   mask_unused_space(page);
+
+   /* Ignore prune_xid (it's like a hint bit) */
+   phdr->pd_prune_xid = 0xFFFFFFFF;
+
+   /* Ignore PD_PAGE_FULL and PD_HAS_FREE_LINES flags, they are just hints */
+   phdr->pd_flags |= PD_PAGE_FULL | PD_HAS_FREE_LINES;
+
+   /*
+    * Also mask the all-visible flag.
+    *
+    * XXX: It is unfortunate that we have to do this. If the flag is set
+    * incorrectly, that's serious, and we would like to catch it. If the
+    * flag is cleared incorrectly, that's serious too. But redo of
+    * HEAP_CLEAN records doesn't currently set the flag, even though it is
+    * set on the master, so we must silence the failures that that causes.
+    */
+   phdr->pd_flags |= PD_ALL_VISIBLE;
+
+   for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
+   {
+       ItemId      iid = PageGetItemId(page, off);
+       char       *page_item;
+
+       page_item = (char *) (page + ItemIdGetOffset(iid));
+
+       /* Ignore hint bits and command ID */
+       if (ItemIdIsNormal(iid))
+       {
+           HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
+
+           page_htup->t_infomask =
+               HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID |
+               HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID;
+           page_htup->t_infomask |= HEAP_COMBOCID;
+           page_htup->t_choice.t_heap.t_field3.t_cid = 0xFFFFFFFF;
+       }
+
+       /*
+        * Ignore any padding bytes after the tuple, when the length of the
+        * item is not MAXALIGNed.
+        */
+       if (ItemIdHasStorage(iid))
+       {
+           int         len = ItemIdGetLength(iid);
+           int         padlen = MAXALIGN(len) - len;
+
+           if (padlen > 0)
+               memset(page_item + len, MASK_MARKER, padlen);
+       }
+   }
+}
+
+/*
+ * Mask an SP-GiST page
+ */
+static void
+mask_spgist_page(Page page)
+{
+   mask_unused_space(page);
+}
+
+/*
+ * Mask a GiST page
+ */
+static void
+mask_gist_page(Page page)
+{
+   mask_unused_space(page);
+
+   /* Mask NSN */
+   GistPageSetNSN(page, 0);
+
+   /* Mask the flag bits of a GiST page */
+   GistPageSetDeleted(page);
+   GistMarkTuplesDeleted(page);
+   GistMarkPageHasGarbage(page);
+   GistMarkFollowRight(page);
+}
+
+/*
+ * Mask a GIN page
+ */
+static void
+mask_gin_page(Page page, BlockNumber blkno)
+{
+   /* GIN metapage doesn't use pd_lower/pd_upper. Other page types do. */
+   if (blkno != 0)
+       mask_unused_space(page);
+}
+
+/*
+ * Mask a sequence page
+ */
+static void
+mask_sequence_page(Page page)
+{
+   /*
+    * FIXME: currently, we just ignore sequence records altogether. nextval
+    * records a different value in the WAL record than it writes to the
+    * buffer. Ideally we would only mask out the value in the tuple.
+    */
+   memset(page, MASK_MARKER, BLCKSZ);
+}
+
+/*
+ * Mask a btree page
+ */
+static void
+mask_btree_page(Page page)
+{
+   OffsetNumber off;
+   OffsetNumber maxoff;
+   BTPageOpaque maskopaq = (BTPageOpaque)
+       (((char *) page) + ((PageHeader) page)->pd_special);
+
+   /*
+    * Mask unused space before any processing. This is important as it uses
+    * pd_lower and pd_upper, which may be masked on this page afterwards if
+    * it is a deleted page.
+    */
+   mask_unused_space(page);
+
+   /*
+    * Mask everything on a DELETED page.
+    */
+   if (((BTPageOpaque) PageGetSpecialPointer(page))->btpo_flags & BTP_DELETED)
+   {
+       /* Page content, between standard page header and opaque struct */
+       memset(page + SizeOfPageHeaderData, MASK_MARKER,
+              BLCKSZ - MAXALIGN(sizeof(BTPageOpaqueData)));
+
+       /* pd_lower and pd_upper */
+       memset(&((PageHeader) page)->pd_lower, MASK_MARKER, sizeof(uint16));
+       memset(&((PageHeader) page)->pd_upper, MASK_MARKER, sizeof(uint16));
+   }
+   else
+   {
+       /*
+        * Mask some line pointer bits, particularly those marked as used on
+        * a master and unused on a standby.
+        * XXX: This could be refined.
+        */
+       maxoff = PageGetMaxOffsetNumber(page);
+       for (off = 1; off <= maxoff; off++)
+       {
+           ItemId      iid = PageGetItemId(page, off);
+
+           if (ItemIdIsUsed(iid))
+               iid->lp_flags = LP_UNUSED;
+       }
+   }
+
+   /*
+    * Mask the BTP_HAS_GARBAGE flag. This needs to be done at the end of the
+    * process, as the previous masking operations could generate some
+    * garbage.
+    */
+   maskopaq->btpo_flags |= BTP_HAS_GARBAGE;
+}
+
+/*
+ * Mask a hash page
+ */
+static void
+mask_hash_page(Page page)
+{
+   OffsetNumber off;
+   OffsetNumber maxoff;
+   HashPageOpaque opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+   /*
+    * Mask unused space before any processing. This is important as it uses
+    * pd_lower and pd_upper, which may be masked on this page afterwards if
+    * it is a deleted page.
+    */
+   mask_unused_space(page);
+
+   /*
+    * Mask everything on an UNUSED page.
+    */
+   if (opaque->hasho_flag & LH_UNUSED_PAGE)
+   {
+       /* Page content, between standard page header and opaque struct */
+       memset(page + SizeOfPageHeaderData, MASK_MARKER,
+              BLCKSZ - MAXALIGN(sizeof(HashPageOpaqueData)));
+
+       /* pd_lower and pd_upper */
+       memset(&((PageHeader) page)->pd_lower, MASK_MARKER, sizeof(uint16));
+       memset(&((PageHeader) page)->pd_upper, MASK_MARKER, sizeof(uint16));
+   }
+   else if ((opaque->hasho_flag & LH_META_PAGE) == 0)
+   {
+       /*
+        * For pages other than the metapage, mask some line pointer bits,
+        * particularly those marked as used on a master and unused on a
+        * standby.
+        * XXX: This could be refined.
+        */
+       maxoff = PageGetMaxOffsetNumber(page);
+       for (off = 1; off <= maxoff; off++)
+       {
+           ItemId      iid = PageGetItemId(page, off);
+
+           if (ItemIdIsUsed(iid))
+               iid->lp_flags = LP_UNUSED;
+       }
+   }
+}
+
+/*
+ * mask_page
+ *
+ * Mask a given page. First try to find what kind of page it is, then
+ * normalize it. This function returns a palloc'ed copy of the normalized
+ * page, so the caller is responsible for freeing it. Tracking blkno is
+ * needed for GIN pages, as their metapage does not use pd_lower and
+ * pd_upper.
+ * Before calling this function, it is assumed that the caller has already
+ * taken a proper lock on the page being masked.
+ */
+char *
+mask_page(const char *page, BlockNumber blkno)
+{
+   Page        page_norm;
+   uint16      tail;
+
+   page_norm = (Page) palloc(BLCKSZ);
+   memcpy(page_norm, page, BLCKSZ);
+
+   /*
+    * Mask the page LSN. The full-page image is taken before the LSN is
+    * updated, so the LSNs of the two pages will always differ.
+    */
+   mask_page_lsn(page_norm);
+
+   /*
+    * Look at the size of the special area, and the last two bytes in it,
+    * to detect what kind of a page it is. Then call the appropriate
+    * masking function.
+    */
+   memcpy(&tail, &page[BLCKSZ - 2], 2);
+   if (PageGetSpecialSize(page) == 0)
+   {
+       /* Case of a normal relation, it has an empty special area */
+       mask_heap_page(page_norm);
+   }
+   else if (PageGetSpecialSize(page) == MAXALIGN(sizeof(GISTPageOpaqueData)) &&
+            tail == GIST_PAGE_ID)
+   {
+       /* GiST page */
+       mask_gist_page(page_norm);
+   }
+   else if (PageGetSpecialSize(page) == MAXALIGN(sizeof(BTPageOpaqueData)) &&
+            tail <= MAX_BT_CYCLE_ID)
+   {
+       /* btree page */
+       mask_btree_page(page_norm);
+   }
+   else if (PageGetSpecialSize(page) == MAXALIGN(sizeof(SpGistPageOpaqueData)) &&
+            tail == SPGIST_PAGE_ID)
+   {
+       /* SP-GiST page */
+       mask_spgist_page(page_norm);
+   }
+   else if (PageGetSpecialSize(page) == MAXALIGN(sizeof(GinPageOpaqueData)) ||
+            PageGetSpecialSize(page) == MAXALIGN(sizeof(SequencePageOpaqueData)))
+   {
+       /*
+        * The page found here is used either for a GIN index or a sequence.
+        * GIN index pages do not have a proper identifier, so check whether
+        * the page is used by a sequence; if not, it is used by a GIN index.
+        * It is still possible that a GIN page contains exactly the same
+        * value as SEQ_MAGIC in that area, but this is unlikely to happen.
+        */
+       if (((SequencePageOpaqueData *) PageGetSpecialPointer(page))->seq_page_id == SEQ_MAGIC)
+           mask_sequence_page(page_norm);
+       else
+           mask_gin_page(page_norm, blkno);
+   }
+   else if (PageGetSpecialSize(page) == MAXALIGN(sizeof(HashPageOpaqueData)))
+   {
+       mask_hash_page(page_norm);
+   }
+   else
+   {
+       /* Should not come here, except for BRIN pages */
+       Assert(0);
+   }
+
+   /* Return normalized page */
+   return (char *) page_norm;
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6ac5184..645a807 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1800,6 +1800,16 @@ static struct config_int ConfigureNamesInt[] =
    },

    {
+       {"wal_consistency_mask", PGC_POSTMASTER, WAL_SETTINGS,
+           gettext_noop("Mask to enable the WAL consistency check for individual resource managers (HEAP, BTREE, etc.)."),
+           NULL
+       },
+       &wal_consistency_mask,
+       0, 0, INT_MAX,
+       NULL, NULL, NULL
+   },
+
+   {
        {"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY,
            gettext_noop("Sets the maximum wait time to receive data from the primary."),
            NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 6d0666c..e7e21ed 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -191,6 +191,17 @@
                    # open_sync
 #full_page_writes = on         # recover from partial page writes
 #wal_compression = off         # enable compression of full-page writes
+#wal_consistency_mask = 0      # enable the WAL consistency check for different operations
+                   # bit 0 - HEAP2
+                   # bit 1 - HEAP
+                   # bit 2 - BTREE
+                   # bit 3 - HASH
+                   # bit 4 - GIN
+                   # bit 5 - GIST
+                   # bit 6 - SEQ
+                   # bit 7 - SPGIST
+                   # bit 8 - BRIN
+                   # Multiple bits can be enabled; e.g., to enable HEAP and HASH, set the value to 10
 #wal_log_hints = off           # also do full page writes of non-critical updates
                    # (change requires restart)
 #wal_buffers = -1          # min 32kB, -1 sets based on shared_buffers
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 14b7f7f..1fc5f6e 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -104,6 +104,7 @@
 extern bool EnableHotStandby;
 extern bool fullPageWrites;
 extern bool wal_log_hints;
 extern bool wal_compression;
+extern int wal_consistency_mask;
 extern bool log_checkpoints;

 extern int CheckPointSegments;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index deaa7f5..287143b 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -52,6 +52,8 @@ typedef struct
    /* Information on full-page image, if any */
    bool        has_image;
+   bool        require_image;  /* remembers the true value of has_image; when
+                                * the consistency check is enabled, has_image is always true */
    char       *bkp_image;
    uint16      hole_offset;
    uint16      hole_length;
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 3dfcb49..34e28c0 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -137,7 +137,7 @@ typedef struct XLogRecordBlockImageHeader
 /* Information stored in bimg_info */
 #define BKPIMAGE_HAS_HOLE      0x01    /* page image has "hole" */
 #define BKPIMAGE_IS_COMPRESSED 0x02    /* page image is compressed */
-
+#define BKPIMAGE_IS_REQUIRED   0x04    /* page is required by the WAL record */
 /*
  * Extra header information used when page image has "hole" and
  * is compressed.
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 6af60d8..a7a0e16 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -20,6 +20,19 @@
 #include "nodes/parsenodes.h"
 #include "storage/relfilenode.h"

+/*
+ * Page opaque data in a sequence page
+ */
+typedef struct SequencePageOpaqueData
+{
+   uint32      seq_page_id;
+} SequencePageOpaqueData;
+
+/*
+ * This page ID is for the convenience of being able to identify whether a
+ * page is being used by a sequence.
+ */
+#define SEQ_MAGIC  0x1717

 typedef struct FormData_pg_sequence
 {
diff --git a/src/include/storage/bufmask.h b/src/include/storage/bufmask.h
new file mode 100644
index 0000000..1dd5a67
--- /dev/null
+++ b/src/include/storage/bufmask.h
@@ -0,0 +1,21 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmask.h
+ *   Buffer masking definitions.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/bufmask.h
+ */
+
+#ifndef BUFMASK_H
+#define BUFMASK_H
+
+#include "postgres.h"
+#include "storage/block.h"
+
+/* Entry point for page masking */
+extern char *mask_page(const char *page, BlockNumber blkno);
+
+#endif
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 3d5dea7..6621a39 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -44,8 +44,9 @@ typedef enum
    RBM_ZERO_AND_CLEANUP_LOCK,  /* Like RBM_ZERO_AND_LOCK, but locks the page
                                 * in "cleanup" mode */
    RBM_ZERO_ON_ERROR,          /* Read, but return an all-zeros page on error */
-   RBM_NORMAL_NO_LOG           /* Don't log page as invalid during WAL
+   RBM_NORMAL_NO_LOG,          /* Don't log page as invalid during WAL
                                 * replay; otherwise same as RBM_NORMAL */
+   RBM_WAL_CHECK               /* Normal read, but don't check whether the page is new or not */
 } ReadBufferMode;

 /* forward declared, to avoid having to expose buf_internals.h here */