Hi Hackers,

In order to make incremental backup
(https://wiki.postgresql.org/wiki/Incremental_backup) efficient we
need a way to track the LSN of a page in a way that we can retrieve it
without reading the actual block. Below there is my proposal on how to
achieve it.

LSN Map
-------

The purpose of the LSN map is to quickly know if a page of a relation
has been modified after a specified checkpoint.

Implementation
--------------

We create an additional fork which contains a raw stream of LSNs. To
limit the space used, every entry represents the maximum LSN of a group
of blocks of a fixed size. I chose arbitrarily the size of 2048
which is equivalent to 16MB of heap data, which means that we need 64k
entries to track one terabyte of heap.

Name
----

I've called this map LSN map, and I've named the corresponding fork
file as "lm".

WAL logging
-----------

At the moment the map is not WAL logged, but is updated during WAL
replay. I'm not deep enough in WAL mechanics to see if the current
approach is sane or if we should change it.

Current limits
--------------

The current implementation tracks only heap LSN. It currently does not
track any kind of indexes, but this can be easily added later. The
implementation of commands that rewrite the whole table can be
improved: cluster uses shared memory buffers instead of writing the
map directly on the disk, and moving a table to another tablespace
simply drops the map instead of updating it correctly.

Further ideas
-------------

The current implementation updates an entry in the map every time the
block gets its LSN bumped, but we really only need to know which is the
first checkpoint that contains expired data. So setting the entry to
the last checkpoint LSN is probably enough, and will reduce the number
of writes. To implement this we only need a backend local copy of the
last checkpoint LSN, which is updated during each XLogInsert. Again,
I'm not deep enough in replication mechanics to see if this approach
could work on a standby using restartpoints instead of checkpoints.
Please advise on the best way to implement it.

Conclusions
------------

This code is incomplete, and the xlog replay part must be
improved/fixed, but I think it's a good start to have this feature.
I would appreciate any review, advice, or critique.

Regards,
Marco

-- 
Marco Nenciarini - 2ndQuadrant Italy
PostgreSQL Training, Services and Support
marco.nenciar...@2ndquadrant.it | www.2ndQuadrant.it

From 89a943032f0a10fd093c126d15fbf81e5861dbe3 Mon Sep 17 00:00:00 2001
From: Marco Nenciarini <marco.nenciar...@2ndquadrant.it>
Date: Mon, 3 Nov 2014 17:52:27 +0100
Subject: [PATCH] LSN Map

This is a WIP. Only heap is supported. No indexes, no sequences.
---
 src/backend/access/heap/Makefile      |   2 +-
 src/backend/access/heap/heapam.c      | 239 ++++++++++++++++++++++--
 src/backend/access/heap/hio.c         |  11 +-
 src/backend/access/heap/lsnmap.c      | 336 ++++++++++++++++++++++++++++++++++
 src/backend/access/heap/pruneheap.c   |  10 +
 src/backend/access/heap/rewriteheap.c |  37 +++-
 src/backend/catalog/storage.c         |   8 +
 src/backend/commands/tablecmds.c      |   5 +-
 src/backend/commands/vacuumlazy.c     |  35 +++-
 src/backend/storage/smgr/smgr.c       |   1 +
 src/common/relpath.c                  |   5 +-
 src/include/access/hio.h              |   3 +-
 src/include/access/lsnmap.h           |  28 +++
 src/include/common/relpath.h          |   5 +-
 src/include/storage/smgr.h            |   1 +
 15 files changed, 687 insertions(+), 39 deletions(-)
 create mode 100644 src/backend/access/heap/lsnmap.c
 create mode 100644 src/include/access/lsnmap.h

diff --git a/src/backend/access/heap/Makefile b/src/backend/access/heap/Makefile
index b83d496..776ee7d 100644
*** a/src/backend/access/heap/Makefile
--- b/src/backend/access/heap/Makefile
*************** subdir = src/backend/access/heap
*** 12,17 ****
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o 
visibilitymap.o
  
  include $(top_srcdir)/src/backend/common.mk
--- 12,17 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o 
visibilitymap.o lsnmap.o
  
  include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 21e9d06..9486562 100644
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 48,53 ****
--- 48,54 ----
  #include "access/tuptoaster.h"
  #include "access/valid.h"
  #include "access/visibilitymap.h"
+ #include "access/lsnmap.h"
  #include "access/xact.h"
  #include "access/xlog.h"
  #include "access/xloginsert.h"
*************** heap_insert(Relation relation, HeapTuple
*** 2067,2073 ****
        TransactionId xid = GetCurrentTransactionId();
        HeapTuple       heaptup;
        Buffer          buffer;
!       Buffer          vmbuffer = InvalidBuffer;
        bool            all_visible_cleared = false;
  
        /*
--- 2068,2075 ----
        TransactionId xid = GetCurrentTransactionId();
        HeapTuple       heaptup;
        Buffer          buffer;
!       Buffer          vmbuffer = InvalidBuffer,
!                               lmbuffer = InvalidBuffer;
        bool            all_visible_cleared = false;
  
        /*
*************** heap_insert(Relation relation, HeapTuple
*** 2097,2103 ****
         */
        buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
                                                                           
InvalidBuffer, options, bistate,
!                                                                          
&vmbuffer, NULL);
  
        /* NO EREPORT(ERROR) from here till changes are logged */
        START_CRIT_SECTION();
--- 2099,2106 ----
         */
        buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
                                                                           
InvalidBuffer, options, bistate,
!                                                                          
&vmbuffer, NULL,
!                                                                          
&lmbuffer, NULL);
  
        /* NO EREPORT(ERROR) from here till changes are logged */
        START_CRIT_SECTION();
*************** heap_insert(Relation relation, HeapTuple
*** 2192,2197 ****
--- 2195,2205 ----
                recptr = XLogInsert(RM_HEAP_ID, info);
  
                PageSetLSN(page, recptr);
+ 
+               /*
+                * Update the LSN map
+                */
+               lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, 
recptr);
        }
  
        END_CRIT_SECTION();
*************** heap_insert(Relation relation, HeapTuple
*** 2199,2204 ****
--- 2207,2214 ----
        UnlockReleaseBuffer(buffer);
        if (vmbuffer != InvalidBuffer)
                ReleaseBuffer(vmbuffer);
+       if (lmbuffer != InvalidBuffer)
+               ReleaseBuffer(lmbuffer);
  
        /*
         * If tuple is cachable, mark it for invalidation from the caches in 
case
*************** heap_multi_insert(Relation relation, Hea
*** 2346,2352 ****
        while (ndone < ntuples)
        {
                Buffer          buffer;
!               Buffer          vmbuffer = InvalidBuffer;
                bool            all_visible_cleared = false;
                int                     nthispage;
  
--- 2356,2363 ----
        while (ndone < ntuples)
        {
                Buffer          buffer;
!               Buffer          vmbuffer = InvalidBuffer,
!                                       lmbuffer = InvalidBuffer;
                bool            all_visible_cleared = false;
                int                     nthispage;
  
*************** heap_multi_insert(Relation relation, Hea
*** 2358,2364 ****
                 */
                buffer = RelationGetBufferForTuple(relation, 
heaptuples[ndone]->t_len,
                                                                                
   InvalidBuffer, options, bistate,
!                                                                               
   &vmbuffer, NULL);
                page = BufferGetPage(buffer);
  
                /* NO EREPORT(ERROR) from here till changes are logged */
--- 2369,2376 ----
                 */
                buffer = RelationGetBufferForTuple(relation, 
heaptuples[ndone]->t_len,
                                                                                
   InvalidBuffer, options, bistate,
!                                                                               
   &vmbuffer, NULL,
!                                                                               
   &lmbuffer, NULL);
                page = BufferGetPage(buffer);
  
                /* NO EREPORT(ERROR) from here till changes are logged */
*************** heap_multi_insert(Relation relation, Hea
*** 2502,2507 ****
--- 2514,2521 ----
                        recptr = XLogInsert(RM_HEAP2_ID, info);
  
                        PageSetLSN(page, recptr);
+ 
+                       lsnmap_set(relation, BufferGetBlockNumber(buffer), 
lmbuffer, recptr);
                }
  
                END_CRIT_SECTION();
*************** heap_multi_insert(Relation relation, Hea
*** 2509,2514 ****
--- 2523,2530 ----
                UnlockReleaseBuffer(buffer);
                if (vmbuffer != InvalidBuffer)
                        ReleaseBuffer(vmbuffer);
+               if (lmbuffer != InvalidBuffer)
+                       ReleaseBuffer(lmbuffer);
  
                ndone += nthispage;
        }
*************** heap_delete(Relation relation, ItemPoint
*** 2629,2635 ****
        Page            page;
        BlockNumber block;
        Buffer          buffer;
!       Buffer          vmbuffer = InvalidBuffer;
        TransactionId new_xmax;
        uint16          new_infomask,
                                new_infomask2;
--- 2645,2652 ----
        Page            page;
        BlockNumber block;
        Buffer          buffer;
!       Buffer          vmbuffer = InvalidBuffer,
!                               lmbuffer = InvalidBuffer;
        TransactionId new_xmax;
        uint16          new_infomask,
                                new_infomask2;
*************** heap_delete(Relation relation, ItemPoint
*** 2645,2650 ****
--- 2662,2670 ----
        buffer = ReadBuffer(relation, block);
        page = BufferGetPage(buffer);
  
+       if (RelationNeedsWAL(relation))
+               lsnmap_pin(relation, block, &lmbuffer);
+ 
        /*
         * Before locking the buffer, pin the visibility map page if it appears 
to
         * be necessary.  Since we haven't got the lock yet, someone else might 
be
*************** l1:
*** 2797,2802 ****
--- 2817,2824 ----
                        UnlockTupleTuplock(relation, &(tp.t_self), 
LockTupleExclusive);
                if (vmbuffer != InvalidBuffer)
                        ReleaseBuffer(vmbuffer);
+               if (lmbuffer != InvalidBuffer)
+                       ReleaseBuffer(lmbuffer);
                return result;
        }
  
*************** l1:
*** 2912,2917 ****
--- 2934,2941 ----
                recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
  
                PageSetLSN(page, recptr);
+ 
+               lsnmap_set(relation, block, lmbuffer, recptr);
        }
  
        END_CRIT_SECTION();
*************** l1:
*** 2920,2926 ****
  
        if (vmbuffer != InvalidBuffer)
                ReleaseBuffer(vmbuffer);
! 
        /*
         * If the tuple has toasted out-of-line attributes, we need to delete
         * those items too.  We have to do this before releasing the buffer
--- 2944,2951 ----
  
        if (vmbuffer != InvalidBuffer)
                ReleaseBuffer(vmbuffer);
!       if (lmbuffer != InvalidBuffer)
!               ReleaseBuffer(lmbuffer);
        /*
         * If the tuple has toasted out-of-line attributes, we need to delete
         * those items too.  We have to do this before releasing the buffer
*************** heap_update(Relation relation, ItemPoint
*** 3053,3059 ****
        Buffer          buffer,
                                newbuf,
                                vmbuffer = InvalidBuffer,
!                               vmbuffer_new = InvalidBuffer;
        bool            need_toast,
                                already_marked;
        Size            newtupsize,
--- 3078,3086 ----
        Buffer          buffer,
                                newbuf,
                                vmbuffer = InvalidBuffer,
!                               vmbuffer_new = InvalidBuffer,
!                               lmbuffer = InvalidBuffer,
!                               lmbuffer_new = InvalidBuffer;
        bool            need_toast,
                                already_marked;
        Size            newtupsize,
*************** heap_update(Relation relation, ItemPoint
*** 3099,3104 ****
--- 3126,3134 ----
        buffer = ReadBuffer(relation, block);
        page = BufferGetPage(buffer);
  
+       if (RelationNeedsWAL(relation))
+               lsnmap_pin(relation, block, &lmbuffer);
+ 
        /*
         * Before locking the buffer, pin the visibility map page if it appears 
to
         * be necessary.  Since we haven't got the lock yet, someone else might 
be
*************** l2:
*** 3390,3395 ****
--- 3420,3427 ----
                        UnlockTupleTuplock(relation, &(oldtup.t_self), 
*lockmode);
                if (vmbuffer != InvalidBuffer)
                        ReleaseBuffer(vmbuffer);
+               if (lmbuffer != InvalidBuffer)
+                       ReleaseBuffer(lmbuffer);
                bms_free(hot_attrs);
                bms_free(key_attrs);
                return result;
*************** l2:
*** 3570,3576 ****
                        /* Assume there's no chance to put heaptup on same 
page. */
                        newbuf = RelationGetBufferForTuple(relation, 
heaptup->t_len,
                                                                                
           buffer, 0, NULL,
!                                                                               
           &vmbuffer_new, &vmbuffer);
                }
                else
                {
--- 3602,3609 ----
                        /* Assume there's no chance to put heaptup on same 
page. */
                        newbuf = RelationGetBufferForTuple(relation, 
heaptup->t_len,
                                                                                
           buffer, 0, NULL,
!                                                                               
           &vmbuffer_new, &vmbuffer,
!                                                                               
           &lmbuffer_new, &lmbuffer);
                }
                else
                {
*************** l2:
*** 3588,3594 ****
                                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
                                newbuf = RelationGetBufferForTuple(relation, 
heaptup->t_len,
                                                                                
                   buffer, 0, NULL,
!                                                                               
                   &vmbuffer_new, &vmbuffer);
                        }
                        else
                        {
--- 3621,3628 ----
                                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
                                newbuf = RelationGetBufferForTuple(relation, 
heaptup->t_len,
                                                                                
                   buffer, 0, NULL,
!                                                                               
                   &vmbuffer_new, &vmbuffer,
!                                                                               
                   &lmbuffer_new, &lmbuffer);
                        }
                        else
                        {
*************** l2:
*** 3740,3747 ****
--- 3774,3783 ----
                if (newbuf != buffer)
                {
                        PageSetLSN(BufferGetPage(newbuf), recptr);
+                       lsnmap_set(relation, BufferGetBlockNumber(newbuf), 
lmbuffer_new, recptr);
                }
                PageSetLSN(BufferGetPage(buffer), recptr);
+               lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer,  
recptr);
        }
  
        END_CRIT_SECTION();
*************** l2:
*** 3768,3774 ****
                ReleaseBuffer(vmbuffer_new);
        if (BufferIsValid(vmbuffer))
                ReleaseBuffer(vmbuffer);
! 
        /*
         * Release the lmgr tuple lock, if we had it.
         */
--- 3804,3813 ----
                ReleaseBuffer(vmbuffer_new);
        if (BufferIsValid(vmbuffer))
                ReleaseBuffer(vmbuffer);
!       if (BufferIsValid(lmbuffer_new))
!               ReleaseBuffer(lmbuffer_new);
!       if (BufferIsValid(lmbuffer))
!               ReleaseBuffer(lmbuffer);
        /*
         * Release the lmgr tuple lock, if we had it.
         */
*************** heap_lock_tuple(Relation relation, HeapT
*** 4091,4096 ****
--- 4130,4136 ----
        HTSU_Result result;
        ItemPointer tid = &(tuple->t_self);
        ItemId          lp;
+       Buffer          lmbuffer = InvalidBuffer;
        Page            page;
        TransactionId xid,
                                xmax;
*************** failed:
*** 4567,4572 ****
--- 4607,4615 ----
                return HeapTupleMayBeUpdated;
        }
  
+       if (RelationNeedsWAL(relation))
+               lsnmap_pin(relation, BufferGetBlockNumber(*buffer), &lmbuffer);
+ 
        /*
         * If this is the first possibly-multixact-able operation in the current
         * transaction, set my per-backend OldestMemberMXactId setting. We can 
be
*************** failed:
*** 4647,4652 ****
--- 4690,4697 ----
                recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
  
                PageSetLSN(page, recptr);
+ 
+               lsnmap_set(relation, BufferGetBlockNumber(*buffer), lmbuffer, 
recptr);
        }
  
        END_CRIT_SECTION();
*************** failed:
*** 4658,4663 ****
--- 4703,4711 ----
         * visibility info.
         */
  
+       if (lmbuffer != InvalidBuffer)
+               ReleaseBuffer(lmbuffer);
+ 
        /*
         * Now that we have successfully marked the tuple as locked, we can
         * release the lmgr tuple lock, if we had it.
*************** heap_lock_updated_tuple_rec(Relation rel
*** 5100,5106 ****
  {
        ItemPointerData tupid;
        HeapTupleData mytup;
!       Buffer          buf;
        uint16          new_infomask,
                                new_infomask2,
                                old_infomask,
--- 5148,5155 ----
  {
        ItemPointerData tupid;
        HeapTupleData mytup;
!       Buffer          buf,
!                               lmbuffer = InvalidBuffer;
        uint16          new_infomask,
                                new_infomask2,
                                old_infomask,
*************** heap_lock_updated_tuple_rec(Relation rel
*** 5129,5134 ****
--- 5178,5186 ----
                        return HeapTupleMayBeUpdated;
                }
  
+               if (RelationNeedsWAL(rel))
+                       lsnmap_pin(rel, BufferGetBlockNumber(buf), &lmbuffer);
+ 
  l4:
                CHECK_FOR_INTERRUPTS();
                LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
*************** l4:
*** 5142,5147 ****
--- 5194,5201 ----
                                                                 priorXmax))
                {
                        UnlockReleaseBuffer(buf);
+                       if (lmbuffer != InvalidBuffer)
+                               ReleaseBuffer(lmbuffer);
                        return HeapTupleMayBeUpdated;
                }
  
*************** l4:
*** 5189,5194 ****
--- 5243,5250 ----
                                        if (res != HeapTupleMayBeUpdated)
                                        {
                                                UnlockReleaseBuffer(buf);
+                                               if (lmbuffer != InvalidBuffer)
+                                                       ReleaseBuffer(lmbuffer);
                                                pfree(members);
                                                return res;
                                        }
*************** l4:
*** 5249,5254 ****
--- 5305,5312 ----
                                if (res != HeapTupleMayBeUpdated)
                                {
                                        UnlockReleaseBuffer(buf);
+                                       if (lmbuffer != InvalidBuffer)
+                                               ReleaseBuffer(lmbuffer);
                                        return res;
                                }
                        }
*************** l4:
*** 5289,5298 ****
--- 5347,5361 ----
                        recptr = XLogInsert(RM_HEAP2_ID, 
XLOG_HEAP2_LOCK_UPDATED);
  
                        PageSetLSN(page, recptr);
+ 
+                       lsnmap_set(rel, BufferGetBlockNumber(buf), lmbuffer, 
recptr);
                }
  
                END_CRIT_SECTION();
  
+               if (lmbuffer != InvalidBuffer)
+                       ReleaseBuffer(lmbuffer);
+ 
                /* if we find the end of update chain, we're done. */
                if (mytup.t_data->t_infomask & HEAP_XMAX_INVALID ||
                        ItemPointerEquals(&mytup.t_self, &mytup.t_data->t_ctid) 
||
*************** heap_lock_updated_tuple(Relation rel, He
*** 5374,5380 ****
  void
  heap_inplace_update(Relation relation, HeapTuple tuple)
  {
!       Buffer          buffer;
        Page            page;
        OffsetNumber offnum;
        ItemId          lp = NULL;
--- 5437,5444 ----
  void
  heap_inplace_update(Relation relation, HeapTuple tuple)
  {
!       Buffer          buffer,
!                               lmbuffer = InvalidBuffer;
        Page            page;
        OffsetNumber offnum;
        ItemId          lp = NULL;
*************** heap_inplace_update(Relation relation, H
*** 5383,5388 ****
--- 5447,5456 ----
        uint32          newlen;
  
        buffer = ReadBuffer(relation, 
ItemPointerGetBlockNumber(&(tuple->t_self)));
+ 
+       if (RelationNeedsWAL(relation))
+               lsnmap_pin(relation, BufferGetBlockNumber(buffer), &lmbuffer);
+ 
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
        page = (Page) BufferGetPage(buffer);
  
*************** heap_inplace_update(Relation relation, H
*** 5426,5437 ****
--- 5494,5510 ----
                recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
  
                PageSetLSN(page, recptr);
+ 
+               lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, 
recptr);
        }
  
        END_CRIT_SECTION();
  
        UnlockReleaseBuffer(buffer);
  
+       if (lmbuffer != InvalidBuffer)
+               ReleaseBuffer(lmbuffer);
+ 
        /*
         * Send out shared cache inval if necessary.  Note that because we only
         * pass the new version of the tuple, this mustn't be used for any
*************** heap_xlog_clean(XLogReaderState *record)
*** 7024,7029 ****
--- 7097,7115 ----
                ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, 
rnode);
  
        /*
+        * Update the LSN map
+        */
+       {
+               Relation        reln = CreateFakeRelcacheEntry(rnode);
+               Buffer          lmbuffer = InvalidBuffer;
+ 
+               lsnmap_pin(reln, blkno, &lmbuffer);
+               lsnmap_set(reln, blkno, lmbuffer, lsn);
+               ReleaseBuffer(lmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+ 
+       /*
         * If we have a full-page image, restore it (using a cleanup lock) and
         * we're done.
         */
*************** heap_xlog_freeze_page(XLogReaderState *r
*** 7208,7225 ****
        xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) 
XLogRecGetData(record);
        TransactionId cutoff_xid = xlrec->cutoff_xid;
        Buffer          buffer;
        int                     ntup;
  
        /*
         * In Hot Standby mode, ensure that there's no queries running which 
still
         * consider the frozen xids as running.
         */
        if (InHotStandby)
        {
!               RelFileNode rnode;
  
!               XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
!               ResolveRecoveryConflictWithSnapshot(cutoff_xid, rnode);
        }
  
        if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
--- 7294,7323 ----
        xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) 
XLogRecGetData(record);
        TransactionId cutoff_xid = xlrec->cutoff_xid;
        Buffer          buffer;
+       RelFileNode rnode;
+       BlockNumber blkno;
        int                     ntup;
  
+       XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+ 
        /*
         * In Hot Standby mode, ensure that there's no queries running which 
still
         * consider the frozen xids as running.
         */
        if (InHotStandby)
+               ResolveRecoveryConflictWithSnapshot(cutoff_xid, rnode);
+ 
+       /*
+        * Update the LSN map
+        */
        {
!               Relation        reln = CreateFakeRelcacheEntry(rnode);
!               Buffer          lmbuffer = InvalidBuffer;
  
!               lsnmap_pin(reln, blkno, &lmbuffer);
!               lsnmap_set(reln, blkno, lmbuffer, lsn);
!               ReleaseBuffer(lmbuffer);
!               FreeFakeRelcacheEntry(reln);
        }
  
        if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
*************** heap_xlog_delete(XLogReaderState *record
*** 7309,7314 ****
--- 7407,7425 ----
                FreeFakeRelcacheEntry(reln);
        }
  
+       /*
+        * Update the LSN map
+        */
+       {
+               Relation        reln = CreateFakeRelcacheEntry(target_node);
+               Buffer          lmbuffer = InvalidBuffer;
+ 
+               lsnmap_pin(reln, blkno, &lmbuffer);
+               lsnmap_set(reln, blkno, lmbuffer, lsn);
+               ReleaseBuffer(lmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+ 
        if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
        {
                page = BufferGetPage(buffer);
*************** heap_xlog_insert(XLogReaderState *record
*** 7385,7390 ****
--- 7496,7514 ----
        }
  
        /*
+        * Update the LSN map
+        */
+       {
+               Relation        reln = CreateFakeRelcacheEntry(target_node);
+               Buffer          lmbuffer = InvalidBuffer;
+ 
+               lsnmap_pin(reln, blkno, &lmbuffer);
+               lsnmap_set(reln, blkno, lmbuffer, lsn);
+               ReleaseBuffer(lmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+ 
+       /*
         * If we inserted the first and only tuple on the page, re-initialize 
the
         * page from scratch.
         */
*************** heap_xlog_multi_insert(XLogReaderState *
*** 7504,7509 ****
--- 7628,7646 ----
                FreeFakeRelcacheEntry(reln);
        }
  
+       /*
+        * Update the LSN map
+        */
+       {
+               Relation        reln = CreateFakeRelcacheEntry(rnode);
+               Buffer          lmbuffer = InvalidBuffer;
+ 
+               lsnmap_pin(reln, blkno, &lmbuffer);
+               lsnmap_set(reln, blkno, lmbuffer, lsn);
+               ReleaseBuffer(lmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+ 
        if (isinit)
        {
                buffer = XLogInitBufferForRedo(record, 0);
*************** heap_xlog_update(XLogReaderState *record
*** 7660,7665 ****
--- 7797,7820 ----
        }
  
        /*
+        * Update the LSN map
+        */
+       {
+               Relation        reln = CreateFakeRelcacheEntry(rnode);
+               Buffer          lmbuffer = InvalidBuffer;
+ 
+               lsnmap_pin(reln, oldblk, &lmbuffer);
+               lsnmap_set(reln, oldblk, lmbuffer, lsn);
+               if (oldblk != newblk)
+               {
+                       lsnmap_pin(reln, newblk, &lmbuffer);
+                       lsnmap_set(reln, newblk, lmbuffer, lsn);
+               }
+               ReleaseBuffer(lmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+ 
+       /*
         * In normal operation, it is important to lock the two pages in
         * page-number order, to avoid possible deadlocks against other update
         * operations going the other way.  However, during WAL replay there can
*************** heap_xlog_lock(XLogReaderState *record)
*** 7882,7887 ****
--- 8037,8060 ----
        ItemId          lp = NULL;
        HeapTupleHeader htup;
  
+       /*
+        * Update the LSN map
+        */
+       {
+               Relation        reln;
+               RelFileNode     rnode;
+               BlockNumber     blkno;
+               Buffer          lmbuffer = InvalidBuffer;
+ 
+               XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+               reln = CreateFakeRelcacheEntry(rnode);
+ 
+               lsnmap_pin(reln, blkno, &lmbuffer);
+               lsnmap_set(reln, blkno, lmbuffer, lsn);
+               ReleaseBuffer(lmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+ 
        if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
        {
                page = (Page) BufferGetPage(buffer);
*************** heap_xlog_lock_updated(XLogReaderState *
*** 7930,7935 ****
--- 8103,8126 ----
        ItemId          lp = NULL;
        HeapTupleHeader htup;
  
+       /*
+        * Update the LSN map
+        */
+       {
+               Relation        reln;
+               RelFileNode     rnode;
+               BlockNumber     blkno;
+               Buffer          lmbuffer = InvalidBuffer;
+ 
+               XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+               reln = CreateFakeRelcacheEntry(rnode);
+ 
+               lsnmap_pin(reln, blkno, &lmbuffer);
+               lsnmap_set(reln, blkno, lmbuffer, lsn);
+               ReleaseBuffer(lmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+ 
        xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
  
        if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
*************** heap_xlog_inplace(XLogReaderState *recor
*** 7969,7974 ****
--- 8160,8183 ----
        uint32          oldlen;
        Size            newlen;
  
+       /*
+        * Update the LSN map
+        */
+       {
+               Relation        reln;
+               RelFileNode     rnode;
+               BlockNumber     blkno;
+               Buffer          lmbuffer = InvalidBuffer;
+ 
+               XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+               reln = CreateFakeRelcacheEntry(rnode);
+ 
+               lsnmap_pin(reln, blkno, &lmbuffer);
+               lsnmap_set(reln, blkno, lmbuffer, lsn);
+               ReleaseBuffer(lmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+ 
        if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
        {
                char       *newtup = XLogRecGetBlockData(record, 0, &newlen);
diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index 6d091f6..09e93d0 100644
*** a/src/backend/access/heap/hio.c
--- b/src/backend/access/heap/hio.c
***************
*** 19,24 ****
--- 19,25 ----
  #include "access/hio.h"
  #include "access/htup_details.h"
  #include "access/visibilitymap.h"
+ #include "access/lsnmap.h"
  #include "storage/bufmgr.h"
  #include "storage/freespace.h"
  #include "storage/lmgr.h"
*************** Buffer
*** 215,221 ****
  RelationGetBufferForTuple(Relation relation, Size len,
                                                  Buffer otherBuffer, int 
options,
                                                  BulkInsertState bistate,
!                                                 Buffer *vmbuffer, Buffer 
*vmbuffer_other)
  {
        bool            use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
        Buffer          buffer = InvalidBuffer;
--- 216,223 ----
  RelationGetBufferForTuple(Relation relation, Size len,
                                                  Buffer otherBuffer, int 
options,
                                                  BulkInsertState bistate,
!                                                 Buffer *vmbuffer, Buffer 
*vmbuffer_other,
!                                                 Buffer *lmbuffer, Buffer 
*lmbuffer_other)
  {
        bool            use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
        Buffer          buffer = InvalidBuffer;
*************** RelationGetBufferForTuple(Relation relat
*** 297,302 ****
--- 299,308 ----
  
        while (targetBlock != InvalidBlockNumber)
        {
+ 
+               if (RelationNeedsWAL(relation))
+                       lsnmap_pin(relation, targetBlock, lmbuffer);
+ 
                /*
                 * Read and exclusive-lock the target block, as well as the 
other
                 * block if one was given, taking suitable care with lock 
ordering and
*************** RelationGetBufferForTuple(Relation relat
*** 438,443 ****
--- 444,452 ----
         */
        buffer = ReadBufferBI(relation, P_NEW, bistate);
  
+       if (RelationNeedsWAL(relation))
+               lsnmap_pin(relation, BufferGetBlockNumber(buffer), lmbuffer);
+ 
        /*
         * We can be certain that locking the otherBuffer first is OK, since it
         * must have a lower page number.
diff --git a/src/backend/access/heap/lsnmap.c b/src/backend/access/heap/lsnmap.c
index ...e736ed6 100644
*** a/src/backend/access/heap/lsnmap.c
--- b/src/backend/access/heap/lsnmap.c
***************
*** 0 ****
--- 1,336 ----
+ /*-------------------------------------------------------------------------
+  *
+  * lsnmap.c
+  *      map for tracking LSN of heap blocks
+  *
+  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *      src/backend/access/heap/lsnmap.c
+  *
+  * INTERFACE ROUTINES
+  *            lsnmap_pin       - pin a map page for setting an entry
+  *            lsnmap_set       - set an entry in a previously pinned page
+  *            lsnmap_truncate - truncate the LSN map
+  *
+  * NOTES
+  *
+  * The LSN map contains one LSN per HEAPBLOCKS_PER_ENTRY heap pages. Each
+  * entry records that no modification has been made to any page of its
+  * group after that LSN.
+  *
+  * The LSN map is not WAL-logged, but is updated during WAL replay.
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/heapam_xlog.h"
+ #include "access/lsnmap.h"
+ #include "miscadmin.h"
+ #include "storage/bufmgr.h"
+ #include "storage/lmgr.h"
+ #include "storage/smgr.h"
+ #include "utils/inval.h"
+ 
+ 
+ /* #define TRACE_LSNMAP */
+ 
+ /* Number of pages per LSN map entry */
+ #define HEAPBLOCKS_PER_ENTRY 2048
+ 
+ /* Size of an LSN map entry */
+ #define BYTES_PER_ENTRY (sizeof(XLogRecPtr))
+ 
+ /*
+  * Size of the map on each LSN map page, in bytes. There are no
+  * extra headers, so the whole page minus the standard page header is
+  * used for the map, rounded down to a whole number of entries.
+  */
+ #define MAPSIZE TYPEALIGN_DOWN(BYTES_PER_ENTRY, BLCKSZ - 
MAXALIGN(SizeOfPageHeaderData))
+ 
+ /* Number of heap blocks we can represent in one LSN map page. */
+ #define HEAPBLOCKS_PER_PAGE (MAPSIZE / BYTES_PER_ENTRY * HEAPBLOCKS_PER_ENTRY)
+ 
+ /* Mapping from heap block number to the right bit in the LSN map */
+ #define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
+ #define HEAPBLK_TO_MAPPOS(x) (((x) % HEAPBLOCKS_PER_PAGE) / 
HEAPBLOCKS_PER_ENTRY)
+ 
+ /* prototypes for internal routines */
+ static Buffer lm_readbuf(Relation rel, BlockNumber blkno, bool extend);
+ static void lm_extend(Relation rel, BlockNumber nlmblocks);
+ 
+ /*
+  *    lsnmap_pin - pin a map page for setting an entry
+  *
+  * Setting an entry in the LSN map is a two-phase operation. First, call
+  * lsnmap_pin, to pin the LSN map page containing the entry for
+  * the heap page. Because that can require I/O to read the map page, you
+  * shouldn't hold a lock on the heap page while doing that. Then, call
+  * lsnmap_set to actually set the entry.
+  *
+  * On entry, *buf should be InvalidBuffer or a valid buffer returned by
+  * an earlier call to lsnmap_pin on the same relation.
+  * On return, *buf is a valid buffer with the map page containing
+  * the entry for heapBlk.
+  *
+  * If the page doesn't exist in the map file yet, it is extended.
+  */
+ void
+ lsnmap_pin(Relation rel, BlockNumber heapBlk, Buffer *buf)
+ {
+       BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 
+       /* Reuse the old pinned buffer if it covers the same map page */
+       if (BufferIsValid(*buf))
+       {
+               if (BufferGetBlockNumber(*buf) == mapBlock)
+                       return;
+ 
+               ReleaseBuffer(*buf);
+       }
+       *buf = lm_readbuf(rel, mapBlock, true);
+ }
+ 
+ /*
+  *    lsnmap_set - set an entry on a previously pinned page
+  *
+  * You must pass a buffer containing the correct map page to this function.
+  * Call lsnmap_pin first to pin the right one. This function doesn't do
+  * any I/O.  The entry only ever advances; an older lsn is a no-op.
+  */
+ void
+ lsnmap_set(Relation rel, BlockNumber heapBlk, Buffer lmBuf, XLogRecPtr lsn)
+ {
+       BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+       uint32          mapPos = HEAPBLK_TO_MAPPOS(heapBlk);
+       XLogRecPtr      *map;
+ 
+ #ifdef TRACE_LSNMAP
+       elog(DEBUG1, "lm_set %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+ 
+       /* Check that we have the right LM page pinned */
+       if (!BufferIsValid(lmBuf) || BufferGetBlockNumber(lmBuf) != mapBlock)
+               elog(ERROR, "wrong LM buffer passed to lsnmap_set");
+ 
+       LockBuffer(lmBuf, BUFFER_LOCK_EXCLUSIVE);
+ 
+       map = (XLogRecPtr *) PageGetContents(BufferGetPage(lmBuf));
+ 
+       /* Only advance the entry; dirty the page only when it changes */
+       if (map[mapPos] < lsn)
+       {
+               map[mapPos] = lsn;
+               MarkBufferDirty(lmBuf);
+       }
+ 
+       LockBuffer(lmBuf, BUFFER_LOCK_UNLOCK);
+ }
+ 
+ /*
+  *    lsnmap_truncate - truncate the LSN map
+  *
+  * The caller must hold AccessExclusiveLock on the relation, to ensure that
+  * other backends receive the smgr invalidation event that this function sends
+  * before they access the LM again.
+  *
+  * nheapblocks is the new size of the heap.
+  */
+ void
+ lsnmap_truncate(Relation rel, BlockNumber nheapblocks)
+ {
+       BlockNumber newnblocks;
+ 
+       /* last remaining map block, and entry position within it */
+       BlockNumber truncBlock = HEAPBLK_TO_MAPBLOCK(nheapblocks);
+       uint32          truncPos = HEAPBLK_TO_MAPPOS(nheapblocks);
+ 
+ #ifdef TRACE_LSNMAP
+       elog(DEBUG1, "lm_truncate %s %d", RelationGetRelationName(rel), nheapblocks);
+ #endif
+ 
+       RelationOpenSmgr(rel);
+ 
+       /*
+        * If no LSN map has been created yet for this relation, there's
+        * nothing to truncate.
+        */
+       if (!smgrexists(rel->rd_smgr, LSNMAP_FORKNUM))
+               return;
+ 
+       /*
+        * Unless the new size is exactly at an LSN map page boundary, the
+        * tail entries in the last remaining map page, representing truncated
+        * heap blocks, need to be cleared. This is not only tidy, but also
+        * necessary because we don't get a chance to clear them if the heap is
+        * extended again.  Entry truncPos itself may still cover surviving
+        * blocks, so it is kept; a stale (too high) LSN there is conservative,
+        * as it can only cause blocks to be copied unnecessarily.
+        */
+       if (truncPos != 0)
+       {
+               Buffer          mapBuffer;
+               Page            page;
+               XLogRecPtr      *map;
+ 
+               newnblocks = truncBlock + 1;
+ 
+               mapBuffer = lm_readbuf(rel, truncBlock, false);
+               if (!BufferIsValid(mapBuffer))
+               {
+                       /* nothing to do, the file was already smaller */
+                       return;
+               }
+ 
+               page = BufferGetPage(mapBuffer);
+               map = (XLogRecPtr *) PageGetContents(page);
+ 
+               LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+ 
+               /*
+                * Clear the entries past truncPos.  The entries from index
+                * truncPos + 1 to the end of the map occupy exactly
+                * MAPSIZE - (truncPos + 1) * BYTES_PER_ENTRY bytes; the
+                * previous coding cleared BYTES_PER_ENTRY - 1 bytes too many,
+                * overrunning the end of the map area.
+                */
+               MemSet(&map[truncPos + 1], 0,
+                          MAPSIZE - (truncPos + 1) * BYTES_PER_ENTRY);
+ 
+               MarkBufferDirty(mapBuffer);
+               UnlockReleaseBuffer(mapBuffer);
+       }
+       else
+               newnblocks = truncBlock;
+ 
+       if (smgrnblocks(rel->rd_smgr, LSNMAP_FORKNUM) <= newnblocks)
+       {
+               /* nothing to do, the file was already smaller than requested size */
+               return;
+       }
+ 
+       /* Truncate the unused LM pages, and send smgr inval message */
+       smgrtruncate(rel->rd_smgr, LSNMAP_FORKNUM, newnblocks);
+ 
+       /*
+        * We might as well update the local smgr_lm_nblocks setting. smgrtruncate
+        * sent an smgr cache inval message, which will cause other backends to
+        * invalidate their copy of smgr_lm_nblocks, and this one too at the next
+        * command boundary.  But this ensures it isn't outright wrong until then.
+        */
+       if (rel->rd_smgr)
+               rel->rd_smgr->smgr_lm_nblocks = newnblocks;
+ }
+ 
+ /*
+  * Read a LSN map page.
+  *
+  * If the page doesn't exist, InvalidBuffer is returned, or if 'extend' is
+  * true, the LSN map file is extended.
+  */
+ static Buffer
+ lm_readbuf(Relation rel, BlockNumber blkno, bool extend)
+ {
+       Buffer          buf;
+ 
+       /*
+        * We might not have opened the relation at the smgr level yet, or we
+        * might have been forced to close it by a sinval message.  The code 
below
+        * won't necessarily notice relation extension immediately when extend =
+        * false, so we rely on sinval messages to ensure that our ideas about 
the
+        * size of the map aren't too far out of date.
+        */
+       RelationOpenSmgr(rel);
+ 
+       /*
+        * If we haven't cached the size of the LSN map fork yet, check it
+        * first.
+        */
+       if (rel->rd_smgr->smgr_lm_nblocks == InvalidBlockNumber)
+       {
+               if (smgrexists(rel->rd_smgr, LSNMAP_FORKNUM))
+                       rel->rd_smgr->smgr_lm_nblocks = 
smgrnblocks(rel->rd_smgr,
+                                                                               
                          LSNMAP_FORKNUM);
+               else
+                       rel->rd_smgr->smgr_lm_nblocks = 0;
+       }
+ 
+       /* Handle requests beyond EOF */
+       if (blkno >= rel->rd_smgr->smgr_lm_nblocks)
+       {
+               if (extend)
+                       lm_extend(rel, blkno + 1);
+               else
+                       return InvalidBuffer;
+       }
+ 
+       /*
+        * Use ZERO_ON_ERROR mode, and initialize the page if necessary.
+        * NOTE(review): unlike the visibility map, a zeroed entry here claims
+        * "unchanged since LSN 0"; confirm that consumers treat 0 as "unknown,
+        * must copy", or a corrupt page could make an incremental backup skip
+        * modified blocks.
+        */
+       buf = ReadBufferExtended(rel, LSNMAP_FORKNUM, blkno,
+                                                        RBM_ZERO_ON_ERROR, 
NULL);
+       if (PageIsNew(BufferGetPage(buf)))
+               PageInit(BufferGetPage(buf), BLCKSZ, 0);
+       return buf;
+ }
+ 
+ /*
+  * Ensure that the LSN map fork is at least lm_nblocks long, extending
+  * it if necessary with empty, initialized pages (all entries zero).
+  */
+ static void
+ lm_extend(Relation rel, BlockNumber lm_nblocks)
+ {
+       BlockNumber lm_nblocks_now;
+       Page            pg;
+ 
+       /* Template page appended for every new block */
+       pg = (Page) palloc(BLCKSZ);
+       PageInit(pg, BLCKSZ, 0);
+ 
+       /*
+        * We use the relation extension lock to lock out other backends trying
+        * to extend the LSN map at the same time. It also locks out extension
+        * of the main fork, unnecessarily, but extending the LSN map
+        * happens seldom enough that it doesn't seem worthwhile to have a
+        * separate lock tag type for it.
+        *
+        * Note that another backend might have extended or created the relation
+        * by the time we get the lock.
+        */
+       LockRelationForExtension(rel, ExclusiveLock);
+ 
+       /* Might have to re-open if a cache flush happened */
+       RelationOpenSmgr(rel);
+ 
+       /*
+        * Create the file first if it doesn't exist.  If smgr_lm_nblocks is
+        * positive then it must exist, no need for an smgrexists call.
+        */
+       if ((rel->rd_smgr->smgr_lm_nblocks == 0 ||
+                rel->rd_smgr->smgr_lm_nblocks == InvalidBlockNumber) &&
+               !smgrexists(rel->rd_smgr, LSNMAP_FORKNUM))
+               smgrcreate(rel->rd_smgr, LSNMAP_FORKNUM, false);
+ 
+       lm_nblocks_now = smgrnblocks(rel->rd_smgr, LSNMAP_FORKNUM);
+ 
+       /* Now extend the file, one initialized page at a time */
+       while (lm_nblocks_now < lm_nblocks)
+       {
+               PageSetChecksumInplace(pg, lm_nblocks_now);
+ 
+               smgrextend(rel->rd_smgr, LSNMAP_FORKNUM, lm_nblocks_now,
+                                  (char *) pg, false);
+               lm_nblocks_now++;
+       }
+ 
+       /*
+        * Send a shared-inval message to force other backends to close any smgr
+        * references they may have for this rel, which we are about to change.
+        * This is a useful optimization because it means that backends don't
+        * have to keep checking for creation or extension of the file, which
+        * happens infrequently.
+        */
+       CacheInvalidateSmgr(rel->rd_smgr->smgr_rnode);
+ 
+       /* Update local cache with the up-to-date size */
+       rel->rd_smgr->smgr_lm_nblocks = lm_nblocks_now;
+ 
+       UnlockRelationForExtension(rel, ExclusiveLock);
+ 
+       pfree(pg);
+ }
diff --git a/src/backend/access/heap/pruneheap.c 
b/src/backend/access/heap/pruneheap.c
index 563e5c3..4586ef3 100644
*** a/src/backend/access/heap/pruneheap.c
--- b/src/backend/access/heap/pruneheap.c
***************
*** 18,23 ****
--- 18,24 ----
  #include "access/heapam_xlog.h"
  #include "access/transam.h"
  #include "access/htup_details.h"
+ #include "access/lsnmap.h"
  #include "access/xlog.h"
  #include "catalog/catalog.h"
  #include "miscadmin.h"
*************** heap_page_prune(Relation relation, Buffe
*** 175,184 ****
--- 176,189 ----
  {
        int                     ndeleted = 0;
        Page            page = BufferGetPage(buffer);
+       Buffer          lmbuffer = InvalidBuffer;
        OffsetNumber offnum,
                                maxoff;
        PruneState      prstate;
  
+       if (RelationNeedsWAL(relation))
+               lsnmap_pin(relation, BufferGetBlockNumber(buffer), &lmbuffer);
+ 
        /*
         * Our strategy is to scan the page and make lists of items to change,
         * then apply the changes within a critical section.  This keeps as much
*************** heap_page_prune(Relation relation, Buffe
*** 262,267 ****
--- 267,274 ----
                                                                        
prstate.latestRemovedXid);
  
                        PageSetLSN(BufferGetPage(buffer), recptr);
+ 
+                       lsnmap_set(relation, BufferGetBlockNumber(buffer), 
lmbuffer, recptr);
                }
        }
        else
*************** heap_page_prune(Relation relation, Buffe
*** 286,291 ****
--- 293,301 ----
  
        END_CRIT_SECTION();
  
+       if (lmbuffer != InvalidBuffer)
+               ReleaseBuffer(lmbuffer);
+ 
        /*
         * If requested, report the number of tuples reclaimed to pgstats. This 
is
         * ndeleted minus ndead, because we don't want to count a now-DEAD root
diff --git a/src/backend/access/heap/rewriteheap.c 
b/src/backend/access/heap/rewriteheap.c
index af5c158..0e10567 100644
*** a/src/backend/access/heap/rewriteheap.c
--- b/src/backend/access/heap/rewriteheap.c
***************
*** 109,114 ****
--- 109,115 ----
  
  #include "access/heapam.h"
  #include "access/heapam_xlog.h"
+ #include "access/lsnmap.h"
  #include "access/rewriteheap.h"
  #include "access/transam.h"
  #include "access/tuptoaster.h"
*************** typedef struct RewriteStateData
*** 143,148 ****
--- 144,150 ----
        Page            rs_buffer;              /* page currently being built */
        BlockNumber rs_blockno;         /* block where page will go */
        bool            rs_buffer_valid;        /* T if any tuples in buffer */
+       Buffer          rs_lmbuffer;    /* LSN map buffer */
        bool            rs_use_wal;             /* must we WAL-log inserts? */
        bool            rs_logical_rewrite;             /* do we need to do 
logical rewriting */
        TransactionId rs_oldest_xmin;           /* oldest xmin used by caller to
*************** begin_heap_rewrite(Relation old_heap, Re
*** 272,277 ****
--- 274,280 ----
        /* new_heap needn't be empty, just locked */
        state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
        state->rs_buffer_valid = false;
+       state->rs_lmbuffer = InvalidBuffer;
        state->rs_use_wal = use_wal;
        state->rs_oldest_xmin = oldest_xmin;
        state->rs_freeze_xid = freeze_xid;
*************** end_heap_rewrite(RewriteState state)
*** 332,342 ****
        if (state->rs_buffer_valid)
        {
                if (state->rs_use_wal)
!                       log_newpage(&state->rs_new_rel->rd_node,
!                                               MAIN_FORKNUM,
!                                               state->rs_blockno,
!                                               state->rs_buffer,
!                                               true);
                RelationOpenSmgr(state->rs_new_rel);
  
                PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
--- 335,350 ----
        if (state->rs_buffer_valid)
        {
                if (state->rs_use_wal)
!               {
!                       XLogRecPtr      recptr;
!                       lsnmap_pin(state->rs_new_rel, state->rs_blockno, 
&state->rs_lmbuffer);
!                       recptr = log_newpage(&state->rs_new_rel->rd_node,
!                                                                MAIN_FORKNUM,
!                                                                
state->rs_blockno,
!                                                                
state->rs_buffer,
!                                                                true);
!                       lsnmap_set(state->rs_new_rel, state->rs_blockno, 
state->rs_lmbuffer, recptr);
!               }
                RelationOpenSmgr(state->rs_new_rel);
  
                PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
*************** end_heap_rewrite(RewriteState state)
*** 361,366 ****
--- 369,378 ----
  
        logical_end_heap_rewrite(state);
  
+       /* Release the LSN map buffer */
+       if (state->rs_lmbuffer != InvalidBuffer)
+               ReleaseBuffer(state->rs_lmbuffer);
+ 
        /* Deleting the context frees everything */
        MemoryContextDelete(state->rs_cxt);
  }
*************** raw_heap_insert(RewriteState state, Heap
*** 681,691 ****
  
                        /* XLOG stuff */
                        if (state->rs_use_wal)
!                               log_newpage(&state->rs_new_rel->rd_node,
!                                                       MAIN_FORKNUM,
!                                                       state->rs_blockno,
!                                                       page,
!                                                       true);
  
                        /*
                         * Now write the page. We say isTemp = true even if 
it's not a
--- 693,708 ----
  
                        /* XLOG stuff */
                        if (state->rs_use_wal)
!                       {
!                               XLogRecPtr      recptr;
!                               lsnmap_pin(state->rs_new_rel, 
state->rs_blockno, &state->rs_lmbuffer);
!                               recptr = 
log_newpage(&state->rs_new_rel->rd_node,
!                                                                        
MAIN_FORKNUM,
!                                                                        
state->rs_blockno,
!                                                                        page,
!                                                                        true);
!                               lsnmap_set(state->rs_new_rel, 
state->rs_blockno, state->rs_lmbuffer, recptr);
!                       }
  
                        /*
                         * Now write the page. We say isTemp = true even if 
it's not a
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index ce398fc..979b649 100644
*** a/src/backend/catalog/storage.c
--- b/src/backend/catalog/storage.c
***************
*** 20,25 ****
--- 20,26 ----
  #include "postgres.h"
  
  #include "access/visibilitymap.h"
+ #include "access/lsnmap.h"
  #include "access/xact.h"
  #include "access/xlog.h"
  #include "access/xloginsert.h"
*************** RelationTruncate(Relation rel, BlockNumb
*** 228,233 ****
--- 229,235 ----
  {
        bool            fsm;
        bool            vm;
+       bool            lm;
  
        /* Open it at the smgr level if not already done */
        RelationOpenSmgr(rel);
*************** RelationTruncate(Relation rel, BlockNumb
*** 238,243 ****
--- 240,246 ----
        rel->rd_smgr->smgr_targblock = InvalidBlockNumber;
        rel->rd_smgr->smgr_fsm_nblocks = InvalidBlockNumber;
        rel->rd_smgr->smgr_vm_nblocks = InvalidBlockNumber;
+       rel->rd_smgr->smgr_lm_nblocks = InvalidBlockNumber;
  
        /* Truncate the FSM first if it exists */
        fsm = smgrexists(rel->rd_smgr, FSM_FORKNUM);
*************** RelationTruncate(Relation rel, BlockNumb
*** 249,254 ****
--- 252,262 ----
        if (vm)
                visibilitymap_truncate(rel, nblocks);
  
+       /* Truncate the LSN map too if it exists. */
+       lm = smgrexists(rel->rd_smgr, LSNMAP_FORKNUM);
+       if (lm)
+               lsnmap_truncate(rel, nblocks);
+ 
        /*
         * We WAL-log the truncation before actually truncating, which means
         * trouble if the truncation fails. If we then crash, the WAL replay
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 66d5083..e805324 100644
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
*************** ATExecSetTableSpace(Oid tableOid, Oid ne
*** 9299,9305 ****
        /* copy those extra forks that exist */
        for (forkNum = MAIN_FORKNUM + 1; forkNum <= MAX_FORKNUM; forkNum++)
        {
!               if (smgrexists(rel->rd_smgr, forkNum))
                {
                        smgrcreate(dstrel, forkNum, false);
                        copy_relation_data(rel->rd_smgr, dstrel, forkNum,
--- 9299,9306 ----
        /* copy those extra forks that exist */
        for (forkNum = MAIN_FORKNUM + 1; forkNum <= MAX_FORKNUM; forkNum++)
        {
!               /* LSN map needs to be skipped as it contains invalid data */
!               if (forkNum != LSNMAP_FORKNUM && smgrexists(rel->rd_smgr, 
forkNum))
                {
                        smgrcreate(dstrel, forkNum, false);
                        copy_relation_data(rel->rd_smgr, dstrel, forkNum,
*************** ATExecSetTableSpace(Oid tableOid, Oid ne
*** 9307,9312 ****
--- 9308,9315 ----
                }
        }
  
+       /* TODO: build a correct LSN map here */
+ 
        /* drop old relation, and close new one */
        RelationDropStorage(rel);
        smgrclose(dstrel);
diff --git a/src/backend/commands/vacuumlazy.c 
b/src/backend/commands/vacuumlazy.c
index e653bbd..b0d24d7 100644
*** a/src/backend/commands/vacuumlazy.c
--- b/src/backend/commands/vacuumlazy.c
***************
*** 41,46 ****
--- 41,47 ----
  #include "access/heapam.h"
  #include "access/heapam_xlog.h"
  #include "access/htup_details.h"
+ #include "access/lsnmap.h"
  #include "access/multixact.h"
  #include "access/transam.h"
  #include "access/visibilitymap.h"
*************** static void lazy_cleanup_index(Relation 
*** 146,152 ****
                                   IndexBulkDeleteResult *stats,
                                   LVRelStats *vacrelstats);
  static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
!                                int tupindex, LVRelStats *vacrelstats, Buffer 
*vmbuffer);
  static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
  static BlockNumber count_nondeletable_pages(Relation onerel,
                                                 LVRelStats *vacrelstats);
--- 147,153 ----
                                   IndexBulkDeleteResult *stats,
                                   LVRelStats *vacrelstats);
  static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
!                                int tupindex, LVRelStats *vacrelstats, Buffer 
*vmbuffer, Buffer *lmbuffer);
  static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
  static BlockNumber count_nondeletable_pages(Relation onerel,
                                                 LVRelStats *vacrelstats);
*************** lazy_scan_heap(Relation onerel, LVRelSta
*** 456,462 ****
        IndexBulkDeleteResult **indstats;
        int                     i;
        PGRUsage        ru0;
!       Buffer          vmbuffer = InvalidBuffer;
        BlockNumber next_not_all_visible_block;
        bool            skipping_all_visible_blocks;
        xl_heap_freeze_tuple *frozen;
--- 457,464 ----
        IndexBulkDeleteResult **indstats;
        int                     i;
        PGRUsage        ru0;
!       Buffer          vmbuffer = InvalidBuffer,
!                               lmbuffer = InvalidBuffer;
        BlockNumber next_not_all_visible_block;
        bool            skipping_all_visible_blocks;
        xl_heap_freeze_tuple *frozen;
*************** lazy_scan_heap(Relation onerel, LVRelSta
*** 618,623 ****
--- 620,628 ----
                        vacrelstats->num_index_scans++;
                }
  
+               if (RelationNeedsWAL(onerel))
+                       lsnmap_pin(onerel, blkno, &lmbuffer);
+ 
                /*
                 * Pin the visibility map page in case we need to mark the page
                 * all-visible.  In most cases this will be very cheap, because 
we'll
*************** lazy_scan_heap(Relation onerel, LVRelSta
*** 966,971 ****
--- 971,978 ----
                                recptr = log_heap_freeze(onerel, buf, 
FreezeLimit,
                                                                                
 frozen, nfrozen);
                                PageSetLSN(page, recptr);
+ 
+                               lsnmap_set(onerel, BufferGetBlockNumber(buf), 
lmbuffer, recptr);
                        }
  
                        END_CRIT_SECTION();
*************** lazy_scan_heap(Relation onerel, LVRelSta
*** 979,985 ****
                        vacrelstats->num_dead_tuples > 0)
                {
                        /* Remove tuples from heap */
!                       lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, 
&vmbuffer);
                        has_dead_tuples = false;
  
                        /*
--- 986,992 ----
                        vacrelstats->num_dead_tuples > 0)
                {
                        /* Remove tuples from heap */
!                       lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, 
&vmbuffer, &lmbuffer);
                        has_dead_tuples = false;
  
                        /*
*************** lazy_scan_heap(Relation onerel, LVRelSta
*** 1090,1095 ****
--- 1097,1107 ----
                ReleaseBuffer(vmbuffer);
                vmbuffer = InvalidBuffer;
        }
+       if (BufferIsValid(lmbuffer))
+       {
+               ReleaseBuffer(lmbuffer);
+               lmbuffer = InvalidBuffer;
+       }
  
        /* If any tuples need to be deleted, perform final vacuum cycle */
        /* XXX put a threshold on min number of tuples here? */
*************** lazy_vacuum_heap(Relation onerel, LVRelS
*** 1170,1176 ****
        int                     tupindex;
        int                     npages;
        PGRUsage        ru0;
!       Buffer          vmbuffer = InvalidBuffer;
  
        pg_rusage_init(&ru0);
        npages = 0;
--- 1182,1189 ----
        int                     tupindex;
        int                     npages;
        PGRUsage        ru0;
!       Buffer          vmbuffer = InvalidBuffer,
!                               lmbuffer = InvalidBuffer;
  
        pg_rusage_init(&ru0);
        npages = 0;
*************** lazy_vacuum_heap(Relation onerel, LVRelS
*** 1195,1201 ****
                        continue;
                }
                tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, 
vacrelstats,
!                                                                       
&vmbuffer);
  
                /* Now that we've compacted the page, record its available 
space */
                page = BufferGetPage(buf);
--- 1208,1214 ----
                        continue;
                }
                tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, 
vacrelstats,
!                                                                       
&vmbuffer, &lmbuffer);
  
                /* Now that we've compacted the page, record its available 
space */
                page = BufferGetPage(buf);
*************** lazy_vacuum_heap(Relation onerel, LVRelS
*** 1211,1216 ****
--- 1224,1234 ----
                ReleaseBuffer(vmbuffer);
                vmbuffer = InvalidBuffer;
        }
+       if (BufferIsValid(lmbuffer))
+       {
+               ReleaseBuffer(lmbuffer);
+               lmbuffer = InvalidBuffer;
+       }
  
        ereport(elevel,
                        (errmsg("\"%s\": removed %d row versions in %d pages",
*************** lazy_vacuum_heap(Relation onerel, LVRelS
*** 1232,1244 ****
   */
  static int
  lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
!                                int tupindex, LVRelStats *vacrelstats, Buffer 
*vmbuffer)
  {
        Page            page = BufferGetPage(buffer);
        OffsetNumber unused[MaxOffsetNumber];
        int                     uncnt = 0;
        TransactionId visibility_cutoff_xid;
  
        START_CRIT_SECTION();
  
        for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
--- 1250,1265 ----
   */
  static int
  lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
!                                int tupindex, LVRelStats *vacrelstats, Buffer 
*vmbuffer, Buffer *lmbuffer)
  {
        Page            page = BufferGetPage(buffer);
        OffsetNumber unused[MaxOffsetNumber];
        int                     uncnt = 0;
        TransactionId visibility_cutoff_xid;
  
+       if (RelationNeedsWAL(onerel))
+               lsnmap_pin(onerel, blkno, lmbuffer);
+ 
        START_CRIT_SECTION();
  
        for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
*************** lazy_vacuum_page(Relation onerel, BlockN
*** 1273,1278 ****
--- 1294,1301 ----
                                                                unused, uncnt,
                                                                
vacrelstats->latestRemovedXid);
                PageSetLSN(page, recptr);
+ 
+               lsnmap_set(onerel, BufferGetBlockNumber(buffer), *lmbuffer, 
recptr);
        }
  
        /*
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 244b4ea..882dcbe 100644
*** a/src/backend/storage/smgr/smgr.c
--- b/src/backend/storage/smgr/smgr.c
*************** smgropen(RelFileNode rnode, BackendId ba
*** 168,173 ****
--- 168,174 ----
                reln->smgr_targblock = InvalidBlockNumber;
                reln->smgr_fsm_nblocks = InvalidBlockNumber;
                reln->smgr_vm_nblocks = InvalidBlockNumber;
+               reln->smgr_lm_nblocks = InvalidBlockNumber;
                reln->smgr_which = 0;   /* we only have md.c at present */
  
                /* mark it not open */
diff --git a/src/common/relpath.c b/src/common/relpath.c
index 66dfef1..8d52be7 100644
*** a/src/common/relpath.c
--- b/src/common/relpath.c
*************** const char *const forkNames[] = {
*** 35,41 ****
        "main",                                         /* MAIN_FORKNUM */
        "fsm",                                          /* FSM_FORKNUM */
        "vm",                                           /* 
VISIBILITYMAP_FORKNUM */
!       "init"                                          /* INIT_FORKNUM */
  };
  
  /*
--- 35,42 ----
        "main",                                         /* MAIN_FORKNUM */
        "fsm",                                          /* FSM_FORKNUM */
        "vm",                                           /* 
VISIBILITYMAP_FORKNUM */
!       "init",                                         /* INIT_FORKNUM */
!       "lm"                                            /* LSNMAP_FORKNUM */
  };
  
  /*
*************** forkname_to_number(const char *forkName)
*** 58,64 ****
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("invalid fork name"),
                         errhint("Valid fork names are \"main\", \"fsm\", "
!                                        "\"vm\", and \"init\".")));
  #endif
  
        return InvalidForkNumber;
--- 59,65 ----
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("invalid fork name"),
                         errhint("Valid fork names are \"main\", \"fsm\", "
!                                        "\"vm\", \"init\", and \"lm\".")));
  #endif
  
        return InvalidForkNumber;
diff --git a/src/include/access/hio.h b/src/include/access/hio.h
index b014029..1ac5762 100644
*** a/src/include/access/hio.h
--- b/src/include/access/hio.h
*************** extern void RelationPutHeapTuple(Relatio
*** 40,45 ****
  extern Buffer RelationGetBufferForTuple(Relation relation, Size len,
                                                  Buffer otherBuffer, int 
options,
                                                  BulkInsertState bistate,
!                                                 Buffer *vmbuffer, Buffer 
*vmbuffer_other);
  
  #endif   /* HIO_H */
--- 40,46 ----
  extern Buffer RelationGetBufferForTuple(Relation relation, Size len,
                                                  Buffer otherBuffer, int 
options,
                                                  BulkInsertState bistate,
!                                                 Buffer *vmbuffer, Buffer 
*vmbuffer_other,
!                                                 Buffer *lmbuffer, Buffer 
*lmbuffer_other);
  
  #endif   /* HIO_H */
diff --git a/src/include/access/lsnmap.h b/src/include/access/lsnmap.h
index ...e61bbc3 100644
*** a/src/include/access/lsnmap.h
--- b/src/include/access/lsnmap.h
***************
*** 0 ****
--- 1,28 ----
+ /*-------------------------------------------------------------------------
+  *
+  * lsnmap.h
+  *            lsn map interface
+  *
+  *
+  * Portions Copyright (c) 2007-2014, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * src/include/access/lsnmap.h
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef LSNMAP_H
+ #define LSNMAP_H
+ 
+ #include "access/xlogdefs.h"
+ #include "storage/block.h"
+ #include "storage/buf.h"
+ #include "utils/relcache.h"
+ 
+ extern void lsnmap_pin(Relation rel, BlockNumber heapBlk,
+                                 Buffer *lmbuf);
+ extern void lsnmap_set(Relation rel, BlockNumber heapBlk, Buffer lmBuf,
+                                 XLogRecPtr lsn);
+ extern void lsnmap_truncate(Relation rel, BlockNumber nheapblocks);
+ 
+ #endif   /* LSNMAP_H */
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index a263779..0c90191 100644
*** a/src/include/common/relpath.h
--- b/src/include/common/relpath.h
*************** typedef enum ForkNumber
*** 27,33 ****
        MAIN_FORKNUM = 0,
        FSM_FORKNUM,
        VISIBILITYMAP_FORKNUM,
!       INIT_FORKNUM
  
        /*
         * NOTE: if you add a new fork, change MAX_FORKNUM and possibly
--- 27,34 ----
        MAIN_FORKNUM = 0,
        FSM_FORKNUM,
        VISIBILITYMAP_FORKNUM,
!       INIT_FORKNUM,
!       LSNMAP_FORKNUM
  
        /*
         * NOTE: if you add a new fork, change MAX_FORKNUM and possibly
*************** typedef enum ForkNumber
*** 36,42 ****
         */
  } ForkNumber;
  
! #define MAX_FORKNUM           INIT_FORKNUM
  
  #define FORKNAMECHARS 4               /* max chars for a fork name */
  
--- 37,43 ----
         */
  } ForkNumber;
  
! #define MAX_FORKNUM           LSNMAP_FORKNUM
  
  #define FORKNAMECHARS 4               /* max chars for a fork name */
  
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 69a624f..f40532f 100644
*** a/src/include/storage/smgr.h
--- b/src/include/storage/smgr.h
*************** typedef struct SMgrRelationData
*** 55,60 ****
--- 55,61 ----
        BlockNumber smgr_targblock; /* current insertion target block */
        BlockNumber smgr_fsm_nblocks;           /* last known size of fsm fork 
*/
        BlockNumber smgr_vm_nblocks;    /* last known size of vm fork */
+       BlockNumber smgr_lm_nblocks;    /* last known size of lm fork */
  
        /* additional public fields may someday exist here */
  
-- 
2.2.0

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to