---
 src/backend/access/brin/brin.c                |   2 +-
 src/backend/access/brin/brin_pageops.c        |   4 +-
 src/backend/access/brin/brin_revmap.c         |   2 +-
 src/backend/access/gin/gindatapage.c          |   2 +-
 src/backend/access/gin/ginfast.c              |   8 +-
 src/backend/access/gin/ginutil.c              |   2 +-
 src/backend/access/gist/gistxlog.c            |   2 +-
 src/backend/access/hash/hashovfl.c            |   4 +-
 src/backend/access/hash/hashpage.c            |  16 +-
 src/backend/access/heap/heapam.c              |   6 +-
 src/backend/access/heap/heapam_handler.c      |  10 +-
 src/backend/access/nbtree/nbtinsert.c         |   8 +-
 src/backend/access/nbtree/nbtpage.c           |  14 +-
 src/backend/access/rmgrdesc/umbradesc.c       |  24 +
 src/backend/access/rmgrdesc/xlogdesc.c        |  24 +
 src/backend/access/spgist/spgdoinsert.c       |  14 +-
 src/backend/access/transam/umbra_xlog.c       |  96 +++
 src/backend/access/transam/xloginsert.c       | 744 +++++++++++++++++-
 src/backend/access/transam/xlogreader.c       |  40 +
 src/backend/access/transam/xlogutils.c        | 182 ++++-
 src/backend/backup/basebackup.c               |  22 +-
 src/backend/commands/sequence.c               |   6 +-
 src/backend/commands/tablecmds.c              |  11 +-
 src/backend/storage/map/map.c                 |  11 +
 src/backend/storage/map/mapsuper.c            |  99 ++-
 src/backend/storage/smgr/bulk_write.c         |  53 +-
 src/backend/storage/smgr/smgr.c               |   5 -
 src/backend/storage/smgr/umbra.c              | 301 ++++++-
 src/backend/utils/adt/dbsize.c                |  14 +-
 src/bin/pg_waldump/.gitignore                 |   1 +
 src/bin/pg_waldump/Makefile                   |   9 +
 src/include/access/umbra_xlog.h               |  39 +
 src/include/access/xlogreader.h               |  11 +
 src/include/access/xlogrecord.h               |  33 +
 src/test/recovery/meson.build                 |   8 +
 .../t/057_umbra_remap_crash_consistency.pl    |  74 ++
 .../t/058_umbra_2pc_remap_recovery.pl         |  90 +++
 src/test/recovery/t/067_umbra_remap_redo.pl   |  90 +++
 ...68_umbra_old_baseline_checkpoint_window.pl |  85 ++
 .../t/069_umbra_range_remap_zeroextend.pl     | 101 +++
 .../t/070_umbra_hash_birth_block_remap.pl     |  66 ++
 .../t/072_umbra_ordinary_slim_block_remap.pl  |  69 ++
 .../recovery/t/074_umbra_torn_page_remap.pl   | 261 ++++++
 43 files changed, 2541 insertions(+), 122 deletions(-)
 create mode 100644 src/test/recovery/t/057_umbra_remap_crash_consistency.pl
 create mode 100644 src/test/recovery/t/058_umbra_2pc_remap_recovery.pl
 create mode 100644 src/test/recovery/t/067_umbra_remap_redo.pl
 create mode 100644 
src/test/recovery/t/068_umbra_old_baseline_checkpoint_window.pl
 create mode 100644 src/test/recovery/t/069_umbra_range_remap_zeroextend.pl
 create mode 100644 src/test/recovery/t/070_umbra_hash_birth_block_remap.pl
 create mode 100644 src/test/recovery/t/072_umbra_ordinary_slim_block_remap.pl
 create mode 100644 src/test/recovery/t/074_umbra_torn_page_remap.pl

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index bdb30752e0..9c484f789e 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -1148,7 +1148,7 @@ brinbuild(Relation heap, Relation index, IndexInfo 
*indexInfo)
 
                XLogBeginInsert();
                XLogRegisterData(&xlrec, SizeOfBrinCreateIdx);
-               XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT | REGBUF_STANDARD);
+               XLogRegisterBuffer(0, meta, REGBUF_WILL_INIT_BIRTH | 
REGBUF_STANDARD);
 
                recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);
 
diff --git a/src/backend/access/brin/brin_pageops.c 
b/src/backend/access/brin/brin_pageops.c
index 7da97bec43..5acbd7d358 100644
--- a/src/backend/access/brin/brin_pageops.c
+++ b/src/backend/access/brin/brin_pageops.c
@@ -283,7 +283,7 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
                        /* new page */
                        XLogRegisterData(&xlrec, SizeOfBrinUpdate);
 
-                       XLogRegisterBuffer(0, newbuf, REGBUF_STANDARD | 
(extended ? REGBUF_WILL_INIT : 0));
+                       XLogRegisterBuffer(0, newbuf, REGBUF_STANDARD | 
(extended ? REGBUF_WILL_INIT_BIRTH : 0));
                        XLogRegisterBufData(0, newtup, newsz);
 
                        /* revmap page */
@@ -435,7 +435,7 @@ brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
                XLogBeginInsert();
                XLogRegisterData(&xlrec, SizeOfBrinInsert);
 
-               XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD | (extended ? 
REGBUF_WILL_INIT : 0));
+               XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD | (extended ? 
REGBUF_WILL_INIT_BIRTH : 0));
                XLogRegisterBufData(0, tup, itemsz);
 
                XLogRegisterBuffer(1, revmapbuf, 0);
diff --git a/src/backend/access/brin/brin_revmap.c 
b/src/backend/access/brin/brin_revmap.c
index 233355cb2d..951da8f435 100644
--- a/src/backend/access/brin/brin_revmap.c
+++ b/src/backend/access/brin/brin_revmap.c
@@ -630,7 +630,7 @@ revmap_physical_extend(BrinRevmap *revmap)
                XLogRegisterData(&xlrec, SizeOfBrinRevmapExtend);
                XLogRegisterBuffer(0, revmap->rm_metaBuf, REGBUF_STANDARD);
 
-               XLogRegisterBuffer(1, buf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(1, buf, REGBUF_WILL_INIT_BIRTH);
 
                recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_REVMAP_EXTEND);
                PageSetLSN(metapage, recptr);
diff --git a/src/backend/access/gin/gindatapage.c 
b/src/backend/access/gin/gindatapage.c
index c5d7db2807..e4e6e5ff3a 100644
--- a/src/backend/access/gin/gindatapage.c
+++ b/src/backend/access/gin/gindatapage.c
@@ -1848,7 +1848,7 @@ createPostingTree(Relation index, ItemPointerData *items, 
uint32 nitems,
 
                XLogRegisterData(GinDataLeafPageGetPostingList(page),
                                                 rootsize);
-               XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT_BIRTH);
 
                recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE);
                PageSetLSN(page, recptr);
diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c
index f50848eb65..c1ee6cc4ab 100644
--- a/src/backend/access/gin/ginfast.c
+++ b/src/backend/access/gin/ginfast.c
@@ -124,7 +124,7 @@ writeListPage(Relation index, Buffer buffer,
                XLogBeginInsert();
                XLogRegisterData(&data, sizeof(ginxlogInsertListPage));
 
-               XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT_BIRTH);
                XLogRegisterBufData(0, workspace.data, size);
 
                recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
@@ -430,7 +430,7 @@ ginHeapTupleFastInsert(GinState *ginstate, 
GinTupleCollector *collector)
 
                memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
 
-               XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | 
REGBUF_STANDARD);
+               XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT_BIRTH | 
REGBUF_STANDARD);
                XLogRegisterData(&data, sizeof(ginxlogUpdateMeta));
 
                recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
@@ -640,9 +640,9 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber 
newHead,
 
                        XLogBeginInsert();
                        XLogRegisterBuffer(0, metabuffer,
-                                                          REGBUF_WILL_INIT | 
REGBUF_STANDARD);
+                                                          
REGBUF_WILL_INIT_BIRTH | REGBUF_STANDARD);
                        for (i = 0; i < data.ndeleted; i++)
-                               XLogRegisterBuffer(i + 1, buffers[i], 
REGBUF_WILL_INIT);
+                               XLogRegisterBuffer(i + 1, buffers[i], 
REGBUF_WILL_INIT_BIRTH);
 
                        memcpy(&data.metadata, metadata, 
sizeof(GinMetaPageData));
 
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index d3351fbe8a..1498f210e0 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -641,7 +641,7 @@ ginUpdateStats(Relation index, const GinStatsData *stats, 
bool is_build)
 
                XLogBeginInsert();
                XLogRegisterData(&data, sizeof(ginxlogUpdateMeta));
-               XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | 
REGBUF_STANDARD);
+               XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT_BIRTH | 
REGBUF_STANDARD);
 
                recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
                PageSetLSN(metapage, recptr);
diff --git a/src/backend/access/gist/gistxlog.c 
b/src/backend/access/gist/gistxlog.c
index ae538dc81c..65af1fcfb0 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -528,7 +528,7 @@ gistXLogSplit(bool page_is_leaf,
        i = 1;
        for (ptr = dist; ptr; ptr = ptr->next)
        {
-               XLogRegisterBuffer(i, ptr->buffer, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(i, ptr->buffer, REGBUF_WILL_INIT_BIRTH);
                XLogRegisterBufData(i, &(ptr->block.num), sizeof(int));
                XLogRegisterBufData(i, ptr->list, ptr->lenlist);
                i++;
diff --git a/src/backend/access/hash/hashovfl.c 
b/src/backend/access/hash/hashovfl.c
index dbc57ef958..a23a0c501e 100644
--- a/src/backend/access/hash/hashovfl.c
+++ b/src/backend/access/hash/hashovfl.c
@@ -390,7 +390,7 @@ found:
                XLogBeginInsert();
                XLogRegisterData(&xlrec, SizeOfHashAddOvflPage);
 
-               XLogRegisterBuffer(0, ovflbuf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(0, ovflbuf, REGBUF_WILL_INIT_BIRTH);
                XLogRegisterBufData(0, &pageopaque->hasho_bucket, 
sizeof(Bucket));
 
                XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
@@ -402,7 +402,7 @@ found:
                }
 
                if (BufferIsValid(newmapbuf))
-                       XLogRegisterBuffer(3, newmapbuf, REGBUF_WILL_INIT);
+                       XLogRegisterBuffer(3, newmapbuf, 
REGBUF_WILL_INIT_BIRTH);
 
                XLogRegisterBuffer(4, metabuf, REGBUF_STANDARD);
                XLogRegisterBufData(4, &metap->hashm_firstfree, sizeof(uint32));
diff --git a/src/backend/access/hash/hashpage.c 
b/src/backend/access/hash/hashpage.c
index 8099b0d021..3e78ddfd0f 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -199,6 +199,7 @@ _hash_getnewbuf(Relation rel, BlockNumber blkno, ForkNumber 
forkNum)
 {
        BlockNumber nblocks = RelationGetNumberOfBlocksInFork(rel, forkNum);
        Buffer          buf;
+       bool            extend_path;
 
        if (blkno == P_NEW)
                elog(ERROR, "hash AM does not use P_NEW");
@@ -209,6 +210,7 @@ _hash_getnewbuf(Relation rel, BlockNumber blkno, ForkNumber 
forkNum)
        /* smgr insists we explicitly extend the relation */
        if (blkno == nblocks)
        {
+               extend_path = true;
                buf = ExtendBufferedRel(BMR_REL(rel), forkNum, NULL,
                                                                EB_LOCK_FIRST | 
EB_SKIP_EXTENSION_LOCK);
                if (BufferGetBlockNumber(buf) != blkno)
@@ -217,6 +219,7 @@ _hash_getnewbuf(Relation rel, BlockNumber blkno, ForkNumber 
forkNum)
        }
        else
        {
+               extend_path = false;
                buf = ReadBufferExtended(rel, forkNum, blkno, RBM_ZERO_AND_LOCK,
                                                                 NULL);
        }
@@ -395,7 +398,8 @@ _hash_init(Relation rel, double num_tuples, ForkNumber 
forkNum)
 
                XLogBeginInsert();
                XLogRegisterData(&xlrec, SizeOfHashInitMetaPage);
-               XLogRegisterBuffer(0, metabuf, REGBUF_WILL_INIT | 
REGBUF_STANDARD);
+               XLogRegisterBuffer(0, metabuf,
+                                                  REGBUF_WILL_INIT_BIRTH | 
REGBUF_STANDARD);
 
                recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_META_PAGE);
 
@@ -427,9 +431,9 @@ _hash_init(Relation rel, double num_tuples, ForkNumber 
forkNum)
                _hash_initbuf(buf, metap->hashm_maxbucket, i, LH_BUCKET_PAGE, 
false);
                MarkBufferDirty(buf);
 
-               if (use_wal)
-                       log_newpage(&rel->rd_locator,
-                                               forkNum,
+                       if (use_wal)
+                               log_newpage(&rel->rd_locator,
+                                                       forkNum,
                                                blkno,
                                                BufferGetPage(buf),
                                                true);
@@ -469,7 +473,7 @@ _hash_init(Relation rel, double num_tuples, ForkNumber 
forkNum)
 
                XLogBeginInsert();
                XLogRegisterData(&xlrec, SizeOfHashInitBitmapPage);
-               XLogRegisterBuffer(0, bitmapbuf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(0, bitmapbuf, REGBUF_WILL_INIT_BIRTH);
 
                /*
                 * This is safe only because nobody else can be modifying the 
index at
@@ -910,7 +914,7 @@ restart_expand:
                XLogBeginInsert();
 
                XLogRegisterBuffer(0, buf_oblkno, REGBUF_STANDARD);
-               XLogRegisterBuffer(1, buf_nblkno, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(1, buf_nblkno, REGBUF_WILL_INIT_BIRTH);
                XLogRegisterBuffer(2, metabuf, REGBUF_STANDARD);
 
                if (metap_update_masks)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index abfd8e8970..dbf4826c0d 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2111,7 +2111,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId 
cid,
                        PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
                {
                        info |= XLOG_HEAP_INIT_PAGE;
-                       bufflags |= REGBUF_WILL_INIT;
+                       bufflags |= REGBUF_WILL_INIT_BIRTH;
                }
 
                xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
@@ -2561,7 +2561,7 @@ heap_multi_insert(Relation relation, TupleTableSlot 
**slots, int ntuples,
                        if (init)
                        {
                                info |= XLOG_HEAP_INIT_PAGE;
-                               bufflags |= REGBUF_WILL_INIT;
+                               bufflags |= REGBUF_WILL_INIT_BIRTH;
                        }
 
                        /*
@@ -8897,7 +8897,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
 
        bufflags = REGBUF_STANDARD;
        if (init)
-               bufflags |= REGBUF_WILL_INIT;
+               bufflags |= REGBUF_WILL_INIT_BIRTH;
        if (need_tuple_data)
                bufflags |= REGBUF_KEEP_DATA;
 
diff --git a/src/backend/access/heap/heapam_handler.c 
b/src/backend/access/heap/heapam_handler.c
index 20d3b46e06..7c45b50aca 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -569,7 +569,8 @@ heapam_relation_copy_data(Relation rel, const 
RelFileLocator *newrlocator)
        {
                if (smgrexists(RelationGetSmgr(rel), forkNum))
                {
-                       smgrcreate(dstrel, forkNum, false);
+                       if (!smgrisinternalfork(forkNum))
+                               smgrcreate(dstrel, forkNum, false);
 
                        /*
                         * WAL log creation if the relation is persistent, or 
this is the
@@ -579,11 +580,14 @@ heapam_relation_copy_data(Relation rel, const 
RelFileLocator *newrlocator)
                                (rel->rd_rel->relpersistence == 
RELPERSISTENCE_UNLOGGED &&
                                 forkNum == INIT_FORKNUM))
                                log_smgrcreate(newrlocator, forkNum);
-                       RelationCopyStorage(RelationGetSmgr(rel), dstrel, 
forkNum,
-                                                               
rel->rd_rel->relpersistence);
+                       if (!smgrisinternalfork(forkNum))
+                               RelationCopyStorage(RelationGetSmgr(rel), 
dstrel, forkNum,
+                                                                       
rel->rd_rel->relpersistence);
                }
        }
 
+       smgrcopyrelationmetadata(RelationGetSmgr(rel), dstrel,
+                                                        
rel->rd_rel->relpersistence);
 
        /* drop old relation, and close new one */
        RelationDropStorage(rel);
diff --git a/src/backend/access/nbtree/nbtinsert.c 
b/src/backend/access/nbtree/nbtinsert.c
index c8af97dd23..e754de9679 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -1377,7 +1377,7 @@ _bt_insertonpg(Relation rel,
                                        xlmeta.allequalimage = 
metad->btm_allequalimage;
 
                                        XLogRegisterBuffer(2, metabuf,
-                                                                          
REGBUF_WILL_INIT | REGBUF_STANDARD);
+                                                                          
REGBUF_WILL_INIT_BIRTH | REGBUF_STANDARD);
                                        XLogRegisterBufData(2, &xlmeta,
                                                                                
sizeof(xl_btree_metadata));
                                }
@@ -2011,7 +2011,7 @@ _bt_split(Relation rel, Relation heaprel, BTScanInsert 
itup_key, Buffer buf,
                XLogRegisterData(&xlrec, SizeOfBtreeSplit);
 
                XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
-               XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT_BIRTH);
                /* Log original right sibling, since we've changed its 
prev-pointer */
                if (!isrightmost)
                        XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
@@ -2612,9 +2612,9 @@ _bt_newlevel(Relation rel, Relation heaprel, Buffer lbuf, 
Buffer rbuf)
                XLogBeginInsert();
                XLogRegisterData(&xlrec, SizeOfBtreeNewroot);
 
-               XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT_BIRTH);
                XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
-               XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT | 
REGBUF_STANDARD);
+               XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT_BIRTH | 
REGBUF_STANDARD);
 
                Assert(metad->btm_version >= BTREE_NOVAC_VERSION);
                md.version = metad->btm_version;
diff --git a/src/backend/access/nbtree/nbtpage.c 
b/src/backend/access/nbtree/nbtpage.c
index 0547038616..cb6092da52 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -289,7 +289,7 @@ _bt_set_cleanup_info(Relation rel, BlockNumber num_delpages)
                xl_btree_metadata md;
 
                XLogBeginInsert();
-               XLogRegisterBuffer(0, metabuf, REGBUF_WILL_INIT | 
REGBUF_STANDARD);
+               XLogRegisterBuffer(0, metabuf, REGBUF_WILL_INIT_BIRTH | 
REGBUF_STANDARD);
 
                Assert(metad->btm_version >= BTREE_NOVAC_VERSION);
                md.version = metad->btm_version;
@@ -479,8 +479,8 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
                        xl_btree_metadata md;
 
                        XLogBeginInsert();
-                       XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
-                       XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT | 
REGBUF_STANDARD);
+                       XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT_BIRTH);
+                       XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT_BIRTH | 
REGBUF_STANDARD);
 
                        Assert(metad->btm_version >= BTREE_NOVAC_VERSION);
                        md.version = metad->btm_version;
@@ -2294,7 +2294,7 @@ _bt_mark_page_halfdead(Relation rel, Relation heaprel, 
Buffer leafbuf,
                        xlrec.topparent = InvalidBlockNumber;
 
                XLogBeginInsert();
-               XLogRegisterBuffer(0, leafbuf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(0, leafbuf, REGBUF_WILL_INIT_BIRTH);
                XLogRegisterBuffer(1, subtreeparent, REGBUF_STANDARD);
 
                page = BufferGetPage(leafbuf);
@@ -2713,12 +2713,12 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, 
BlockNumber scanblkno,
 
                XLogBeginInsert();
 
-               XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT_BIRTH);
                if (BufferIsValid(lbuf))
                        XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
                XLogRegisterBuffer(2, rbuf, REGBUF_STANDARD);
                if (target != leafblkno)
-                       XLogRegisterBuffer(3, leafbuf, REGBUF_WILL_INIT);
+                       XLogRegisterBuffer(3, leafbuf, REGBUF_WILL_INIT_BIRTH);
 
                /* information stored on the target/to-be-unlinked block */
                xlrec.leftsib = leftsib;
@@ -2735,7 +2735,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, 
BlockNumber scanblkno,
 
                if (BufferIsValid(metabuf))
                {
-                       XLogRegisterBuffer(4, metabuf, REGBUF_WILL_INIT | 
REGBUF_STANDARD);
+                       XLogRegisterBuffer(4, metabuf, REGBUF_WILL_INIT_BIRTH | 
REGBUF_STANDARD);
 
                        Assert(metad->btm_version >= BTREE_NOVAC_VERSION);
                        xlmeta.version = metad->btm_version;
diff --git a/src/backend/access/rmgrdesc/umbradesc.c 
b/src/backend/access/rmgrdesc/umbradesc.c
index 6bad4bb38e..a6b3e6e55e 100644
--- a/src/backend/access/rmgrdesc/umbradesc.c
+++ b/src/backend/access/rmgrdesc/umbradesc.c
@@ -47,6 +47,24 @@ umbra_desc(StringInfo buf, XLogReaderState *record)
                                                 path.str, xlrec->lblkno, 
xlrec->old_pblkno,
                                                 xlrec->new_pblkno);
        }
+       else if (info == XLOG_UMBRA_RANGE_REMAP)
+       {
+               xl_umbra_range_remap *xlrec = (xl_umbra_range_remap *) rec;
+               RelPathStr      path = umbra_fork_relpath(xlrec->rlocator, 
xlrec->forknum);
+
+               appendStringInfo(buf, "%s count %u end_lblk %u",
+                                                path.str, xlrec->count, 
xlrec->end_lblkno);
+       }
+       else if (info == XLOG_UMBRA_RANGE_REMAP_COMPACT)
+       {
+               xl_umbra_range_remap_compact *xlrec =
+                       (xl_umbra_range_remap_compact *) rec;
+               RelPathStr      path = umbra_fork_relpath(xlrec->rlocator, 
xlrec->forknum);
+
+               appendStringInfo(buf, "%s compact first_lblk %u first_pblk %u 
count %u",
+                                                path.str, xlrec->first_lblkno, 
xlrec->first_pblkno,
+                                                xlrec->count);
+       }
        else if (info == XLOG_UMBRA_SKIP_WAL_DENSE_MAP)
        {
                xl_umbra_skip_wal_dense_map *xlrec =
@@ -72,6 +90,12 @@ umbra_identify(uint8 info)
                case XLOG_UMBRA_MAP_SET:
                        id = "MAP_SET";
                        break;
+               case XLOG_UMBRA_RANGE_REMAP:
+                       id = "RANGE_REMAP";
+                       break;
+               case XLOG_UMBRA_RANGE_REMAP_COMPACT:
+                       id = "RANGE_REMAP_COMPACT";
+                       break;
                case XLOG_UMBRA_SKIP_WAL_DENSE_MAP:
                        id = "SKIP_WAL_DENSE_MAP";
                        break;
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c 
b/src/backend/access/rmgrdesc/xlogdesc.c
index 0fc4f48ca6..2c2e5f06c4 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -382,6 +382,30 @@ XLogRecGetBlockRefInfo(XLogReaderState *record, bool 
pretty,
                                }
                        }
 
+#ifdef USE_UMBRA
+                       if (blkref->has_remap)
+                       {
+                               uint8           remap_format =
+                                       XLogRecGetInfo(record) & 
XLR_UMBRA_REMAP_FORMAT_MASK;
+
+                               if (remap_format != 0)
+                               {
+                                       appendStringInfo(buf,
+                                                                        "; 
remap: unsupported format bits 0x%02X",
+                                                                        
remap_format);
+                               }
+                               else
+                               {
+                                       appendStringInfo(buf,
+                                                                        "; 
remap: old_pblk %u new_pblk %u logical_nblocks %u next_free_pblk %u",
+                                                                        
blkref->old_pblkno,
+                                                                        
blkref->new_pblkno,
+                                                                        
blkref->logical_nblocks,
+                                                                        
blkref->next_free_pblkno);
+                               }
+                       }
+#endif
+
                        if (pretty)
                                appendStringInfoChar(buf, '\n');
                }
diff --git a/src/backend/access/spgist/spgdoinsert.c 
b/src/backend/access/spgist/spgdoinsert.c
index 7c7371c69e..f8ede933e1 100644
--- a/src/backend/access/spgist/spgdoinsert.c
+++ b/src/backend/access/spgist/spgdoinsert.c
@@ -299,7 +299,7 @@ addLeafTuple(Relation index, SpGistState *state, 
SpGistLeafTuple leafTuple,
 
                flags = REGBUF_STANDARD;
                if (xlrec.newPage)
-                       flags |= REGBUF_WILL_INIT;
+                       flags |= REGBUF_WILL_INIT_BIRTH;
                XLogRegisterBuffer(0, current->buffer, flags);
                if (xlrec.offnumParent != InvalidOffsetNumber)
                        XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
@@ -536,7 +536,7 @@ moveLeafs(Relation index, SpGistState *state,
                XLogRegisterData(leafdata, leafptr - leafdata);
 
                XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
-               XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? 
REGBUF_WILL_INIT : 0));
+               XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? 
REGBUF_WILL_INIT_BIRTH : 0));
                XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
 
                recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
@@ -1377,7 +1377,7 @@ doPickSplit(Relation index, SpGistState *state,
                {
                        flags = REGBUF_STANDARD;
                        if (xlrec.initSrc)
-                               flags |= REGBUF_WILL_INIT;
+                               flags |= REGBUF_WILL_INIT_BIRTH;
                        XLogRegisterBuffer(0, saveCurrent.buffer, flags);
                }
 
@@ -1386,14 +1386,14 @@ doPickSplit(Relation index, SpGistState *state,
                {
                        flags = REGBUF_STANDARD;
                        if (xlrec.initDest)
-                               flags |= REGBUF_WILL_INIT;
+                               flags |= REGBUF_WILL_INIT_BIRTH;
                        XLogRegisterBuffer(1, newLeafBuffer, flags);
                }
 
                /* Inner page */
                flags = REGBUF_STANDARD;
                if (xlrec.initInner)
-                       flags |= REGBUF_WILL_INIT;
+                       flags |= REGBUF_WILL_INIT_BIRTH;
                XLogRegisterBuffer(2, current->buffer, flags);
 
                /* Parent page, if different from inner page */
@@ -1675,7 +1675,7 @@ spgAddNodeAction(Relation index, SpGistState *state,
                        /* new page */
                        flags = REGBUF_STANDARD;
                        if (xlrec.newPage)
-                               flags |= REGBUF_WILL_INIT;
+                       flags |= REGBUF_WILL_INIT_BIRTH;
                        XLogRegisterBuffer(1, current->buffer, flags);
                        /* parent page (if different from orig and new) */
                        if (xlrec.parentBlk == 2)
@@ -1874,7 +1874,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
 
                        flags = REGBUF_STANDARD;
                        if (xlrec.newPage)
-                               flags |= REGBUF_WILL_INIT;
+                               flags |= REGBUF_WILL_INIT_BIRTH;
                        XLogRegisterBuffer(1, newBuffer, flags);
                }
 
diff --git a/src/backend/access/transam/umbra_xlog.c 
b/src/backend/access/transam/umbra_xlog.c
index 71c7ad7bb1..186eca102e 100644
--- a/src/backend/access/transam/umbra_xlog.c
+++ b/src/backend/access/transam/umbra_xlog.c
@@ -41,6 +41,54 @@ log_umbra_map_set(RelFileLocator rlocator, ForkNumber 
forknum,
        return XLogInsert(RM_UMBRA_ID, XLOG_UMBRA_MAP_SET | 
XLR_SPECIAL_REL_UPDATE);
 }
 
+XLogRecPtr
+log_umbra_range_remap(RelFileLocator rlocator, ForkNumber forknum,
+                                         uint16 count,
+                                         const xl_umbra_range_remap_entry 
*entries)
+{
+       xl_umbra_range_remap xlrec;
+
+       Assert(count > 0);
+       Assert(entries != NULL);
+
+       xlrec.rlocator = rlocator;
+       xlrec.forknum = forknum;
+       xlrec.count = count;
+       xlrec.padding = 0;
+       xlrec.end_lblkno = entries[count - 1].lblkno;
+
+       XLogBeginInsert();
+       XLogRegisterData((char *) &xlrec, offsetof(xl_umbra_range_remap, 
entries));
+       XLogRegisterData((char *) entries,
+                                        sizeof(xl_umbra_range_remap_entry) * 
count);
+
+       return XLogInsert(RM_UMBRA_ID, XLOG_UMBRA_RANGE_REMAP | 
XLR_SPECIAL_REL_UPDATE);
+}
+
+XLogRecPtr
+log_umbra_range_remap_compact(RelFileLocator rlocator, ForkNumber forknum,
+                                                         BlockNumber 
first_lblkno,
+                                                         BlockNumber 
first_pblkno,
+                                                         uint16 count)
+{
+       xl_umbra_range_remap_compact xlrec;
+
+       Assert(count > 0);
+
+       xlrec.rlocator = rlocator;
+       xlrec.forknum = forknum;
+       xlrec.count = count;
+       xlrec.padding = 0;
+       xlrec.first_lblkno = first_lblkno;
+       xlrec.first_pblkno = first_pblkno;
+
+       XLogBeginInsert();
+       XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+       return XLogInsert(RM_UMBRA_ID,
+                                         XLOG_UMBRA_RANGE_REMAP_COMPACT | 
XLR_SPECIAL_REL_UPDATE);
+}
+
 XLogRecPtr
 log_umbra_skip_wal_dense_map(RelFileLocator rlocator,
                                                         uint16 count,
@@ -172,6 +220,54 @@ umbra_redo(XLogReaderState *record)
                        }
                        break;
 
+               case XLOG_UMBRA_RANGE_REMAP:
+                       {
+                               xl_umbra_range_remap *xlrec;
+                               xl_umbra_range_remap_entry *entries;
+                               SMgrRelation reln;
+                               BlockNumber *pblknos;
+
+                               xlrec = (xl_umbra_range_remap *) 
XLogRecGetData(record);
+                               entries = xlrec->entries;
+                               reln = smgropen(xlrec->rlocator, 
INVALID_PROC_NUMBER);
+
+                               if (!UmMetadataExists(reln))
+                                       break;
+
+                               pblknos = palloc(sizeof(BlockNumber) * 
xlrec->count);
+                               for (int i = 0; i < xlrec->count; i++)
+                                       pblknos[i] = entries[i].new_pblkno;
+
+                               UmApplyReservedRangeRemap(reln, xlrec->forknum,
+                                                                               
  entries[0].lblkno, xlrec->count,
+                                                                               
  pblknos, record->EndRecPtr, true);
+                               pfree(pblknos);
+                       }
+                       break;
+
+               case XLOG_UMBRA_RANGE_REMAP_COMPACT:
+                       {
+                               xl_umbra_range_remap_compact *xlrec;
+                               SMgrRelation reln;
+                               BlockNumber *pblknos;
+
+                               xlrec = (xl_umbra_range_remap_compact *) 
XLogRecGetData(record);
+                               reln = smgropen(xlrec->rlocator, 
INVALID_PROC_NUMBER);
+
+                               if (!UmMetadataExists(reln))
+                                       break;
+
+                               pblknos = palloc(sizeof(BlockNumber) * 
xlrec->count);
+                               for (int i = 0; i < xlrec->count; i++)
+                                       pblknos[i] = xlrec->first_pblkno + i;
+
+                               UmApplyReservedRangeRemap(reln, xlrec->forknum,
+                                                                               
  xlrec->first_lblkno, xlrec->count,
+                                                                               
  pblknos, record->EndRecPtr, true);
+                               pfree(pblknos);
+                       }
+                       break;
+
                case XLOG_UMBRA_SKIP_WAL_DENSE_MAP:
                        {
                                xl_umbra_skip_wal_dense_map *xlrec;
diff --git a/src/backend/access/transam/xloginsert.c 
b/src/backend/access/transam/xloginsert.c
index f2e10b82b7..85baf69b2b 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -39,6 +39,12 @@
 #include "replication/origin.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
+#include "storage/smgr.h"
+#ifdef USE_UMBRA
+#include "storage/map.h"
+#include "storage/umbra.h"
+#include "storage/umfile.h"
+#endif
 #include "utils/memutils.h"
 #include "utils/pgstat_internal.h"
 #include "utils/rel.h"
@@ -85,6 +91,18 @@ typedef struct
        XLogRecData bkp_rdatas[2];      /* temporary rdatas used to hold 
references to
                                                                 * backup block 
data in XLogRecordAssemble() */
 
+#ifdef USE_UMBRA
+       bool            has_remap;              /* true if remap metadata is 
prepared */
+       bool            remap_in_record; /* true if current assembled record 
includes remap */
+       bool            remap_committed; /* true if mapping switch was 
committed after insert */
+       bool            wal_owns_firstborn; /* this WAL block owns first-born 
mapping */
+       SMgrRelation remap_reln;        /* cached relation handle for remap 
commit */
+       BlockNumber old_pblkno;         /* remap: old physical block number */
+       BlockNumber new_pblkno;         /* remap: new physical block number */
+       BlockNumber remap_logical_nblocks; /* assembled logical frontier 
payload */
+       BlockNumber remap_next_free_pblkno; /* assembled allocator frontier 
payload */
+#endif
+
        /* buffer to store a compressed version of backup block image */
        char            compressed_page[COMPRESS_BUFSIZE];
 } registered_buffer;
@@ -137,14 +155,159 @@ static bool begininsert_called = false;
 /* Memory context to hold the registered buffer and data references. */
 static MemoryContext xloginsert_cxt;
 
-static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
-                                                                          
XLogRecPtr RedoRecPtr, bool doPageWrites,
-                                                                          
XLogRecPtr *fpw_lsn, int *num_fpi,
-                                                                          
uint64 *fpi_bytes,
-                                                                          bool 
*topxid_included);
+#ifndef USE_UMBRA
+static XLogRecData *XLogRecordAssembleMd(RmgrId rmid, uint8 info,
+                                                                               
 XLogRecPtr RedoRecPtr, bool doPageWrites,
+                                                                               
 XLogRecPtr *fpw_lsn, int *num_fpi,
+                                                                               
 uint64 *fpi_bytes,
+                                                                               
 bool *topxid_included);
+#endif
+#ifdef USE_UMBRA
+static XLogRecData *XLogRecordAssembleUmbra(RmgrId rmid, uint8 info,
+                                                                               
        XLogRecPtr RedoRecPtr, bool doPageWrites,
+                                                                               
        XLogRecPtr *fpw_lsn, int *num_fpi,
+                                                                               
        uint64 *fpi_bytes,
+                                                                               
        bool *topxid_included);
+static void XLogCommitBlockRemapsUmbra(XLogRecPtr record_endptr);
+static void XLogAbortBlockRemapsUmbra(void);
+static void XLogFillBlockRemapFrontierUmbra(registered_buffer *regbuf,
+                                                                               
        XLogRecordBlockRemapHeader *rbmh);
+#else
+static void XLogCommitBlockRemapsMd(XLogRecPtr record_endptr);
+#endif
 static bool XLogCompressBackupBlock(const PageData *page, uint16 hole_offset,
                                                                        uint16 
hole_length, void *dest, uint16 *dlen);
 
+typedef struct xlog_storage_mgr
+{
+       XLogRecData *(*xlog_record_assemble) (RmgrId rmid, uint8 info,
+                                                                               
  XLogRecPtr RedoRecPtr,
+                                                                               
  bool doPageWrites,
+                                                                               
  XLogRecPtr *fpw_lsn, int *num_fpi,
+                                                                               
  uint64 *fpi_bytes,
+                                                                               
  bool *topxid_included);
+       void    (*xlog_insert_finish) (XLogRecPtr record_endptr);
+} xlog_storage_mgr;
+
+static const xlog_storage_mgr xlog_storage_mgr_f = {
+#ifdef USE_UMBRA
+       .xlog_record_assemble = XLogRecordAssembleUmbra,
+       .xlog_insert_finish = XLogCommitBlockRemapsUmbra,
+#else
+       .xlog_record_assemble = XLogRecordAssembleMd,
+       .xlog_insert_finish = XLogCommitBlockRemapsMd,
+#endif
+};
+
+#ifdef USE_UMBRA
+static void
+XLogFillBlockRemapFrontierUmbra(registered_buffer *regbuf,
+                                                               
XLogRecordBlockRemapHeader *rbmh)
+{
+       SMgrRelation    reln = regbuf->remap_reln;
+       UmbraFileContext *ctx;
+       BlockNumber             logical_nblocks = InvalidBlockNumber;
+       BlockNumber             next_free_pblk = InvalidBlockNumber;
+
+       Assert(rbmh != NULL);
+
+       if (reln == NULL)
+               reln = smgropen(regbuf->rlocator, INVALID_PROC_NUMBER);
+
+       ctx = umfile_ctx_acquire(reln->smgr_rlocator);
+
+       if (MapSBlockTryGetLogicalNblocks(ctx, regbuf->rlocator,
+                                                                         
regbuf->forkno, &logical_nblocks))
+               rbmh->logical_nblocks = Max(logical_nblocks, regbuf->block + 1);
+       else
+               rbmh->logical_nblocks = regbuf->block + 1;
+
+       if (MapSBlockTryGetNextFreePhysBlock(ctx, regbuf->rlocator,
+                                                                               
 regbuf->forkno, &next_free_pblk))
+               rbmh->next_free_pblkno = Max(next_free_pblk,
+                                                                        
regbuf->new_pblkno + 1);
+       else
+               rbmh->next_free_pblkno = regbuf->new_pblkno + 1;
+
+       regbuf->remap_logical_nblocks = rbmh->logical_nblocks;
+       regbuf->remap_next_free_pblkno = rbmh->next_free_pblkno;
+}
+
+static void
+XLogCommitBlockRemapsUmbra(XLogRecPtr record_endptr)
+{
+       int                     block_id;
+
+       for (block_id = 0; block_id < max_registered_block_id; block_id++)
+       {
+               registered_buffer *regbuf = &registered_buffers[block_id];
+               UmbraFileContext *ctx;
+
+               if (!regbuf->in_use || !regbuf->has_remap || 
!regbuf->remap_in_record)
+                       continue;
+               if (regbuf->remap_reln == NULL)
+                       continue;
+
+               ctx = umfile_ctx_acquire(regbuf->remap_reln->smgr_rlocator);
+
+               UmMapSetMapping(regbuf->remap_reln, regbuf->forkno, 
regbuf->block,
+                                               regbuf->new_pblkno, 
record_endptr);
+               if (regbuf->old_pblkno == InvalidBlockNumber)
+               {
+                       MapSBlockBumpLogicalNblocks(ctx,
+                                                                               
regbuf->rlocator,
+                                                                               
regbuf->forkno,
+                                                                               
regbuf->remap_logical_nblocks != InvalidBlockNumber ?
+                                                                               
regbuf->remap_logical_nblocks :
+                                                                               
regbuf->block + 1,
+                                                                               
record_endptr);
+                       smgrbumpcachednblocks(regbuf->remap_reln,
+                                                                 
regbuf->forkno,
+                                                                 regbuf->block 
+ 1);
+               }
+
+               MapSBlockBumpNextFreePhysBlock(ctx,
+                                                                          
regbuf->rlocator,
+                                                                          
regbuf->forkno,
+                                                                          
regbuf->remap_next_free_pblkno != InvalidBlockNumber ?
+                                                                          
regbuf->remap_next_free_pblkno :
+                                                                          
regbuf->new_pblkno + 1,
+                                                                          
record_endptr);
+               MapInflightRelease(regbuf->rlocator, regbuf->forkno,
+                                                  regbuf->block);
+               regbuf->wal_owns_firstborn = false;
+               regbuf->remap_committed = true;
+       }
+}
+
+static void
+XLogAbortBlockRemapsUmbra(void)
+{
+       int                     block_id;
+
+       for (block_id = 0; block_id < max_registered_block_id; block_id++)
+       {
+               registered_buffer *regbuf = &registered_buffers[block_id];
+
+               if (!regbuf->in_use || !regbuf->has_remap)
+                       continue;
+               if (regbuf->remap_committed)
+                       continue;
+
+               MapInflightRelease(regbuf->rlocator, regbuf->forkno,
+                                                  regbuf->block);
+               regbuf->wal_owns_firstborn = false;
+               regbuf->remap_in_record = false;
+       }
+}
+#else
+static void
+XLogCommitBlockRemapsMd(XLogRecPtr record_endptr)
+{
+       (void) record_endptr;
+}
+#endif
+
 /*
  * Begin constructing a WAL record. This must be called before the
  * XLogRegister* functions and XLogInsert().
@@ -227,8 +390,25 @@ XLogResetInsertion(void)
 {
        int                     i;
 
+#ifdef USE_UMBRA
+       XLogAbortBlockRemapsUmbra();
+#endif
+
        for (i = 0; i < max_registered_block_id; i++)
+       {
                registered_buffers[i].in_use = false;
+#ifdef USE_UMBRA
+               registered_buffers[i].has_remap = false;
+               registered_buffers[i].remap_in_record = false;
+               registered_buffers[i].remap_committed = false;
+               registered_buffers[i].wal_owns_firstborn = false;
+               registered_buffers[i].remap_reln = NULL;
+               registered_buffers[i].old_pblkno = InvalidBlockNumber;
+               registered_buffers[i].new_pblkno = InvalidBlockNumber;
+               registered_buffers[i].remap_logical_nblocks = 
InvalidBlockNumber;
+               registered_buffers[i].remap_next_free_pblkno = 
InvalidBlockNumber;
+#endif
+       }
 
        num_rdatas = 0;
        max_registered_block_id = 0;
@@ -283,6 +463,17 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 
flags)
        regbuf->flags = flags;
        regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
        regbuf->rdata_len = 0;
+#ifdef USE_UMBRA
+       regbuf->has_remap = false;
+       regbuf->remap_in_record = false;
+       regbuf->remap_committed = false;
+       regbuf->wal_owns_firstborn = false;
+       regbuf->remap_reln = NULL;
+       regbuf->old_pblkno = InvalidBlockNumber;
+       regbuf->new_pblkno = InvalidBlockNumber;
+       regbuf->remap_logical_nblocks = InvalidBlockNumber;
+       regbuf->remap_next_free_pblkno = InvalidBlockNumber;
+#endif
 
        /*
         * Check that this page hasn't already been registered with some other
@@ -336,6 +527,17 @@ XLogRegisterBlock(uint8 block_id, RelFileLocator 
*rlocator, ForkNumber forknum,
        regbuf->flags = flags;
        regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
        regbuf->rdata_len = 0;
+#ifdef USE_UMBRA
+       regbuf->has_remap = false;
+       regbuf->remap_in_record = false;
+       regbuf->remap_committed = false;
+       regbuf->wal_owns_firstborn = false;
+       regbuf->remap_reln = NULL;
+       regbuf->old_pblkno = InvalidBlockNumber;
+       regbuf->new_pblkno = InvalidBlockNumber;
+       regbuf->remap_logical_nblocks = InvalidBlockNumber;
+       regbuf->remap_next_free_pblkno = InvalidBlockNumber;
+#endif
 
        /*
         * Check that this page hasn't already been registered with some other
@@ -509,30 +711,37 @@ XLogInsert(RmgrId rmid, uint8 info)
                return EndPos;
        }
 
-       do
+       PG_TRY();
        {
-               XLogRecPtr      RedoRecPtr;
-               bool            doPageWrites;
-               bool            topxid_included = false;
-               XLogRecPtr      fpw_lsn;
-               XLogRecData *rdt;
-               int                     num_fpi = 0;
-               uint64          fpi_bytes = 0;
-
-               /*
-                * Get values needed to decide whether to do full-page writes. 
Since
-                * we don't yet have an insertion lock, these could change 
under us,
-                * but XLogInsertRecord will recheck them once it has a lock.
-                */
-               GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
-
-               rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
-                                                                &fpw_lsn, 
&num_fpi, &fpi_bytes,
-                                                                
&topxid_included);
-
-               EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, 
num_fpi,
-                                                                 fpi_bytes, 
topxid_included);
-       } while (!XLogRecPtrIsValid(EndPos));
+               do
+               {
+                       XLogRecPtr      RedoRecPtr;
+                       bool            doPageWrites;
+                       bool            topxid_included = false;
+                       XLogRecPtr      fpw_lsn;
+                       XLogRecData *rdt;
+                       int                     num_fpi = 0;
+                       uint64          fpi_bytes = 0;
+
+                       GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
+
+                       rdt = xlog_storage_mgr_f.xlog_record_assemble(rmid, 
info, RedoRecPtr,
+                                                                               
                                  doPageWrites, &fpw_lsn,
+                                                                               
                                  &num_fpi, &fpi_bytes,
+                                                                               
                                  &topxid_included);
+
+                       EndPos = XLogInsertRecord(rdt, fpw_lsn, 
curinsert_flags, num_fpi,
+                                                                         
fpi_bytes, topxid_included);
+               } while (!XLogRecPtrIsValid(EndPos));
+
+               xlog_storage_mgr_f.xlog_insert_finish(EndPos);
+       }
+       PG_CATCH();
+       {
+               XLogResetInsertion();
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
 
        XLogResetInsertion();
 
@@ -617,11 +826,12 @@ XLogGetFakeLSN(Relation rel)
  * *topxid_included is set if the topmost transaction ID is logged with the
  * current subtransaction.
  */
+#ifndef USE_UMBRA
 static XLogRecData *
-XLogRecordAssemble(RmgrId rmid, uint8 info,
-                                  XLogRecPtr RedoRecPtr, bool doPageWrites,
-                                  XLogRecPtr *fpw_lsn, int *num_fpi, uint64 
*fpi_bytes,
-                                  bool *topxid_included)
+XLogRecordAssembleMd(RmgrId rmid, uint8 info,
+                                        XLogRecPtr RedoRecPtr, bool 
doPageWrites,
+                                        XLogRecPtr *fpw_lsn, int *num_fpi, 
uint64 *fpi_bytes,
+                                        bool *topxid_included)
 {
        XLogRecData *rdt;
        uint64          total_len = 0;
@@ -1009,6 +1219,470 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
        return &hdr_rdt;
 }
+#endif
+
+#ifdef USE_UMBRA
+static XLogRecData *
+XLogRecordAssembleUmbra(RmgrId rmid, uint8 info,
+                                               XLogRecPtr RedoRecPtr, bool 
doPageWrites,
+                                               XLogRecPtr *fpw_lsn, int 
*num_fpi,
+                                               uint64 *fpi_bytes,
+                                               bool *topxid_included)
+{
+       XLogRecData *rdt;
+       uint64          total_len = 0;
+       int                     block_id;
+       pg_crc32c       rdata_crc;
+       bool            needs_backup_by_block[XLR_MAX_BLOCK_ID + 1] = {0};
+       bool            needs_data_by_block[XLR_MAX_BLOCK_ID + 1] = {0};
+       bool            include_image_by_block[XLR_MAX_BLOCK_ID + 1] = {0};
+       bool            include_remap_by_block[XLR_MAX_BLOCK_ID + 1] = {0};
+       registered_buffer *prev_regbuf = NULL;
+       XLogRecData *rdt_datas_last;
+       XLogRecord *rechdr;
+       char       *scratch = hdr_scratch;
+
+       rechdr = (XLogRecord *) scratch;
+       scratch += SizeOfXLogRecord;
+
+       hdr_rdt.next = NULL;
+       rdt_datas_last = &hdr_rdt;
+       hdr_rdt.data = hdr_scratch;
+
+       if (wal_consistency_checking[rmid])
+               info |= XLR_CHECK_CONSISTENCY;
+
+       *fpw_lsn = InvalidXLogRecPtr;
+       for (block_id = 0; block_id < max_registered_block_id; block_id++)
+       {
+               registered_buffer *regbuf = &registered_buffers[block_id];
+               bool            needs_backup;
+               bool            needs_remap;
+               XLogRecordBlockRemapHeader rbmh;
+               bool            include_image;
+               bool            include_remap = false;
+               bool            wal_owned_remap_available = false;
+               SMgrRelation reln = NULL;
+
+               if (!regbuf->in_use)
+                       continue;
+
+               regbuf->remap_in_record = false;
+               regbuf->wal_owns_firstborn = false;
+
+               if (regbuf->flags & REGBUF_FORCE_IMAGE)
+               {
+                       needs_backup = true;
+                       needs_remap = false;
+               }
+               else if (regbuf->flags & REGBUF_NO_IMAGE)
+               {
+                       needs_backup = false;
+                       needs_remap = false;
+               }
+               else if (!doPageWrites)
+               {
+                       needs_backup = false;
+                       needs_remap = false;
+               }
+               else
+               {
+                       XLogRecPtr      page_lsn = PageGetLSN(regbuf->page);
+
+                       if (rmid != RM_XLOG_ID || info != XLOG_FPI_FOR_HINT)
+                       {
+                               reln = smgropen(regbuf->rlocator, 
INVALID_PROC_NUMBER);
+                               regbuf->remap_reln = reln;
+                               wal_owned_remap_available =
+                                       UmWalOwnedRemapAvailable(reln, 
regbuf->forkno);
+                       }
+
+                       if (!wal_owned_remap_available ||
+                               (rmid == RM_XLOG_ID && info == 
XLOG_FPI_FOR_HINT))
+                       {
+                               needs_backup = (page_lsn <= RedoRecPtr);
+                               needs_remap = false;
+                       }
+                       else
+                       {
+                               needs_backup = false;
+                               needs_remap = (page_lsn <= RedoRecPtr);
+                       }
+                       if (!needs_backup && !needs_remap)
+                       {
+                               if (!XLogRecPtrIsValid(*fpw_lsn) || page_lsn < 
*fpw_lsn)
+                                       *fpw_lsn = page_lsn;
+                       }
+               }
+
+               Assert(!(needs_backup && needs_remap));
+
+               if (!regbuf->has_remap &&
+                       (regbuf->flags & REGBUF_LOGICAL_BIRTH) != 0)
+               {
+                       bool            got_mapping;
+
+                       if (reln == NULL)
+                       {
+                               reln = smgropen(regbuf->rlocator, 
INVALID_PROC_NUMBER);
+                               regbuf->remap_reln = reln;
+                       }
+                       if (!UmWalOwnedFirstbornAvailable(reln, regbuf->forkno,
+                                                                               
         regbuf->block))
+                               goto remap_birth_done;
+
+                       /*
+                        * For WAL-owned first-born, the producer must have 
already created
+                        * any needed birth claim through 
smgrextend()/smgrzeroextend().
+                        *
+                        * We first probe committed MAP state. If the mapping 
is still
+                        * private to this backend, UmMapReserveFreshPbkno() 
must reuse the
+                        * owner-local in-flight claim instead of reserving a 
second pblk.
+                        *
+                        * Keep this order aligned with bulk_write.c: claim 
birth first,
+                        * then let page WAL own the final commit of that same 
claim.
+                        */
+                       got_mapping = UmMapTryLookupPblkno(reln, regbuf->forkno,
+                                                                               
          regbuf->block,
+                                                                               
          &regbuf->new_pblkno);
+                       if (!got_mapping)
+                       {
+                               UmMapReserveFreshPbkno(reln, regbuf->forkno,
+                                                                          
regbuf->block,
+                                                                          
&regbuf->new_pblkno);
+                               regbuf->old_pblkno = InvalidBlockNumber;
+                               regbuf->has_remap = true;
+                               regbuf->remap_committed = false;
+                       }
+       remap_birth_done:
+                       ;
+               }
+
+               include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) 
!= 0;
+               if (regbuf->has_remap)
+                       include_remap = regbuf->has_remap;
+               else if (needs_remap)
+               {
+                       if (reln == NULL)
+                       {
+                               reln = smgropen(regbuf->rlocator, 
INVALID_PROC_NUMBER);
+                               regbuf->remap_reln = reln;
+                       }
+                       UmMapGetNewPbkno(reln, regbuf->forkno, regbuf->block,
+                                                        &regbuf->new_pblkno,
+                                                        &regbuf->old_pblkno);
+
+                       regbuf->has_remap = true;
+                       regbuf->remap_committed = false;
+                       include_remap = true;
+
+                       if (include_remap &&
+                               regbuf->new_pblkno == regbuf->old_pblkno)
+                               elog(PANIC,
+                                        "remap decision produced unchanged 
pblk for %u/%u/%u fork %u block %u",
+                                        regbuf->rlocator.spcOid,
+                                        regbuf->rlocator.dbOid,
+                                        regbuf->rlocator.relNumber,
+                                        regbuf->forkno,
+                                        regbuf->block);
+               }
+
+               if (include_remap)
+               {
+                       rbmh.old_pblkno = regbuf->old_pblkno;
+                       rbmh.new_pblkno = regbuf->new_pblkno;
+                       XLogFillBlockRemapFrontierUmbra(regbuf, &rbmh);
+
+                       if ((regbuf->flags & REGBUF_FORCE_IMAGE) == 0 &&
+                               (info & XLR_CHECK_CONSISTENCY) == 0)
+                               include_image = false;
+               }
+
+               if (regbuf->rdata_len == 0)
+                       needs_data_by_block[block_id] = false;
+               else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
+                       needs_data_by_block[block_id] = true;
+               else
+                       needs_data_by_block[block_id] = (!needs_backup || 
include_remap);
+
+               needs_backup_by_block[block_id] = needs_backup;
+               include_image_by_block[block_id] = include_image;
+               include_remap_by_block[block_id] = include_remap;
+       }
+
+       for (block_id = 0; block_id < max_registered_block_id; block_id++)
+       {
+               registered_buffer *regbuf = &registered_buffers[block_id];
+               bool            needs_backup;
+               bool            needs_data;
+               XLogRecordBlockHeader bkpb;
+               XLogRecordBlockRemapHeader rbmh;
+               XLogRecordBlockImageHeader bimg;
+               XLogRecordBlockCompressHeader cbimg = {0};
+               bool            samerel;
+               bool            is_compressed = false;
+               bool            include_image;
+               bool            include_remap;
+
+               if (!regbuf->in_use)
+                       continue;
+
+               needs_backup = needs_backup_by_block[block_id];
+               needs_data = needs_data_by_block[block_id];
+               include_image = include_image_by_block[block_id];
+               include_remap = include_remap_by_block[block_id];
+
+               regbuf->remap_in_record = false;
+               regbuf->wal_owns_firstborn = false;
+
+               bkpb.id = block_id;
+               bkpb.fork_flags = regbuf->forkno;
+               bkpb.data_length = 0;
+
+               if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
+                       bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
+
+               if (include_remap)
+               {
+                       bkpb.fork_flags |= BKPBLOCK_HAS_REMAP;
+                       regbuf->remap_in_record = true;
+               }
+
+               if (include_image)
+               {
+                       const PageData *page = regbuf->page;
+                       uint16          compressed_len = 0;
+
+                       if (regbuf->flags & REGBUF_STANDARD)
+                       {
+                               uint16          lower = ((PageHeader) 
page)->pd_lower;
+                               uint16          upper = ((PageHeader) 
page)->pd_upper;
+
+                               if (lower >= SizeOfPageHeaderData &&
+                                       upper > lower &&
+                                       upper <= BLCKSZ)
+                               {
+                                       bimg.hole_offset = lower;
+                                       cbimg.hole_length = upper - lower;
+                               }
+                               else
+                               {
+                                       bimg.hole_offset = 0;
+                                       cbimg.hole_length = 0;
+                               }
+                       }
+                       else
+                       {
+                               bimg.hole_offset = 0;
+                               cbimg.hole_length = 0;
+                       }
+
+                       if (wal_compression != WAL_COMPRESSION_NONE)
+                       {
+                               is_compressed =
+                                       XLogCompressBackupBlock(page, 
bimg.hole_offset,
+                                                                               
        cbimg.hole_length,
+                                                                               
        regbuf->compressed_page,
+                                                                               
        &compressed_len);
+                       }
+
+                       bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
+                       *num_fpi += 1;
+
+                       rdt_datas_last->next = &regbuf->bkp_rdatas[0];
+                       rdt_datas_last = rdt_datas_last->next;
+
+                       bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : 
BKPIMAGE_HAS_HOLE;
+
+                       if (needs_backup)
+                               bimg.bimg_info |= BKPIMAGE_APPLY;
+
+                       if (is_compressed)
+                       {
+                               bimg.length = compressed_len;
+
+                               switch ((WalCompression) wal_compression)
+                               {
+                                       case WAL_COMPRESSION_PGLZ:
+                                               bimg.bimg_info |= 
BKPIMAGE_COMPRESS_PGLZ;
+                                               break;
+
+                                       case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+                                               bimg.bimg_info |= 
BKPIMAGE_COMPRESS_LZ4;
+#else
+                                               elog(ERROR, "LZ4 is not 
supported by this build");
+#endif
+                                               break;
+
+                                       case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+                                               bimg.bimg_info |= 
BKPIMAGE_COMPRESS_ZSTD;
+#else
+                                               elog(ERROR, "zstd is not 
supported by this build");
+#endif
+                                               break;
+
+                                       case WAL_COMPRESSION_NONE:
+                                               Assert(false);
+                                               break;
+                               }
+
+                               rdt_datas_last->data = regbuf->compressed_page;
+                               rdt_datas_last->len = compressed_len;
+                       }
+                       else
+                       {
+                               bimg.length = BLCKSZ - cbimg.hole_length;
+
+                               if (cbimg.hole_length == 0)
+                               {
+                                       rdt_datas_last->data = page;
+                                       rdt_datas_last->len = BLCKSZ;
+                               }
+                               else
+                               {
+                                       rdt_datas_last->data = page;
+                                       rdt_datas_last->len = bimg.hole_offset;
+
+                                       rdt_datas_last->next = 
&regbuf->bkp_rdatas[1];
+                                       rdt_datas_last = rdt_datas_last->next;
+
+                                       rdt_datas_last->data =
+                                               page + (bimg.hole_offset + 
cbimg.hole_length);
+                                       rdt_datas_last->len =
+                                               BLCKSZ - (bimg.hole_offset + 
cbimg.hole_length);
+                               }
+                       }
+
+                       total_len += bimg.length;
+                       *fpi_bytes += bimg.length;
+               }
+
+               if (needs_data)
+               {
+                       Assert(regbuf->rdata_len <= UINT16_MAX);
+
+                       bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
+                       bkpb.data_length = (uint16) regbuf->rdata_len;
+                       total_len += regbuf->rdata_len;
+
+                       rdt_datas_last->next = regbuf->rdata_head;
+                       rdt_datas_last = regbuf->rdata_tail;
+               }
+
+               if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, 
prev_regbuf->rlocator))
+               {
+                       samerel = true;
+                       bkpb.fork_flags |= BKPBLOCK_SAME_REL;
+               }
+               else
+                       samerel = false;
+               prev_regbuf = regbuf;
+
+               memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
+               scratch += SizeOfXLogRecordBlockHeader;
+               if (include_remap)
+               {
+                       rbmh.old_pblkno = regbuf->old_pblkno;
+                       rbmh.new_pblkno = regbuf->new_pblkno;
+                       rbmh.logical_nblocks = regbuf->remap_logical_nblocks;
+                       rbmh.next_free_pblkno = regbuf->remap_next_free_pblkno;
+                       memcpy(scratch, &rbmh, 
SizeOfXLogRecordBlockRemapHeader);
+                       scratch += SizeOfXLogRecordBlockRemapHeader;
+               }
+               if (include_image)
+               {
+                       memcpy(scratch, &bimg, 
SizeOfXLogRecordBlockImageHeader);
+                       scratch += SizeOfXLogRecordBlockImageHeader;
+                       if (cbimg.hole_length != 0 && is_compressed)
+                       {
+                               memcpy(scratch, &cbimg,
+                                          SizeOfXLogRecordBlockCompressHeader);
+                               scratch += SizeOfXLogRecordBlockCompressHeader;
+                       }
+               }
+               if (!samerel)
+               {
+                       memcpy(scratch, &regbuf->rlocator, 
sizeof(RelFileLocator));
+                       scratch += sizeof(RelFileLocator);
+               }
+               memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
+               scratch += sizeof(BlockNumber);
+       }
+
+       if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
+               replorigin_xact_state.origin != InvalidReplOriginId)
+       {
+               *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
+               memcpy(scratch, &replorigin_xact_state.origin, 
sizeof(replorigin_xact_state.origin));
+               scratch += sizeof(replorigin_xact_state.origin);
+       }
+
+       if (IsSubxactTopXidLogPending())
+       {
+               TransactionId xid = GetTopTransactionIdIfAny();
+
+               *topxid_included = true;
+
+               *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
+               memcpy(scratch, &xid, sizeof(TransactionId));
+               scratch += sizeof(TransactionId);
+       }
+
+       if (mainrdata_len > 0)
+       {
+               if (mainrdata_len > 255)
+               {
+                       uint32 mainrdata_len_4b;
+
+                       if (mainrdata_len > PG_UINT32_MAX)
+                               ereport(ERROR,
+                                               (errmsg_internal("too much WAL 
data"),
+                                                errdetail_internal("Main data 
length is %" PRIu64 " bytes for a maximum of %u bytes.",
+                                                                               
        mainrdata_len,
+                                                                               
        PG_UINT32_MAX)));
+
+                       mainrdata_len_4b = (uint32) mainrdata_len;
+                       *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
+                       memcpy(scratch, &mainrdata_len_4b, sizeof(uint32));
+                       scratch += sizeof(uint32);
+               }
+               else
+               {
+                       *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
+                       *(scratch++) = (uint8) mainrdata_len;
+               }
+               rdt_datas_last->next = mainrdata_head;
+               rdt_datas_last = mainrdata_last;
+               total_len += mainrdata_len;
+       }
+       rdt_datas_last->next = NULL;
+
+       hdr_rdt.len = (scratch - hdr_scratch);
+       total_len += hdr_rdt.len;
+
+       INIT_CRC32C(rdata_crc);
+       COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - 
SizeOfXLogRecord);
+       for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
+               COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+
+       if (total_len > XLogRecordMaxSize)
+               ereport(ERROR,
+                               (errmsg_internal("oversized WAL record"),
+                                errdetail_internal("WAL record would be %" 
PRIu64 " bytes (of maximum %u bytes); rmid %u flags %u.",
+                                                                       
total_len, XLogRecordMaxSize, rmid, info)));
+
+       rechdr->xl_xid = GetCurrentTransactionIdIfAny();
+       rechdr->xl_tot_len = (uint32) total_len;
+       rechdr->xl_info = info;
+       rechdr->xl_rmid = rmid;
+       rechdr->xl_prev = InvalidXLogRecPtr;
+       rechdr->xl_crc = rdata_crc;
+
+       return &hdr_rdt;
+}
+#endif
 
 /*
  * Create a compressed version of a backup block image.
@@ -1194,7 +1868,7 @@ log_newpage(RelFileLocator *rlocator, ForkNumber forknum, 
BlockNumber blkno,
        int                     flags;
        XLogRecPtr      recptr;
 
-       flags = REGBUF_FORCE_IMAGE;
+       flags = REGBUF_FORCE_IMAGE | REGBUF_LOGICAL_BIRTH;
        if (page_std)
                flags |= REGBUF_STANDARD;
 
@@ -1228,7 +1902,7 @@ log_newpages(RelFileLocator *rlocator, ForkNumber 
forknum, int num_pages,
        int                     i;
        int                     j;
 
-       flags = REGBUF_FORCE_IMAGE;
+       flags = REGBUF_FORCE_IMAGE | REGBUF_LOGICAL_BIRTH;
        if (page_std)
                flags |= REGBUF_STANDARD;
 
@@ -1322,7 +1996,7 @@ log_newpage_range(Relation rel, ForkNumber forknum,
        int                     flags;
        BlockNumber blkno;
 
-       flags = REGBUF_FORCE_IMAGE;
+       flags = REGBUF_FORCE_IMAGE | REGBUF_LOGICAL_BIRTH;
        if (page_std)
                flags |= REGBUF_STANDARD;
 
diff --git a/src/backend/access/transam/xlogreader.c 
b/src/backend/access/transam/xlogreader.c
index 8849610db0..ae9c2c7802 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1795,10 +1795,20 @@ DecodeXLogRecord(XLogReaderState *state,
                        blk = &decoded->blocks[block_id];
                        blk->in_use = true;
                        blk->apply_image = false;
+#ifdef USE_UMBRA
+                       blk->has_remap = false;
+                       blk->old_pblkno = InvalidBlockNumber;
+                       blk->new_pblkno = InvalidBlockNumber;
+                       blk->logical_nblocks = InvalidBlockNumber;
+                       blk->next_free_pblkno = InvalidBlockNumber;
+#endif
 
                        COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
                        blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
                        blk->flags = fork_flags;
+#ifdef USE_UMBRA
+                       blk->has_remap = ((fork_flags & BKPBLOCK_HAS_REMAP) != 
0);
+#endif
                        blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 
0);
                        blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
 
@@ -1823,6 +1833,36 @@ DecodeXLogRecord(XLogReaderState *state,
                        }
                        datatotal += blk->data_len;
 
+#ifdef USE_UMBRA
+                       if (blk->has_remap)
+                       {
+                               uint8           remap_format =
+                                       decoded->header.xl_info & 
XLR_UMBRA_REMAP_FORMAT_MASK;
+
+                               if (remap_format != 0)
+                               {
+                                       report_invalid_record(state,
+                                                                               
  "unsupported remap format bits 0x%02X at %X/%X",
+                                                                               
  remap_format,
+                                                                               
  LSN_FORMAT_ARGS(state->ReadRecPtr));
+                                       goto err;
+                               }
+
+                               COPY_HEADER_FIELD(&blk->old_pblkno, 
sizeof(BlockNumber));
+                               COPY_HEADER_FIELD(&blk->new_pblkno, 
sizeof(BlockNumber));
+                               COPY_HEADER_FIELD(&blk->logical_nblocks, 
sizeof(BlockNumber));
+                               COPY_HEADER_FIELD(&blk->next_free_pblkno, 
sizeof(BlockNumber));
+                       }
+#else
+                       if (fork_flags & BKPBLOCK_HAS_REMAP)
+                       {
+                               report_invalid_record(state,
+                                                                         
"BKPBLOCK_HAS_REMAP is not allowed in this storage mode at %X/%X",
+                                                                         
LSN_FORMAT_ARGS(state->ReadRecPtr));
+                               goto err;
+                       }
+#endif
+
                        if (blk->has_image)
                        {
                                COPY_HEADER_FIELD(&blk->bimg_len, 
sizeof(uint16));
diff --git a/src/backend/access/transam/xlogutils.c 
b/src/backend/access/transam/xlogutils.c
index f32aac5476..3090a2eb47 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -118,6 +118,10 @@ static bool 
XLogUmbraEnsureMappedBlockForRedo(RelFileLocator rlocator,
                                                                                
          BlockNumber blkno);
 static bool XLogUmbraEnsureMetadataForRedo(RelFileLocator rlocator,
                                                                                
   ForkNumber forknum);
+static void XLogUmbraLockRedoBuffer(Buffer buf, ReadBufferMode mode,
+                                                                       bool 
get_cleanup_lock);
+static inline bool XLogBlockRemapRedoImageEnabled(void);
+static inline bool XLogBlockRemapRedoNoImageEnabled(void);
 #endif
 
 /* Report a reference to an invalid page */
@@ -421,6 +425,25 @@ XLogCheckInvalidPages(void)
 }
 
 #ifdef USE_UMBRA
+static inline bool
+XLogBlockRemapRedoImageEnabled(void)
+{
+       /*
+        * Phase-1: close remap+image redo first.
+        * This path is deterministic and does not require old->new baseline 
switch.
+        */
+       return true;
+}
+
+static inline bool
+XLogBlockRemapRedoNoImageEnabled(void)
+{
+       /*
+        * No-image remap redo uses deterministic old->new baseline handoff.
+        */
+       return true;
+}
+
 /*
  * Redo is an owner point for handle-local Umbra MAP state.
  *
@@ -501,6 +524,17 @@ XLogUmbraEnsureMetadataForRedo(RelFileLocator rlocator, 
ForkNumber forknum)
        return true;
 }
 
+static void
+XLogUmbraLockRedoBuffer(Buffer buf, ReadBufferMode mode, bool get_cleanup_lock)
+{
+       if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
+               return;
+
+       if (get_cleanup_lock)
+               LockBufferForCleanup(buf);
+       else
+               LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+}
 #endif
 
 
@@ -679,6 +713,10 @@ XLogReadBufferForRedoExtendedUmbra(XLogReaderState *record,
        Page            page;
        bool            zeromode;
        bool            willinit;
+       bool            has_remap;
+       bool            has_image;
+       DecodedBkpBlock *blk;
+       SMgrRelation remap_smgr = NULL;
 
        if (!XLogRecGetBlockTagExtended(record, block_id, &rlocator, &forknum, 
&blkno,
                                                                        
&prefetch_buffer))
@@ -696,11 +734,92 @@ XLogReadBufferForRedoExtendedUmbra(XLogReaderState 
*record,
        if (!willinit && zeromode)
                elog(PANIC, "block to be initialized in redo routine must be 
marked with WILL_INIT flag in the WAL record");
 
+       blk = XLogRecGetBlock(record, block_id);
+       has_remap = XLogRecBlockHasRemap(record, block_id);
+       has_image = XLogRecBlockImageApply(record, block_id);
+
+       if (!has_remap)
+       {
+               if (!XLogUmbraEnsureMetadataForRedo(rlocator, forknum))
+                       return BLK_NOTFOUND;
+
+               if (has_image)
+               {
+                       Assert(XLogRecHasBlockImage(record, block_id));
+                       *buf = XLogReadBufferExtended(rlocator, forknum, blkno,
+                                                                               
  get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK,
+                                                                               
  prefetch_buffer);
+                       page = BufferGetPage(*buf);
+                       if (!RestoreBlockImage(record, block_id, page))
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_INTERNAL_ERROR),
+                                                errmsg_internal("%s", 
record->errormsg_buf)));
+
+                       if (!PageIsNew(page))
+                               PageSetLSN(page, lsn);
+
+                       MarkBufferDirty(*buf);
+                       if (forknum == INIT_FORKNUM)
+                               FlushOneBuffer(*buf);
+
+                       return BLK_RESTORED;
+               }
+
+               *buf = XLogReadBufferExtended(rlocator, forknum, blkno, mode,
+                                                                         
prefetch_buffer);
+               if (BufferIsValid(*buf))
+               {
+                       if (mode != RBM_ZERO_AND_LOCK && mode != 
RBM_ZERO_AND_CLEANUP_LOCK)
+                       {
+                               if (get_cleanup_lock)
+                                       LockBufferForCleanup(*buf);
+                               else
+                                       LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
+                       }
+                       if (lsn <= PageGetLSN(BufferGetPage(*buf)))
+                               return BLK_DONE;
+                       return BLK_NEEDS_REDO;
+               }
+               return BLK_NOTFOUND;
+       }
+
        if (!XLogUmbraEnsureMetadataForRedo(rlocator, forknum))
                return BLK_NOTFOUND;
+       remap_smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
 
-       if (XLogRecBlockImageApply(record, block_id))
+       if (has_image)
        {
+               UmbraFileContext *ctx = 
umfile_ctx_acquire(remap_smgr->smgr_rlocator);
+               BlockNumber redo_logical_nblocks;
+               BlockNumber redo_next_free_pblkno;
+
+               if (!XLogBlockRemapRedoImageEnabled())
+                       elog(PANIC,
+                                "encountered remap-with-image WAL record 
before phase-1 redo is enabled for %u/%u/%u fork %d block %u",
+                                rlocator.spcOid, rlocator.dbOid, 
rlocator.relNumber,
+                                forknum, blkno);
+
+               redo_logical_nblocks =
+                       (blk->logical_nblocks != InvalidBlockNumber) ?
+                       blk->logical_nblocks : blkno + 1;
+               redo_next_free_pblkno =
+                       (blk->next_free_pblkno != InvalidBlockNumber) ?
+                       blk->next_free_pblkno : blk->new_pblkno + 1;
+
+               UmMapSetMapping(remap_smgr, forknum, blkno, blk->new_pblkno, 
lsn);
+               MapSBlockBumpNextFreePhysBlock(ctx, rlocator,
+                                                                          
forknum, redo_next_free_pblkno,
+                                                                          lsn);
+               if (blk->old_pblkno == InvalidBlockNumber)
+               {
+                       MapSBlockBumpLogicalNblocks(ctx, rlocator,
+                                                                               
forknum, redo_logical_nblocks,
+                                                                               
lsn);
+                       smgrbumpcachednblocks(remap_smgr, forknum, blkno + 1);
+               }
+               if (!XLogUmbraEnsureMappedBlockForRedo(rlocator, forknum, 
blkno))
+                       return BLK_NOTFOUND;
+
                Assert(XLogRecHasBlockImage(record, block_id));
                *buf = XLogReadBufferExtended(rlocator, forknum, blkno,
                                                                          
get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK,
@@ -721,6 +840,56 @@ XLogReadBufferForRedoExtendedUmbra(XLogReaderState *record,
                return BLK_RESTORED;
        }
 
+       if (!XLogBlockRemapRedoNoImageEnabled())
+               elog(PANIC,
+                        "encountered remap-without-image WAL record before 
phase-2 redo is enabled for %u/%u/%u fork %d block %u",
+                        rlocator.spcOid, rlocator.dbOid, rlocator.relNumber,
+                        forknum, blkno);
+
+       if (zeromode)
+       {
+               UmbraFileContext *ctx = 
umfile_ctx_acquire(remap_smgr->smgr_rlocator);
+               BlockNumber redo_logical_nblocks;
+               BlockNumber redo_next_free_pblkno;
+
+               Assert(willinit);
+
+               redo_logical_nblocks =
+                       (blk->logical_nblocks != InvalidBlockNumber) ?
+                       blk->logical_nblocks : blkno + 1;
+               redo_next_free_pblkno =
+                       (blk->next_free_pblkno != InvalidBlockNumber) ?
+                       blk->next_free_pblkno : blk->new_pblkno + 1;
+
+               UmMapSetMapping(remap_smgr, forknum, blkno, blk->new_pblkno, 
lsn);
+               MapSBlockBumpNextFreePhysBlock(ctx, rlocator,
+                                                                          
forknum, redo_next_free_pblkno,
+                                                                          lsn);
+               if (blk->old_pblkno == InvalidBlockNumber)
+               {
+                       MapSBlockBumpLogicalNblocks(ctx, rlocator,
+                                                                               
forknum, redo_logical_nblocks,
+                                                                               
lsn);
+                       smgrbumpcachednblocks(remap_smgr, forknum, blkno + 1);
+               }
+               if (!XLogUmbraEnsureMappedBlockForRedo(rlocator, forknum, 
blkno))
+                       return BLK_NOTFOUND;
+
+               *buf = XLogReadBufferExtended(rlocator, forknum, blkno, mode,
+                                                                         
prefetch_buffer);
+               if (BufferIsValid(*buf))
+                       return BLK_NEEDS_REDO;
+               return BLK_NOTFOUND;
+       }
+
+       Assert(!willinit);
+       if (blk->old_pblkno == InvalidBlockNumber)
+               elog(PANIC,
+                        "remap-without-image record has invalid old pblk for 
%u/%u/%u fork %d block %u",
+                        rlocator.spcOid, rlocator.dbOid, rlocator.relNumber,
+                        forknum, blkno);
+
+       UmMapSetMapping(remap_smgr, forknum, blkno, blk->old_pblkno, lsn);
        if (!XLogUmbraEnsureMappedBlockForRedo(rlocator, forknum, blkno))
                return BLK_NOTFOUND;
 
@@ -729,6 +898,17 @@ XLogReadBufferForRedoExtendedUmbra(XLogReaderState *record,
        if (!BufferIsValid(*buf))
                return BLK_NOTFOUND;
 
+       XLogUmbraLockRedoBuffer(*buf, mode, get_cleanup_lock);
+       MarkBufferDirty(*buf);
+       FlushOneBuffer(*buf);
+
+       UmMapSetMapping(remap_smgr, forknum, blkno, blk->new_pblkno, lsn);
+       if (blk->next_free_pblkno != InvalidBlockNumber)
+               
MapSBlockBumpNextFreePhysBlock(umfile_ctx_acquire(remap_smgr->smgr_rlocator),
+                                                                          
rlocator,
+                                                                          
forknum, blk->next_free_pblkno,
+                                                                          lsn);
+
        if (lsn <= PageGetLSN(BufferGetPage(*buf)))
                return BLK_DONE;
        return BLK_NEEDS_REDO;
diff --git a/src/backend/backup/basebackup.c b/src/backend/backup/basebackup.c
index 9c79dadaac..956039aa15 100644
--- a/src/backend/backup/basebackup.c
+++ b/src/backend/backup/basebackup.c
@@ -94,6 +94,7 @@ static int64 sendDir(bbsink *sink, const char *path, int 
basepathlen, bool sizeo
 static bool sendFile(bbsink *sink, const char *readfilename, const char 
*tarfilename,
                                         struct stat *statbuf, bool missing_ok,
                                         Oid dboid, Oid spcoid, RelFileNumber 
relfilenumber,
+                                        ForkNumber relForkNum,
                                         unsigned segno,
                                         backup_manifest_info *manifest,
                                         unsigned num_incremental_blocks,
@@ -364,7 +365,8 @@ perform_base_backup(basebackup_options *opt, bbsink *sink,
                                                                        
XLOG_CONTROL_FILE)));
                                sendFile(sink, XLOG_CONTROL_FILE, 
XLOG_CONTROL_FILE, &statbuf,
                                                 false, InvalidOid, InvalidOid,
-                                                InvalidRelFileNumber, 0, 
&manifest, 0, NULL, 0);
+                                                InvalidRelFileNumber, 
InvalidForkNumber, 0, &manifest,
+                                                0, NULL, 0);
                        }
                        else
                        {
@@ -630,7 +632,8 @@ perform_base_backup(basebackup_options *opt, bbsink *sink,
                                                 errmsg("could not stat file 
\"%s\": %m", pathbuf)));
 
                        sendFile(sink, pathbuf, pathbuf, &statbuf, false,
-                                        InvalidOid, InvalidOid, 
InvalidRelFileNumber, 0,
+                                        InvalidOid, InvalidOid, 
InvalidRelFileNumber,
+                                        InvalidForkNumber, 0,
                                         &manifest, 0, NULL, 0);
 
                        /* unconditionally mark file as archived */
@@ -1526,7 +1529,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, 
bool sizeonly,
                        if (!sizeonly)
                                sent = sendFile(sink, pathbuf, tarfilename, 
&statbuf,
                                                                true, dboid, 
spcoid,
-                                                               relfilenumber, 
segno, manifest,
+                                                               relfilenumber, 
relForkNum, segno, manifest,
                                                                
num_blocks_required,
                                                                method == 
BACK_UP_FILE_INCREMENTALLY ? relative_block_numbers : NULL,
                                                                
truncation_block_length);
@@ -1575,7 +1578,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, 
bool sizeonly,
 static bool
 sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
                 struct stat *statbuf, bool missing_ok, Oid dboid, Oid spcoid,
-                RelFileNumber relfilenumber, unsigned segno,
+                RelFileNumber relfilenumber, ForkNumber relForkNum, unsigned 
segno,
                 backup_manifest_info *manifest, unsigned 
num_incremental_blocks,
                 BlockNumber *incremental_blocks, unsigned 
truncation_block_length)
 {
@@ -1617,7 +1620,16 @@ sendFile(bbsink *sink, const char *readfilename, const 
char *tarfilename,
         * or disabled as that might change, thus we check at each point where 
we
         * could be validating a checksum.
         */
-       if (!noverify_checksums && RelFileNumberIsValid(relfilenumber))
+       if (!noverify_checksums && RelFileNumberIsValid(relfilenumber)
+#ifdef USE_UMBRA
+               /*
+                * Umbra mapped forks are copied in physical block order during 
base
+                * backup, but page checksums stay keyed by logical block 
number.
+                * INIT forks remain direct-mapped and can still be verified 
here.
+                */
+               && relForkNum == INIT_FORKNUM
+#endif
+               )
                verify_checksum = true;
 
        /*
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 551667650b..e5b2583b47 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -403,7 +403,7 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, 
ForkNumber forkNum)
                XLogRecPtr      recptr;
 
                XLogBeginInsert();
-               XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT_BIRTH);
 
                xlrec.locator = rel->rd_locator;
 
@@ -832,7 +832,7 @@ nextval_internal(Oid relid, bool check_permissions)
                 * sequence values if we crash.
                 */
                XLogBeginInsert();
-               XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT_BIRTH);
 
                /* set values that will be saved in xlog */
                seq->last_value = next;
@@ -1024,7 +1024,7 @@ SetSequence(Oid relid, int64 next, bool iscalled)
                Page            page = BufferGetPage(buf);
 
                XLogBeginInsert();
-               XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
+               XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT_BIRTH);
 
                xlrec.locator = seqrel->rd_locator;
                XLogRegisterData(&xlrec, sizeof(xl_seq_rec));
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index eec09ba1de..d11cddb082 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -17422,7 +17422,8 @@ index_copy_data(Relation rel, RelFileLocator 
newrlocator)
        {
                if (smgrexists(RelationGetSmgr(rel), forkNum))
                {
-                       smgrcreate(dstrel, forkNum, false);
+                       if (!smgrisinternalfork(forkNum))
+                               smgrcreate(dstrel, forkNum, false);
 
                        /*
                         * WAL log creation if the relation is persistent, or 
this is the
@@ -17432,11 +17433,15 @@ index_copy_data(Relation rel, RelFileLocator 
newrlocator)
                                (rel->rd_rel->relpersistence == 
RELPERSISTENCE_UNLOGGED &&
                                 forkNum == INIT_FORKNUM))
                                log_smgrcreate(&newrlocator, forkNum);
-                       RelationCopyStorage(RelationGetSmgr(rel), dstrel, 
forkNum,
-                                                               
rel->rd_rel->relpersistence);
+                       if (!smgrisinternalfork(forkNum))
+                               RelationCopyStorage(RelationGetSmgr(rel), 
dstrel, forkNum,
+                                                                       
rel->rd_rel->relpersistence);
                }
        }
 
+       smgrcopyrelationmetadata(RelationGetSmgr(rel), dstrel,
+                                                        
rel->rd_rel->relpersistence);
+
        /* drop old relation, and close new one */
        RelationDropStorage(rel);
        smgrclose(dstrel);
diff --git a/src/backend/storage/map/map.c b/src/backend/storage/map/map.c
index bd839a3e9f..0dad150b2b 100644
--- a/src/backend/storage/map/map.c
+++ b/src/backend/storage/map/map.c
@@ -1382,6 +1382,17 @@ void MapGetNewPbkno(UmbraFileContext *map_ctx, 
RelFileLocator rnode, ForkNumber
        Assert(new_pblkno != NULL);
        Assert(old_pblkno != NULL);
 
+       /*
+        * During recovery, MAIN fork physical choices must come from WAL 
records.
+        * FSM/VM are hint forks and replay can touch them without dedicated 
remap
+        * metadata (e.g. free space updates), so we allow local allocation for
+        * them.
+        */
+       if (InRecovery && forknum == MAIN_FORKNUM)
+               elog(PANIC,
+                        "MapGetNewPbkno called during recovery for rel 
%u/%u/%u fork %d blk %u",
+                        rnode.spcOid, rnode.dbOid, rnode.relNumber, forknum, 
lblkno);
+
        for (;;)
        {
                if (!MapTryLookup(map_ctx, rnode, forknum, lblkno, &cur_pblkno))
diff --git a/src/backend/storage/map/mapsuper.c 
b/src/backend/storage/map/mapsuper.c
index 3d8909f7a4..07ac7b39c6 100644
--- a/src/backend/storage/map/mapsuper.c
+++ b/src/backend/storage/map/mapsuper.c
@@ -654,6 +654,18 @@ MapSBlockRead(UmbraFileContext *map_ctx, RelFileLocator 
rnode, MapSuperblock *su
                                entry->page_lsn = 
MapSuperblockGetLastUpdatedLSN(&disk_super);
                                entry->flags = MAPSUPER_FLAG_VALID;
                                MapSuperResetReservedNextFrees(entry);
+                               Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        MAIN_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
MAIN_FORKNUM));
+                               Assert(MapNormalizeForkBlockCount(FSM_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        FSM_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
FSM_FORKNUM));
+                               
Assert(MapNormalizeForkBlockCount(VISIBILITYMAP_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        VISIBILITYMAP_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
VISIBILITYMAP_FORKNUM));
                        }
                        else
                        {
@@ -661,6 +673,18 @@ MapSBlockRead(UmbraFileContext *map_ctx, RelFileLocator 
rnode, MapSuperblock *su
                                entry->page_lsn = InvalidXLogRecPtr;
                                entry->flags = MAPSUPER_FLAG_VALID | 
MAPSUPER_FLAG_CORRUPT;
                                MapSuperResetReservedNextFrees(entry);
+                               Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        MAIN_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
MAIN_FORKNUM));
+                               Assert(MapNormalizeForkBlockCount(FSM_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        FSM_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
FSM_FORKNUM));
+                               
Assert(MapNormalizeForkBlockCount(VISIBILITYMAP_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        VISIBILITYMAP_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
VISIBILITYMAP_FORKNUM));
                        }
                }
                else if (entry->flags & MAPSUPER_FLAG_CORRUPT)
@@ -684,6 +708,18 @@ MapSBlockRead(UmbraFileContext *map_ctx, RelFileLocator 
rnode, MapSuperblock *su
                                entry->page_lsn = 
MapSuperblockGetLastUpdatedLSN(&disk_super);
                                entry->flags = MAPSUPER_FLAG_VALID;
                                MapSuperResetReservedNextFrees(entry);
+                               Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        MAIN_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
MAIN_FORKNUM));
+                               Assert(MapNormalizeForkBlockCount(FSM_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        FSM_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
FSM_FORKNUM));
+                               
Assert(MapNormalizeForkBlockCount(VISIBILITYMAP_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        VISIBILITYMAP_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
VISIBILITYMAP_FORKNUM));
                        }
                        else
                        {
@@ -691,6 +727,18 @@ MapSBlockRead(UmbraFileContext *map_ctx, RelFileLocator 
rnode, MapSuperblock *su
                                entry->page_lsn = InvalidXLogRecPtr;
                                entry->flags = MAPSUPER_FLAG_VALID | 
MAPSUPER_FLAG_CORRUPT;
                                MapSuperResetReservedNextFrees(entry);
+                               Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        MAIN_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
MAIN_FORKNUM));
+                               Assert(MapNormalizeForkBlockCount(FSM_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        FSM_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
FSM_FORKNUM));
+                               
Assert(MapNormalizeForkBlockCount(VISIBILITYMAP_FORKNUM,
+                                                                               
                  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                                
        VISIBILITYMAP_FORKNUM)) <=
+                                          MapSuperGetReservedNextFree(entry, 
VISIBILITYMAP_FORKNUM));
                        }
                }
                else if (entry->flags & MAPSUPER_FLAG_CORRUPT)
@@ -705,6 +753,18 @@ MapSBlockRead(UmbraFileContext *map_ctx, RelFileLocator 
rnode, MapSuperblock *su
                 * should consume that runtime state directly. Disk identity/CRC
                 * validation belongs to the slow path that populates shared 
state.
                 */
+               Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
+                                                                               
  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                        
MAIN_FORKNUM)) <=
+                          MapSuperGetReservedNextFree(entry, MAIN_FORKNUM));
+               Assert(MapNormalizeForkBlockCount(FSM_FORKNUM,
+                                                                               
  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                        
FSM_FORKNUM)) <=
+                          MapSuperGetReservedNextFree(entry, FSM_FORKNUM));
+               Assert(MapNormalizeForkBlockCount(VISIBILITYMAP_FORKNUM,
+                                                                               
  MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                        
VISIBILITYMAP_FORKNUM)) <=
+                          MapSuperGetReservedNextFree(entry, 
VISIBILITYMAP_FORKNUM));
                *super = entry->super;
                status = (entry->flags & MAPSUPER_FLAG_CORRUPT) ?
                        MAP_SBLOCK_READ_CORRUPT : MAP_SBLOCK_READ_OK;
@@ -835,9 +895,6 @@ MapSuperSetExtendingTarget(MapSuperEntry *entry, ForkNumber 
forknum,
        }
 }
 
-
-
-
 static bool
 MapSuperPrepareEntryForUpdate(UmbraFileContext *map_ctx, RelFileLocator rnode,
                                                          XLogRecPtr map_lsn, 
const char *missing_errmsg,
@@ -955,6 +1012,10 @@ MapSBlockUpdateLogicalNblocks(UmbraFileContext *map_ctx, 
RelFileLocator rnode,
                entry->flags |= MAPSUPER_FLAG_DIRTY;
        }
 
+       Assert(MapNormalizeForkBlockCount(forknum,
+                                                                         
MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                forknum)) <=
+                  MapSuperGetReservedNextFree(entry, forknum));
        LWLockRelease(&entry->lock);
 }
 
@@ -1197,6 +1258,19 @@ MapSBlockInit(UmbraFileContext *map_ctx, RelFileLocator 
rnode, XLogRecPtr map_ls
                map_lsn : GetXLogWriteRecPtr();
        MapSuperblockSetLastUpdatedLSN(&entry->super, entry->page_lsn);
        entry->flags = MAPSUPER_FLAG_VALID | MAPSUPER_FLAG_DIRTY;
+       MapSuperResetReservedNextFrees(entry);
+       Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
+                                                                         
MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                MAIN_FORKNUM)) 
<=
+                  MapSuperGetReservedNextFree(entry, MAIN_FORKNUM));
+       Assert(MapNormalizeForkBlockCount(FSM_FORKNUM,
+                                                                         
MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                FSM_FORKNUM)) <=
+                  MapSuperGetReservedNextFree(entry, FSM_FORKNUM));
+       Assert(MapNormalizeForkBlockCount(VISIBILITYMAP_FORKNUM,
+                                                                         
MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                
VISIBILITYMAP_FORKNUM)) <=
+                  MapSuperGetReservedNextFree(entry, VISIBILITYMAP_FORKNUM));
 
        /*
         * Persist superblock immediately so later backends in bootstrap/initdb 
can
@@ -1246,16 +1320,30 @@ MapSBlockEnsureLoaded(UmbraFileContext *map_ctx, 
RelFileLocator rnode)
                                entry->super = disk_super;
                                entry->page_lsn = 
MapSuperblockGetLastUpdatedLSN(&disk_super);
                                entry->flags = MAPSUPER_FLAG_VALID;
+                               MapSuperResetReservedNextFrees(entry);
                        }
                        else
                        {
                                MapSuperblockInit(&entry->super, 0);
                                entry->page_lsn = InvalidXLogRecPtr;
                                entry->flags = MAPSUPER_FLAG_VALID | 
MAPSUPER_FLAG_CORRUPT;
+                               MapSuperResetReservedNextFrees(entry);
                        }
                }
        }
 
+       Assert(MapNormalizeForkBlockCount(MAIN_FORKNUM,
+                                                                         
MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                MAIN_FORKNUM)) 
<=
+                  MapSuperGetReservedNextFree(entry, MAIN_FORKNUM));
+       Assert(MapNormalizeForkBlockCount(FSM_FORKNUM,
+                                                                         
MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                FSM_FORKNUM)) <=
+                  MapSuperGetReservedNextFree(entry, FSM_FORKNUM));
+       Assert(MapNormalizeForkBlockCount(VISIBILITYMAP_FORKNUM,
+                                                                         
MapSuperblockGetNextFreePhysBlock(&entry->super,
+                                                                               
                                                                
VISIBILITYMAP_FORKNUM)) <=
+                  MapSuperGetReservedNextFree(entry, VISIBILITYMAP_FORKNUM));
        LWLockRelease(&entry->lock);
        return true;
 }
@@ -1378,8 +1466,6 @@ MapSBlockTryGetNextFreePhysBlock(UmbraFileContext 
*map_ctx, RelFileLocator rnode
        return true;
 }
 
-
-
 void
 MapSBlockBumpLogicalNblocks(UmbraFileContext *map_ctx, RelFileLocator rnode,
                                                        ForkNumber forknum, 
BlockNumber nblocks,
@@ -1505,6 +1591,9 @@ MapSuperTableShmemInit(void)
                entry->next_free =
                        (i == MapSuperCapacity - 1) ? MAPSUPER_FREENEXT_END : 
(i + 1);
                entry->in_use = false;
+               entry->reserved_next_free_main = 0;
+               entry->reserved_next_free_fsm = 0;
+               entry->reserved_next_free_vm = 0;
                entry->extending_target_main = InvalidBlockNumber;
                entry->extending_target_fsm = InvalidBlockNumber;
                entry->extending_target_vm = InvalidBlockNumber;
diff --git a/src/backend/storage/smgr/bulk_write.c 
b/src/backend/storage/smgr/bulk_write.c
index f3c24082a6..b1e8dfa9d8 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -250,6 +250,51 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
        if (npending > 1)
                qsort(pending_writes, npending, sizeof(PendingWrite), 
buffer_cmp);
 
+       /*
+        * For Umbra mapped forks, WAL for new pages cannot be the first place 
that
+        * introduces logical coverage for a whole bulk-written run.
+        *
+        * log_newpages() writes one WAL record for a batch of pages, but
+        * wal-owned firstborn is only guaranteed for pages that are already
+        * contiguous with the current logical EOF. If we WAL-log a whole 
pending
+        * run before the relation is physically/logically extended, later
+        * smgrextend() calls can see a stale logical EOF and mistakenly 
zeroextend
+        * over earlier data pages in the same batch.
+        *
+        * Extend first so later pages in the run see a stable logical frontier
+        * while the batch is being prepared.  Umbra may still leave holes in 
the
+        * physical frontier for WAL-owned tail pages; that is acceptable as 
long
+        * as page WAL publishes the final mapping before the real page images 
are
+        * written below.
+        *
+        * The important ownership rule is:
+        *   1. smgrextend() claims first-born ownership and reserves the pblk
+        *      backend-locally
+        *   2. the later page WAL record reuses that same in-flight claim and
+        *      commits the mapping
+        *
+        * Do not reorder this to "log_newpages() first, smgrextend() later".
+        * xloginsert's birth path is allowed to own commit only after the birth
+        * claim already exists; otherwise a future refactor can reintroduce
+        * duplicate pblk reservation or stale-EOF zeroextend bugs.
+        */
+#ifdef USE_UMBRA
+       if (npending > 0)
+       {
+               BlockNumber maxblk;
+
+               maxblk = pending_writes[npending - 1].blkno;
+               while (bulkstate->relsize <= maxblk)
+               {
+                       smgrextend(bulkstate->smgr, bulkstate->forknum,
+                                          bulkstate->relsize,
+                                          &zero_buffer,
+                                          true);
+                       bulkstate->relsize++;
+               }
+       }
+#endif
+
        if (bulkstate->use_wal)
        {
                BlockNumber blknos[MAX_PENDING_WRITES];
@@ -261,9 +306,9 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
                        blknos[i] = pending_writes[i].blkno;
                        pages[i] = pending_writes[i].buf->data;
 
-                       /*
-                        * If any of the pages use !page_std, we log them all 
as such.
-                        * That's a bit wasteful, but in practice, a mix of 
standard and
+                               /*
+                                * If any of the pages use !page_std, we log 
them all as such.
+                                * That's a bit wasteful, but in practice, a 
mix of standard and
                         * non-standard page layout is rare.  None of the 
built-in AMs do
                         * that.
                         */
@@ -279,7 +324,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
                BlockNumber blkno = pending_writes[i].blkno;
                Page            page = pending_writes[i].buf->data;
 
-               PageSetChecksum(page, blkno);
+                       PageSetChecksum(page, blkno);
 
                if (blkno >= bulkstate->relsize)
                {
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 1e3e0b08f8..8ba29edc56 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -694,11 +694,6 @@ smgrinvalidatedatabase(Oid dbid)
        smgrinvalidatedatabasetablespaces(dbid, 0, NULL);
 }
 
-
-
-
-
-
 void
 smgrmarkskipwalpending(RelFileLocator rlocator)
 {
diff --git a/src/backend/storage/smgr/umbra.c b/src/backend/storage/smgr/umbra.c
index 917dff0a64..f382d56c34 100644
--- a/src/backend/storage/smgr/umbra.c
+++ b/src/backend/storage/smgr/umbra.c
@@ -43,7 +43,6 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/map.h"
-#include "storage/md.h"
 #include "storage/smgr.h"
 #include "storage/umbra.h"
 #include "storage/umfile.h"
@@ -221,6 +220,7 @@ static void um_reserve_fresh_pblkno_for_access(SMgrRelation 
reln,
                                                                                
           BlockNumber lblkno,
                                                                                
           BlockNumber *new_pblkno);
 static bool um_fork_uses_map_translation(ForkNumber forknum);
+static bool um_fork_uses_wal_owned_firstborn(ForkNumber forknum);
 static bool um_mapped_exists_from_super(SMgrRelation reln, ForkNumber forknum);
 static UmbraMapPolicy um_open_map_state(SMgrRelation reln);
 static bool um_state_uses_map(UmbraMapPolicy state);
@@ -393,6 +393,8 @@ UmApplyReservedRangeRemap(SMgrRelation reln, ForkNumber 
forknum,
 
        if (max_pblkno != InvalidBlockNumber)
        {
+               MapSBlockBumpNextFreePhysBlock(ctx, reln->smgr_rlocator.locator,
+                                                                          
forknum, max_pblkno + 1, lsn);
                MapSBlockBumpPhysicalNblocks(ctx, reln->smgr_rlocator.locator,
                                                                         
forknum, max_pblkno + 1, lsn);
                for (BlockNumber i = 0; i < nblocks; i++)
@@ -629,6 +631,13 @@ um_fork_uses_map_translation(ForkNumber forknum)
  * concentrated in a few helper paths, so we keep them on explicit mapping
  * publication rather than tying first-born ownership to arbitrary page WAL.
  */
+static bool
+um_fork_uses_wal_owned_firstborn(ForkNumber forknum)
+{
+       return UmbraForkUsesMapTranslation(forknum) &&
+               !UmbraForkIsAuxiliaryMapped(forknum);
+}
+
 static bool
 um_mapped_exists_from_super(SMgrRelation reln, ForkNumber forknum)
 {
@@ -1131,6 +1140,184 @@ um_resolve_mapped_read_run(SMgrRelation reln, 
ForkNumber forknum,
        return 0;
 }
 
+static void
+um_materialize_pblk_zero_runs(UmbraFileContext *ctx, ForkNumber forknum,
+                                                         const BlockNumber 
*pblknos, BlockNumber nblocks,
+                                                         bool skipFsync)
+{
+       BlockNumber     run_start_pblk = InvalidBlockNumber;
+       BlockNumber     run_blocks = 0;
+
+       Assert(ctx != NULL);
+       Assert(pblknos != NULL);
+
+       for (BlockNumber i = 0; i < nblocks; i++)
+       {
+               BlockNumber pblk = pblknos[i];
+
+               if (run_blocks == 0)
+               {
+                       run_start_pblk = pblk;
+                       run_blocks = 1;
+               }
+               else if (pblk == run_start_pblk + run_blocks)
+               {
+                       run_blocks++;
+               }
+               else
+               {
+                       umfile_zeroextend(ctx, forknum, run_start_pblk,
+                                                         (int) run_blocks, 
skipFsync);
+                       run_start_pblk = pblk;
+                       run_blocks = 1;
+               }
+       }
+
+       if (run_blocks > 0)
+               umfile_zeroextend(ctx, forknum, run_start_pblk,
+                                                 (int) run_blocks, skipFsync);
+}
+
+static bool
+um_pblk_run_is_contiguous(const BlockNumber *pblknos, BlockNumber nblocks)
+{
+       Assert(pblknos != NULL);
+       Assert(nblocks > 0);
+
+       for (BlockNumber i = 1; i < nblocks; i++)
+       {
+               if (pblknos[i] != pblknos[0] + i)
+                       return false;
+       }
+
+       return true;
+}
+
+static bool
+um_try_pure_firstborn_range_remap_zeroextend(SMgrRelation reln, ForkNumber 
forknum,
+                                                                               
         const UmbraAccessState *access,
+                                                                               
         BlockNumber blocknum,
+                                                                               
         BlockNumber nblocks,
+                                                                               
         bool skipFsync)
+{
+       UmbraFileContext *ctx = um_ctx_acquire(reln);
+       BlockNumber *pblknos;
+       xl_umbra_range_remap_entry *entries;
+       bool            wal_insert_enabled;
+       bool            applied = false;
+
+       Assert(access != NULL);
+       Assert(access->map_available);
+       Assert(nblocks > 0);
+
+       /*
+        * Try to collapse an EOF zeroextend range into one or more RANGE_REMAP
+        * records. RANGE_REMAP carries only new pblk ownership, so this helper 
is
+        * deliberately all-or-nothing and only accepts pure first-born ranges.
+        * Recovery consumes authoritative remap WAL instead of synthesizing new
+        * range ownership locally.
+        */
+       if (InRecovery || nblocks < 2)
+               return false;
+
+       pblknos = palloc(sizeof(BlockNumber) * nblocks);
+       entries = palloc(sizeof(xl_umbra_range_remap_entry) * nblocks);
+
+       for (BlockNumber i = 0; i < nblocks; i++)
+       {
+               BlockNumber lblk = blocknum + i;
+               BlockNumber pblk;
+
+               if (MapTryLookup(ctx, reln->smgr_rlocator.locator, forknum, 
lblk, &pblk) ||
+                       MapInflightLookupOwnedPblk(reln->smgr_rlocator.locator,
+                                                                          
forknum, lblk, &pblk))
+               {
+                       /*
+                        * Normal EOF extension should not get here.  Treat 
existing or
+                        * in-flight ownership as a compatibility fallback 
condition, not as
+                        * a mixed-range batching opportunity.
+                        */
+                       pfree(entries);
+                       pfree(pblknos);
+                       return false;
+               }
+       }
+
+       for (BlockNumber i = 0; i < nblocks; i++)
+       {
+               BlockNumber lblk = blocknum + i;
+
+               if (!MapReserveFreshPblkno(ctx, reln->smgr_rlocator.locator,
+                                                                  forknum, 
lblk, &pblknos[i]))
+               {
+                       for (BlockNumber j = 0; j < i; j++)
+                               MapInflightRelease(reln->smgr_rlocator.locator,
+                                                                  forknum, 
blocknum + j);
+                       pfree(entries);
+                       pfree(pblknos);
+                       return false;
+               }
+               entries[i].lblkno = lblk;
+               entries[i].new_pblkno = pblknos[i];
+       }
+
+       wal_insert_enabled =
+               XLogInsertAllowed() &&
+               !IsBootstrapProcessingMode() &&
+               !IsInitProcessingMode();
+
+       PG_TRY();
+       {
+               BlockNumber done = 0;
+
+               while (done < nblocks)
+               {
+                       BlockNumber chunk_blocks = Min(nblocks - done,
+                                                                               
   (BlockNumber) UINT16_MAX);
+                       XLogRecPtr      map_lsn = InvalidXLogRecPtr;
+
+                       if (wal_insert_enabled)
+                       {
+                               if (um_pblk_run_is_contiguous(pblknos + done, 
chunk_blocks))
+                                       map_lsn = log_umbra_range_remap_compact(
+                                               reln->smgr_rlocator.locator, 
forknum,
+                                               blocknum + done, pblknos[done], 
(uint16) chunk_blocks);
+                               else
+                                       map_lsn = log_umbra_range_remap(
+                                               reln->smgr_rlocator.locator, 
forknum,
+                                               (uint16) chunk_blocks, entries 
+ done);
+                       }
+
+                       um_materialize_pblk_zero_runs(ctx, forknum, pblknos + 
done,
+                                                                               
  chunk_blocks, skipFsync);
+                       UmApplyReservedRangeRemap(reln, forknum, blocknum + 
done,
+                                                                         
chunk_blocks, pblknos + done,
+                                                                         
map_lsn, skipFsync);
+                       done += chunk_blocks;
+               }
+
+               applied = true;
+       }
+       PG_CATCH();
+       {
+               if (!applied)
+               {
+                       for (BlockNumber i = 0; i < nblocks; i++)
+                               MapInflightRelease(reln->smgr_rlocator.locator,
+                                                                  forknum, 
blocknum + i);
+               }
+
+               pfree(entries);
+               pfree(pblknos);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+
+       pfree(entries);
+       pfree(pblknos);
+       return true;
+}
+
 static UmbraMappedBirthResult
 um_publish_mapped_birth(SMgrRelation reln, ForkNumber forknum,
                                                const UmbraAccessState *access,
@@ -1139,26 +1326,83 @@ um_publish_mapped_birth(SMgrRelation reln, ForkNumber 
forknum,
        UmbraFileContext *ctx = um_ctx_acquire(reln);
        UmbraMappedBirthResult result;
        BlockNumber old_pblkno;
+       XLogRecPtr      map_lsn;
+       bool            wal_insert_enabled;
+       bool            wal_owns_firstborn;
+       bool            emit_map_set;
 
        Assert(access->map_available);
-       (void) allow_wal_owned_firstborn;
 
        result.mapping_published = false;
 
+       if (InRecovery && um_fork_uses_wal_owned_firstborn(forknum))
+               elog(PANIC,
+                        "missing WAL mapping during recovery for relation 
%u/%u/%u fork %d blk %u",
+                        reln->smgr_rlocator.locator.spcOid,
+                        reln->smgr_rlocator.locator.dbOid,
+                        reln->smgr_rlocator.locator.relNumber,
+                        forknum, lblkno);
+
        MapGetNewPbkno(ctx, reln->smgr_rlocator.locator, forknum, lblkno,
                                   &result.pblkno, &old_pblkno);
        Assert(old_pblkno == InvalidBlockNumber);
 
-       MapSetMapping(ctx, reln->smgr_rlocator.locator, forknum, lblkno,
-                                 result.pblkno, InvalidXLogRecPtr);
-       result.mapping_published = true;
+       wal_owns_firstborn = false;
+
+       if (InRecovery)
+       {
+               map_lsn = GetXLogReplayRecPtr(NULL);
+               MapSetMapping(ctx, reln->smgr_rlocator.locator, forknum, lblkno,
+                                         result.pblkno, map_lsn);
+               result.mapping_published = true;
+       }
+       else
+       {
+               /*
+                * Birth ownership needs crash-recovery WAL even at 
wal_level=minimal.
+                * XLogIsNeeded() is too weak here because it suppresses WAL 
that is
+                * still required to recover eager MAP_SET publication after a 
crash.
+                */
+               wal_insert_enabled =
+                       XLogInsertAllowed() &&
+                       !IsBootstrapProcessingMode() &&
+                       !IsInitProcessingMode();
+
+               wal_owns_firstborn =
+                       allow_wal_owned_firstborn &&
+                       wal_insert_enabled &&
+                       UmWalOwnedFirstbornAvailable(reln, forknum, lblkno);
+
+               emit_map_set = !wal_owns_firstborn;
+
+               if (emit_map_set && wal_insert_enabled)
+                       map_lsn = 
log_umbra_map_set(reln->smgr_rlocator.locator, forknum,
+                                                                               
lblkno, old_pblkno, result.pblkno);
+               else
+                       map_lsn = InvalidXLogRecPtr;
+
+               if (emit_map_set)
+               {
+                       MapSetMapping(ctx, reln->smgr_rlocator.locator, 
forknum, lblkno,
+                                                 result.pblkno, map_lsn);
+                       result.mapping_published = true;
+               }
+       }
 
        if (result.mapping_published)
                MapSBlockBumpNextFreePhysBlock(ctx, reln->smgr_rlocator.locator,
                                                                           
forknum, result.pblkno + 1,
-                                                                          
InvalidXLogRecPtr);
+                                                                          
map_lsn);
 
-       MapInflightRelease(reln->smgr_rlocator.locator, forknum, lblkno);
+       /*
+        * WAL-owned first-born pages keep their in-flight claim private until 
WAL
+        * insertion succeeds and XLogCommitBlockRemapsUmbra() publishes the 
mapping.
+        * Advancing the physical frontier or releasing the claim here would
+        * let xloginsert reserve a second pblk for the same logical birth, 
leaving
+        * alternating holes in the initial physical layout.
+        */
+       if (!wal_owns_firstborn)
+               MapInflightRelease(reln->smgr_rlocator.locator, forknum, 
lblkno);
        return result;
 }
 
@@ -1348,6 +1592,30 @@ UmMapAccessAvailable(SMgrRelation reln, ForkNumber 
forknum)
        return access.map_available;
 }
 
+bool
+UmWalOwnedRemapAvailable(SMgrRelation reln, ForkNumber forknum)
+{
+       UmbraAccessState access;
+
+       access = um_classify_access(reln, forknum);
+       return access.policy == UMBRA_MAP_POLICY_REQUIRE_MAP;
+}
+
+bool
+UmWalOwnedFirstbornAvailable(SMgrRelation reln, ForkNumber forknum,
+                                                        BlockNumber lblkno)
+{
+       UmbraAccessState access;
+
+       (void) lblkno;
+       access = um_classify_access(reln, forknum);
+       return access.policy == UMBRA_MAP_POLICY_REQUIRE_MAP &&
+               XLogInsertAllowed() &&
+               !IsBootstrapProcessingMode() &&
+               !IsInitProcessingMode() &&
+               um_fork_uses_wal_owned_firstborn(forknum);
+}
+
 bool
 UmMapTryLookupPblkno(SMgrRelation reln, ForkNumber forknum,
                                         BlockNumber lblkno, BlockNumber 
*pblkno)
@@ -1598,6 +1866,12 @@ umzeroextend(SMgrRelation reln, ForkNumber forknum, 
BlockNumber blocknum,
                return;
        }
 
+       if (um_try_pure_firstborn_range_remap_zeroextend(reln, forknum, &access,
+                                                                               
                        blocknum,
+                                                                               
                        (BlockNumber) nblocks,
+                                                                               
                        skipFsync))
+               return;
+
        /*
         * Per-block path for single-block, recovery, or callers that 
encountered
         * pre-existing/pending MAP ownership in the requested range.
@@ -1821,15 +2095,9 @@ um_startreadv_mapped_physical(PgAioHandle *ioh, 
SMgrRelation reln,
 {
        if (aux_recovery_read)
        {
-               uint64          ensured_bytes = 0;
-
                if (!umfile_ctx_block_exists(ctx, forknum, pblk))
-               {
                        um_ensure_datafork_batch_ready_for_access(reln, 
forknum, access,
                                                                                
                          pblk, true /* skipFsync */ );
-                       ensured_bytes = BLCKSZ;
-               }
-               (void) ensured_bytes;
        }
 
        pgaio_io_set_target_smgr(ioh, reln, forknum,
@@ -1852,7 +2120,6 @@ umstartreadv(PgAioHandle *ioh, SMgrRelation reln, 
ForkNumber forknum,
        UmbraFileContext *ctx = um_ctx_acquire(reln);
        BlockNumber     pblk;
        bool            aux_recovery_read;
-
        access = um_classify_access(reln, forknum);
 
        if (!access.map_available)
@@ -1878,7 +2145,9 @@ umstartreadv(PgAioHandle *ioh, SMgrRelation reln, 
ForkNumber forknum,
                                                                                
                 &lookup_state,
                                                                                
                 aux_recovery_read, &pblk);
                if (run_blocks == 0)
+               {
                        return;
+               }
 
                if (run_blocks < nblocks)
                        ioh->handle_data_len = run_blocks;
@@ -2001,8 +2270,8 @@ umwritev(SMgrRelation reln, ForkNumber forknum, 
BlockNumber blocknum,
                        pending_barrier.entry_idx = -1;
 
                        /*
-                        * The barrier serializes physical writes with 
concurrent remap publication for
-                        * the same logical block.  Claim before lookup so a 
later relocation
+                        * The barrier serializes physical writes with any 
foreign in-flight
+                        * remap for the same logical block.  Claim before 
lookup so a later remap
                         * cannot publish a new mapping while this write is 
still targeting the
                         * old physical page.
                         */
diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c
index cccc4a24c8..8816e26f1f 100644
--- a/src/backend/utils/adt/dbsize.c
+++ b/src/backend/utils/adt/dbsize.c
@@ -22,6 +22,7 @@
 #include "commands/tablespace.h"
 #include "miscadmin.h"
 #include "storage/fd.h"
+#include "storage/smgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -366,6 +367,8 @@ pg_relation_size(PG_FUNCTION_ARGS)
        Oid                     relOid = PG_GETARG_OID(0);
        text       *forkName = PG_GETARG_TEXT_PP(1);
        Relation        rel;
+       SMgrRelation smgr;
+       ForkNumber      forknum;
        int64           size;
 
        rel = try_relation_open(relOid, AccessShareLock);
@@ -380,8 +383,15 @@ pg_relation_size(PG_FUNCTION_ARGS)
        if (rel == NULL)
                PG_RETURN_NULL();
 
-       size = calculate_relation_size(&(rel->rd_locator), rel->rd_backend,
-                                                                  
forkname_to_number(text_to_cstring(forkName)));
+       forknum = forkname_to_number(text_to_cstring(forkName));
+       smgr = RelationGetSmgr(rel);
+
+       /*
+        * Umbra may remap a relation's logical blocks onto a sparse physical 
file.
+        * SQL-visible relation size follows the storage manager's logical block
+        * count, not raw stat(2) bytes.
+        */
+       size = (int64) smgrnblocks(smgr, forknum) * BLCKSZ;
 
        relation_close(rel, AccessShareLock);
 
diff --git a/src/bin/pg_waldump/.gitignore b/src/bin/pg_waldump/.gitignore
index ec51f41c76..8d694dc47a 100644
--- a/src/bin/pg_waldump/.gitignore
+++ b/src/bin/pg_waldump/.gitignore
@@ -21,6 +21,7 @@
 /spgdesc.c
 /standbydesc.c
 /tblspcdesc.c
+/umbradesc.c
 /xactdesc.c
 /xlogdesc.c
 
diff --git a/src/bin/pg_waldump/Makefile b/src/bin/pg_waldump/Makefile
index aabb87566a..f493f2b48d 100644
--- a/src/bin/pg_waldump/Makefile
+++ b/src/bin/pg_waldump/Makefile
@@ -24,6 +24,14 @@ override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
 LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils
 
 RMGRDESCSOURCES = $(sort $(notdir $(wildcard 
$(top_srcdir)/src/backend/access/rmgrdesc/*desc*.c)))
+
+# Umbra adds rmgrdesc/umbradesc.c, which should only be built when Umbra is
+# enabled.  pg_waldump uses a wildcard to compile all rmgrdesc sources, so we
+# must explicitly filter it out for md builds.
+ifneq ($(with_umbra), yes)
+RMGRDESCSOURCES := $(filter-out umbradesc.c,$(RMGRDESCSOURCES))
+endif
+
 RMGRDESCOBJS = $(patsubst %.c,%.o,$(RMGRDESCSOURCES))
 
 
@@ -52,6 +60,7 @@ uninstall:
 
 clean distclean:
        rm -f pg_waldump$(X) $(OBJS) $(RMGRDESCSOURCES) xlogreader.c xlogstats.c
+       rm -f umbradesc.c umbradesc.o umbradesc.bc
        rm -rf tmp_check
 
 check:
diff --git a/src/include/access/umbra_xlog.h b/src/include/access/umbra_xlog.h
index cb0c2bac57..6b2408d33c 100644
--- a/src/include/access/umbra_xlog.h
+++ b/src/include/access/umbra_xlog.h
@@ -5,6 +5,8 @@
  *
  * Umbra logs these record types:
  * - MAP_SET: establish/switch lblkno -> pblkno mapping
+ * - RANGE_REMAP: atomically establish a range of first-born mappings
+ * - RANGE_REMAP_COMPACT: same semantics for contiguous lblk/pblk runs
  * - SKIP_WAL_DENSE_MAP: record non-empty skip-WAL dense lblk==pblk frontiers
  *
  *-------------------------------------------------------------------------
@@ -19,6 +21,8 @@
 
 /* XLOG gives us high 4 bits */
 #define XLOG_UMBRA_MAP_SET                     0x10
+#define XLOG_UMBRA_RANGE_REMAP         0x30
+#define XLOG_UMBRA_RANGE_REMAP_COMPACT 0x50
 #define XLOG_UMBRA_SKIP_WAL_DENSE_MAP  0x60
 
 typedef struct xl_umbra_map_set
@@ -30,6 +34,32 @@ typedef struct xl_umbra_map_set
        BlockNumber new_pblkno;
 } xl_umbra_map_set;
 
+typedef struct xl_umbra_range_remap_entry
+{
+       BlockNumber     lblkno;
+       BlockNumber     new_pblkno;
+} xl_umbra_range_remap_entry;
+
+typedef struct xl_umbra_range_remap
+{
+       RelFileLocator rlocator;
+       ForkNumber      forknum;
+       uint16          count;
+       uint16          padding;
+       BlockNumber end_lblkno;
+       xl_umbra_range_remap_entry entries[FLEXIBLE_ARRAY_MEMBER];
+} xl_umbra_range_remap;
+
+typedef struct xl_umbra_range_remap_compact
+{
+       RelFileLocator rlocator;
+       ForkNumber      forknum;
+       uint16          count;
+       uint16          padding;
+       BlockNumber     first_lblkno;
+       BlockNumber     first_pblkno;
+} xl_umbra_range_remap_compact;
+
 typedef struct xl_umbra_skip_wal_dense_map_entry
 {
        ForkNumber      forknum;
@@ -47,6 +77,15 @@ typedef struct xl_umbra_skip_wal_dense_map
 extern XLogRecPtr log_umbra_map_set(RelFileLocator rlocator, ForkNumber 
forknum,
                                                                        
BlockNumber lblkno, BlockNumber old_pblkno,
                                                                        
BlockNumber new_pblkno);
+extern XLogRecPtr log_umbra_range_remap(RelFileLocator rlocator,
+                                                                               
ForkNumber forknum,
+                                                                               
uint16 count,
+                                                                               
const xl_umbra_range_remap_entry *entries);
+extern XLogRecPtr log_umbra_range_remap_compact(RelFileLocator rlocator,
+                                                                               
                ForkNumber forknum,
+                                                                               
                BlockNumber first_lblkno,
+                                                                               
                BlockNumber first_pblkno,
+                                                                               
                uint16 count);
 extern XLogRecPtr log_umbra_skip_wal_dense_map(RelFileLocator rlocator,
                                                                                
           uint16 count,
                                                                                
           const xl_umbra_skip_wal_dense_map_entry *entries);
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 97eae2c1da..a71fa05f71 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -131,6 +131,13 @@ typedef struct
 
        /* copy of the fork_flags field from the XLogRecordBlockHeader */
        uint8           flags;
+#ifdef USE_UMBRA
+       bool            has_remap;
+       BlockNumber old_pblkno;
+       BlockNumber new_pblkno;
+       BlockNumber logical_nblocks;
+       BlockNumber next_free_pblkno;
+#endif
 
        /* Information on full-page image, if any */
        bool            has_image;              /* has image, even for 
consistency checking */
@@ -424,6 +431,10 @@ extern bool DecodeXLogRecord(XLogReaderState *state,
        ((decoder)->record->blocks[block_id].has_image)
 #define XLogRecBlockImageApply(decoder, block_id)              \
        ((decoder)->record->blocks[block_id].apply_image)
+#ifdef USE_UMBRA
+#define XLogRecBlockHasRemap(decoder, block_id)                \
+       ((decoder)->record->blocks[block_id].has_remap)
+#endif
 #define XLogRecHasBlockData(decoder, block_id)         \
        ((decoder)->record->blocks[block_id].has_data)
 
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 80764f9a26..24b1916a11 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -130,6 +130,27 @@ typedef struct XLogRecordBlockHeader
 
 #define SizeOfXLogRecordBlockHeader (offsetof(XLogRecordBlockHeader, 
data_length) + sizeof(uint16))
 
+/*
+ * Extra header information for UMBRA remap metadata.
+ *
+ * When BKPBLOCK_HAS_REMAP is set, this header follows
+ * XLogRecordBlockHeader and stores the physical remap transition for the
+ * referenced logical block.
+ */
+typedef struct XLogRecordBlockRemapHeader
+{
+       BlockNumber     old_pblkno;
+       BlockNumber     new_pblkno;
+       BlockNumber logical_nblocks;
+       BlockNumber next_free_pblkno;
+} XLogRecordBlockRemapHeader;
+
+#ifdef USE_UMBRA
+#define SizeOfXLogRecordBlockRemapHeader sizeof(XLogRecordBlockRemapHeader)
+#else
+#define SizeOfXLogRecordBlockRemapHeader 0
+#endif
+
 /*
  * Additional header information when a full-page image is included
  * (i.e. when BKPBLOCK_HAS_IMAGE is set).
@@ -200,6 +221,7 @@ typedef struct XLogRecordBlockCompressHeader
  */
 #define MaxSizeOfXLogRecordBlockHeader \
        (SizeOfXLogRecordBlockHeader + \
+        SizeOfXLogRecordBlockRemapHeader + \
         SizeOfXLogRecordBlockImageHeader + \
         SizeOfXLogRecordBlockCompressHeader + \
         sizeof(RelFileLocator) + \
@@ -209,8 +231,19 @@ typedef struct XLogRecordBlockCompressHeader
  * The fork number fits in the lower 4 bits in the fork_flags field. The upper
  * bits are used for flags.
  */
+/*
+ * The fork number is stored in the low bits of fork_flags; the high bits are
+ * used for per-block flags.
+ */
+#ifdef USE_UMBRA
+#define BKPBLOCK_FORK_MASK     0x07
+#define BKPBLOCK_HAS_REMAP     0x08    /* has remap metadata in WAL header */
+#define BKPBLOCK_FLAG_MASK     0xF8
+#else
 #define BKPBLOCK_FORK_MASK     0x0F
+#define BKPBLOCK_HAS_REMAP     0x00
 #define BKPBLOCK_FLAG_MASK     0xF0
+#endif
 #define BKPBLOCK_HAS_IMAGE     0x10    /* block data is an 
XLogRecordBlockImage */
 #define BKPBLOCK_HAS_DATA      0x20
 #define BKPBLOCK_WILL_INIT     0x40    /* redo will re-init the page */
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index a527f446f2..55a2de4df7 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -64,11 +64,19 @@ tests += {
       't/053_umbra_map_superblock_watermark.pl',
       't/054_umbra_map_fork_policy.pl',
       't/056_umbra_truncate_superblock.pl',
+      't/057_umbra_remap_crash_consistency.pl',
+      't/058_umbra_2pc_remap_recovery.pl',
       't/061_umbra_fsm_vm_map_translation.pl',
       't/062_umbra_truncate_drop_crash_matrix.pl',
       't/063_umbra_mainfork_head_unlink_checkpoint.pl',
       't/066_umbra_truncate_redo.pl',
+      't/067_umbra_remap_redo.pl',
+      't/068_umbra_old_baseline_checkpoint_window.pl',
+      't/069_umbra_range_remap_zeroextend.pl',
+      't/070_umbra_hash_birth_block_remap.pl',
       't/071_umbra_skip_wal_dense_map.pl',
+      't/072_umbra_ordinary_slim_block_remap.pl',
+      't/074_umbra_torn_page_remap.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/057_umbra_remap_crash_consistency.pl 
b/src/test/recovery/t/057_umbra_remap_crash_consistency.pl
new file mode 100644
index 0000000000..557de9bb3b
--- /dev/null
+++ b/src/test/recovery/t/057_umbra_remap_crash_consistency.pl
@@ -0,0 +1,74 @@
+# Verify remap-heavy workload remains consistent after crash restart.
+#
+# This is UMBRA-specific and skipped in md mode.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init();
+$node->append_conf(
+       'postgresql.conf', qq{
+autovacuum = off
+full_page_writes = on
+});
+$node->start();
+
+$node->safe_psql('postgres',
+       q{CREATE TABLE umb_remap_t(id int PRIMARY KEY, payload text);});
+
+$node->safe_psql(
+       'postgres', q{
+CREATE INDEX umb_remap_payload_idx ON umb_remap_t ((left(payload, 16)));
+INSERT INTO umb_remap_t
+SELECT g, repeat('a', 320) FROM generate_series(1, 30000) g;
+CHECKPOINT;
+UPDATE umb_remap_t
+SET payload = md5(id::text) || repeat('u', 280)
+WHERE id % 3 = 0;
+DELETE FROM umb_remap_t WHERE id % 17 = 0;
+INSERT INTO umb_remap_t
+SELECT g, repeat('n', 320) FROM generate_series(30001, 32000) g;
+});
+
+my $before = $node->safe_psql(
+       'postgres', q{
+SELECT count(*) || ',' ||
+          sum(length(payload))::bigint || ',' ||
+          sum(id)::bigint
+FROM umb_remap_t;
+});
+
+$node->stop('immediate');
+$node->start();
+
+my $after = $node->safe_psql(
+       'postgres', q{
+SELECT count(*) || ',' ||
+          sum(length(payload))::bigint || ',' ||
+          sum(id)::bigint
+FROM umb_remap_t;
+});
+
+is($after, $before, 'aggregate state preserved across crash restart');
+
+my $idx_count = $node->safe_psql(
+       'postgres', q{
+SET enable_seqscan = off;
+SELECT count(*) FROM umb_remap_t WHERE id BETWEEN 100 AND 30000;
+});
+my $seq_count = $node->safe_psql(
+       'postgres', q{
+SET enable_indexscan = off;
+SET enable_bitmapscan = off;
+SELECT count(*) FROM umb_remap_t WHERE id BETWEEN 100 AND 30000;
+});
+is($idx_count, $seq_count, 'index path and seq path return same rowcount');
+
+done_testing();
diff --git a/src/test/recovery/t/058_umbra_2pc_remap_recovery.pl 
b/src/test/recovery/t/058_umbra_2pc_remap_recovery.pl
new file mode 100644
index 0000000000..d3e9945df6
--- /dev/null
+++ b/src/test/recovery/t/058_umbra_2pc_remap_recovery.pl
@@ -0,0 +1,90 @@
+# Verify 2PC + remap workload correctness across crash recovery.
+#
+# This is UMBRA-specific and skipped in md mode.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init();
+$node->append_conf(
+       'postgresql.conf', qq{
+autovacuum = off
+full_page_writes = on
+max_prepared_transactions = 10
+});
+$node->start();
+
+$node->safe_psql('postgres',
+       q{CREATE TABLE umb_2pc_t(id int PRIMARY KEY, payload text);});
+
+$node->safe_psql(
+       'postgres', q{
+CREATE INDEX umb_2pc_payload_idx ON umb_2pc_t ((left(payload, 16)));
+INSERT INTO umb_2pc_t
+SELECT g, repeat('b', 300) FROM generate_series(1, 15000) g;
+CHECKPOINT;
+});
+
+$node->safe_psql(
+       'postgres', q{
+BEGIN;
+UPDATE umb_2pc_t SET payload = 'gx1_' || id::text WHERE id % 5 = 0;
+DELETE FROM umb_2pc_t WHERE id % 97 = 0;
+INSERT INTO umb_2pc_t SELECT g, repeat('x', 300) FROM generate_series(20001, 
20500) g;
+PREPARE TRANSACTION 'umbra_gx1';
+});
+
+$node->safe_psql(
+       'postgres', q{
+BEGIN;
+UPDATE umb_2pc_t
+SET payload = 'gx2_' || id::text
+WHERE id % 5 = 1 AND id % 97 <> 0;
+INSERT INTO umb_2pc_t SELECT g, repeat('y', 300) FROM generate_series(21001, 
21200) g;
+PREPARE TRANSACTION 'umbra_gx2';
+});
+
+$node->stop('immediate');
+$node->start();
+
+is($node->safe_psql(
+               'postgres',
+               q{SELECT count(*) FROM pg_prepared_xacts WHERE gid IN 
('umbra_gx1','umbra_gx2');}),
+       '2',
+       'prepared transactions survive crash recovery');
+
+$node->safe_psql('postgres', q{COMMIT PREPARED 'umbra_gx1';});
+$node->safe_psql('postgres', q{ROLLBACK PREPARED 'umbra_gx2';});
+
+is($node->safe_psql('postgres', q{SELECT count(*) FROM umb_2pc_t;}), '15346',
+       'row count matches expected after commit/rollback prepared');
+is($node->safe_psql('postgres', q{SELECT count(*) FROM umb_2pc_t WHERE id 
BETWEEN 20001 AND 20500;}), '500',
+       'gx1 inserted rows are visible');
+is($node->safe_psql('postgres', q{SELECT count(*) FROM umb_2pc_t WHERE id 
BETWEEN 21001 AND 21200;}), '0',
+       'gx2 inserted rows are absent');
+is($node->safe_psql('postgres', q{SELECT count(*) FROM umb_2pc_t WHERE id % 5 
= 0 AND payload LIKE 'gx1_%';}), '2970',
+       'gx1 updates are visible with expected count');
+is($node->safe_psql('postgres', q{SELECT count(*) FROM umb_2pc_t WHERE payload 
LIKE 'gx2_%';}), '0',
+       'gx2 updates are absent after rollback prepared');
+
+my $idx_count = $node->safe_psql(
+       'postgres', q{
+SET enable_seqscan = off;
+SELECT count(*) FROM umb_2pc_t WHERE id BETWEEN 100 AND 14900;
+});
+my $seq_count = $node->safe_psql(
+       'postgres', q{
+SET enable_indexscan = off;
+SET enable_bitmapscan = off;
+SELECT count(*) FROM umb_2pc_t WHERE id BETWEEN 100 AND 14900;
+});
+is($idx_count, $seq_count, 'index path and seq path match after 2PC recovery');
+
+done_testing();
diff --git a/src/test/recovery/t/067_umbra_remap_redo.pl 
b/src/test/recovery/t/067_umbra_remap_redo.pl
new file mode 100644
index 0000000000..c554ddc239
--- /dev/null
+++ b/src/test/recovery/t/067_umbra_remap_redo.pl
@@ -0,0 +1,90 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('umbra_remap');
+
+$node->init();
+$node->append_conf(
+       'postgresql.conf', qq[
+wal_level = 'replica'
+autovacuum = off
+]);
+$node->start();
+
+$node->safe_psql(
+       'postgres', q[
+CREATE TABLE umbra_hash(k int, filler text);
+INSERT INTO umbra_hash
+SELECT g % 97, repeat(md5(g::text), 2)
+FROM generate_series(1, 4000) AS g;
+CREATE INDEX umbra_hash_idx ON umbra_hash USING hash (k);
+
+CREATE TABLE umbra_brin(i int, filler text);
+INSERT INTO umbra_brin
+SELECT g, repeat('x', 200)
+FROM generate_series(1, 12000) AS g;
+CREATE INDEX umbra_brin_idx
+ON umbra_brin USING brin (i) WITH (pages_per_range = 1);
+ANALYZE umbra_hash;
+ANALYZE umbra_brin;
+]);
+
+$node->stop('immediate');
+ok($node->start(), 'restart after hash/brin index build crash');
+
+my $hash_plan = $node->safe_psql(
+       'postgres', q[
+SET enable_seqscan = off;
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM umbra_hash WHERE k = 42;
+]);
+like($hash_plan, qr/umbra_hash_idx/, 'hash index plan survived recovery');
+
+is($node->safe_psql('postgres',
+               q[SELECT count(*) FROM umbra_hash WHERE k = 42]),
+       '41', 'hash index-backed equality query returns expected rows');
+
+my $brin_plan = $node->safe_psql(
+       'postgres', q[
+SET enable_seqscan = off;
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM umbra_brin WHERE i BETWEEN 2500 AND 2600;
+]);
+like($brin_plan, qr/umbra_brin_idx/, 'brin index plan survived recovery');
+
+is($node->safe_psql('postgres',
+               q[SELECT count(*) FROM umbra_brin WHERE i BETWEEN 2500 AND 
2600]),
+       '101', 'brin range query returns expected rows after recovery');
+
+$node->safe_psql(
+       'postgres', q[
+INSERT INTO umbra_hash
+SELECT 42, repeat('y', 64)
+FROM generate_series(1, 9);
+INSERT INTO umbra_brin
+SELECT g, repeat('z', 200)
+FROM generate_series(12001, 12200) AS g;
+CHECKPOINT;
+]);
+
+$node->stop('immediate');
+ok($node->start(), 'restart after post-recovery indexed writes');
+
+is($node->safe_psql('postgres',
+               q[SELECT count(*) FROM umbra_hash WHERE k = 42]),
+       '50', 'hash index remains usable after second restart');
+
+is($node->safe_psql('postgres',
+               q[SELECT count(*) FROM umbra_brin WHERE i BETWEEN 12100 AND 
12150]),
+       '51', 'brin index remains usable after second restart');
+
+done_testing();
diff --git a/src/test/recovery/t/068_umbra_old_baseline_checkpoint_window.pl 
b/src/test/recovery/t/068_umbra_old_baseline_checkpoint_window.pl
new file mode 100644
index 0000000000..0ed178885e
--- /dev/null
+++ b/src/test/recovery/t/068_umbra_old_baseline_checkpoint_window.pl
@@ -0,0 +1,85 @@
+#
+# Verify that a post-checkpoint remap keeps the old physical page alive long
+# enough for crash recovery before the next checkpoint boundary.
+#
+# The contract under test is:
+# - establish a checkpoint
+# - modify existing logical pages afterwards, so redo must rely on the old
+#   physical page as baseline instead of a new checkpoint image
+# - crash before any later checkpoint
+# - restart must still recover the updated relation correctly
+#
+# This is UMBRA-specific and skipped in md mode.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init();
+$node->append_conf(
+       'postgresql.conf', qq{
+autovacuum = off
+full_page_writes = on
+checkpoint_timeout = '30min'
+max_wal_size = '4GB'
+});
+$node->start();
+
+$node->safe_psql('postgres',
+       q{CREATE TABLE umb_old_baseline_t(id int PRIMARY KEY, payload text);});
+
+$node->safe_psql(
+       'postgres', q{
+INSERT INTO umb_old_baseline_t
+SELECT g, repeat('a', 700) FROM generate_series(1, 4000) g;
+CHECKPOINT;
+UPDATE umb_old_baseline_t
+SET payload = md5(id::text) || repeat('u', 668)
+WHERE id % 2 = 0;
+});
+
+my $before = $node->safe_psql(
+       'postgres', q{
+SELECT count(*) || ',' ||
+          sum(length(payload))::bigint || ',' ||
+          sum((left(payload, 8) = md5(id::text)::text)::int)::bigint
+FROM umb_old_baseline_t;
+});
+
+$node->stop('immediate');
+$node->start();
+
+my $after = $node->safe_psql(
+       'postgres', q{
+SELECT count(*) || ',' ||
+          sum(length(payload))::bigint || ',' ||
+          sum((left(payload, 8) = md5(id::text)::text)::int)::bigint
+FROM umb_old_baseline_t;
+});
+
+is($after, $before,
+       'post-checkpoint remap survives crash before next checkpoint');
+
+is($node->safe_psql(
+               'postgres',
+               q{SELECT count(*) FROM umb_old_baseline_t
+                  WHERE id % 2 = 0
+                        AND left(payload, 8) = left(md5(id::text), 8);}),
+       '2000',
+       'even rows were recovered from remap baseline');
+
+is($node->safe_psql(
+               'postgres',
+               q{SELECT count(*) FROM umb_old_baseline_t
+                  WHERE id % 2 = 1
+                        AND payload = repeat('a', 700);}),
+       '2000',
+       'odd rows kept original payload');
+
+done_testing();
diff --git a/src/test/recovery/t/069_umbra_range_remap_zeroextend.pl 
b/src/test/recovery/t/069_umbra_range_remap_zeroextend.pl
new file mode 100644
index 0000000000..6c816ab0be
--- /dev/null
+++ b/src/test/recovery/t/069_umbra_range_remap_zeroextend.pl
@@ -0,0 +1,101 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('umbra_range_remap_zeroextend');
+my $input = $node->basedir . '/copy_input.csv';
+
+$node->init(has_archiving => 1);
+$node->append_conf(
+       'postgresql.conf', qq[
+wal_level = 'replica'
+autovacuum = off
+shared_buffers = '256MB'
+max_wal_size = '4GB'
+min_wal_size = '1GB'
+checkpoint_timeout = '1h'
+]);
+$node->start();
+
+open(my $fh, '>', $input) or die "could not create $input: $!";
+my $pad = 'x' x 200;
+for my $i (1 .. 200_000)
+{
+       print {$fh} "$i,$pad\n";
+}
+close($fh);
+
+$node->safe_psql('postgres', q[
+CREATE TABLE umbra_range_probe (id bigint, pad text);
+SELECT pg_switch_wal();
+]);
+
+my $start_lsn =
+  $node->safe_psql('postgres', q[SELECT pg_current_wal_lsn();]);
+
+$node->safe_psql('postgres',
+       qq[COPY umbra_range_probe FROM '$input' WITH (FORMAT csv);]);
+
+my $end_lsn =
+  $node->safe_psql('postgres', q[SELECT pg_current_wal_lsn();]);
+
+$node->safe_psql('postgres', q[
+SELECT pg_switch_wal();
+CHECKPOINT;
+]);
+$node->stop();
+
+my ($dump_stdout, $dump_stderr) = run_command(
+       [
+               'pg_waldump', '-p', $node->archive_dir,
+               '--start',   $start_lsn,
+               '--end',     $end_lsn
+       ]);
+is($dump_stderr, '', 'pg_waldump raw dump completed without stderr');
+
+my ($stats_stdout, $stats_stderr) = run_command(
+       [
+               'pg_waldump', '-p', $node->archive_dir,
+               '--stats=record',
+               '--start', $start_lsn,
+               '--end',   $end_lsn
+       ]);
+is($stats_stderr, '', 'pg_waldump stats completed without stderr');
+ok($stats_stdout =~ /Umbra\/RANGE_REMAP(?:_COMPACT)?\s+\d+/,
+   'WAL stats report Umbra range remap records');
+
+my @main_range_lines =
+  grep { /desc: RANGE_REMAP(?:_COMPACT)?/ && $_ !~ /_(?:fsm|vm)\b/ }
+  split /\n/, $dump_stdout;
+ok(@main_range_lines > 0,
+   'raw WAL dump contains main-fork RANGE_REMAP records');
+
+my $main_range_records = 0;
+my $main_range_pages = 0;
+my $max_main_range = 0;
+for my $line (@main_range_lines)
+{
+       if ($line =~ /count (\d+)/)
+       {
+               my $count = $1;
+
+               $main_range_records++;
+               $main_range_pages += $count;
+               $max_main_range = $count if $count > $max_main_range;
+       }
+}
+
+cmp_ok($max_main_range, '>', 1,
+          'main-fork RANGE_REMAP batches more than one page');
+cmp_ok($main_range_pages - $main_range_records, '>', 0,
+          'main-fork RANGE_REMAP collapses multiple first-born pages into 
fewer WAL records');
+
+done_testing();
diff --git a/src/test/recovery/t/070_umbra_hash_birth_block_remap.pl 
b/src/test/recovery/t/070_umbra_hash_birth_block_remap.pl
new file mode 100644
index 0000000000..ede9a1ff44
--- /dev/null
+++ b/src/test/recovery/t/070_umbra_hash_birth_block_remap.pl
@@ -0,0 +1,66 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('umbra_hash_birth_block_remap');
+
+$node->init(has_archiving => 1);
+$node->append_conf(
+       'postgresql.conf', qq[
+wal_level = 'replica'
+autovacuum = off
+shared_buffers = '256MB'
+max_wal_size = '4GB'
+min_wal_size = '1GB'
+checkpoint_timeout = '1h'
+]);
+$node->start();
+
+$node->safe_psql('postgres', q[
+CREATE TABLE hash_birth_probe (id bigint);
+CREATE INDEX hash_birth_probe_idx ON hash_birth_probe USING hash (id);
+SELECT pg_switch_wal();
+]);
+
+my $start_lsn =
+  $node->safe_psql('postgres', q[SELECT pg_current_wal_lsn();]);
+
+$node->safe_psql('postgres', q[
+INSERT INTO hash_birth_probe
+SELECT g
+FROM generate_series(1, 600000) AS g;
+]);
+
+my $end_lsn =
+  $node->safe_psql('postgres', q[SELECT pg_current_wal_lsn();]);
+
+$node->safe_psql('postgres', q[
+SELECT pg_switch_wal();
+CHECKPOINT;
+]);
+$node->stop();
+
+my ($dump_stdout, $dump_stderr) = run_command(
+       [
+               'pg_waldump', '-b', '-p', $node->archive_dir,
+               '--start', $start_lsn,
+               '--end',   $end_lsn
+       ]);
+is($dump_stderr, '', 'pg_waldump block dump completed without stderr');
+
+my @remap_header_lines =
+  grep { /; remap: old_pblk \d+ new_pblk \d+ logical_nblocks \d+ 
next_free_pblk \d+/ }
+  split /\n/, $dump_stdout;
+
+ok(@remap_header_lines > 0,
+   'raw WAL dump contains full remap block headers for hash index pages');
+
+done_testing();
diff --git a/src/test/recovery/t/072_umbra_ordinary_slim_block_remap.pl 
b/src/test/recovery/t/072_umbra_ordinary_slim_block_remap.pl
new file mode 100644
index 0000000000..0fe986abfa
--- /dev/null
+++ b/src/test/recovery/t/072_umbra_ordinary_slim_block_remap.pl
@@ -0,0 +1,69 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+plan skip_all => 'requires --with-umbra MAP fork'
+       unless check_pg_config('^#define USE_UMBRA 1$');
+
+my $node = PostgreSQL::Test::Cluster->new('umbra_ordinary_slim_block_remap');
+
+$node->init(has_archiving => 1);
+$node->append_conf(
+       'postgresql.conf', qq[
+wal_level = 'replica'
+autovacuum = off
+shared_buffers = '256MB'
+max_wal_size = '4GB'
+min_wal_size = '1GB'
+checkpoint_timeout = '1h'
+]);
+$node->start();
+
+$node->safe_psql('postgres', q[
+CREATE TABLE ordinary_slim_probe (id bigint, payload text) WITH (fillfactor = 
70);
+INSERT INTO ordinary_slim_probe
+SELECT g, repeat('x', 80)
+FROM generate_series(1, 200000) AS g;
+CHECKPOINT;
+SELECT pg_switch_wal();
+]);
+
+my $start_lsn =
+  $node->safe_psql('postgres', q[SELECT pg_current_wal_lsn();]);
+
+$node->safe_psql('postgres', q[
+UPDATE ordinary_slim_probe
+SET payload = repeat('y', 80)
+WHERE id <= 100000;
+]);
+
+my $end_lsn =
+  $node->safe_psql('postgres', q[SELECT pg_current_wal_lsn();]);
+
+$node->safe_psql('postgres', q[
+SELECT pg_switch_wal();
+CHECKPOINT;
+]);
+$node->stop();
+
+my ($dump_stdout, $dump_stderr) = run_command(
+       [
+               'pg_waldump', '-b', '-p', $node->archive_dir,
+               '--start', $start_lsn,
+               '--end',   $end_lsn
+       ]);
+is($dump_stderr, '', 'pg_waldump block dump completed without stderr');
+
+my @remap_header_lines =
+  grep { /; remap: old_pblk \d+ new_pblk \d+ logical_nblocks \d+ 
next_free_pblk \d+/ }
+  split /\n/, $dump_stdout;
+
+ok(@remap_header_lines > 0,
+   'raw WAL dump contains full remap block headers for updated heap pages');
+
+done_testing();
diff --git a/src/test/recovery/t/074_umbra_torn_page_remap.pl 
b/src/test/recovery/t/074_umbra_torn_page_remap.pl
new file mode 100644
index 0000000000..2c427757ed
--- /dev/null
+++ b/src/test/recovery/t/074_umbra_torn_page_remap.pl
@@ -0,0 +1,261 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+# Verify that Umbra crash recovery can recover a remapped heap page even when
+# the newly allocated physical block contains a torn write image.  In md mode,
+# run the same workload with full_page_writes=off as a negative control: the
+# manually torn heap page must not be recoverable as correct data.
+#
+# The test:
+# - checkpoints a relation, then updates existing heap pages
+# - in Umbra mode, extracts one new physical block number from the remap WAL
+# - in md mode, extracts one updated heap block as the negative control target
+# - kills the server, overwrites half of that physical block, and restarts
+# - verifies Umbra restores the logical relation contents while md/FPW-off
+#   cannot recover the torn page as correct data
+use strict;
+use warnings FATAL => 'all';
+
+use Fcntl qw(O_CREAT O_RDWR SEEK_SET);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $use_umbra = check_pg_config('^#define USE_UMBRA 1$');
+
+sub overwrite_half_physical_block
+{
+       my ($node, $relpath, $block_size, $pblkno) = @_;
+
+       my $seg_blocks = int((1024 * 1024 * 1024) / $block_size);
+       my $segno = int($pblkno / $seg_blocks);
+       my $segblk = $pblkno % $seg_blocks;
+       my $path = $node->data_dir . '/' . $relpath . ($segno == 0 ? '' : 
".$segno");
+       my $offset = $segblk * $block_size;
+       my $zeros = "\0" x int($block_size / 2);
+
+       sysopen(my $fh, $path, O_RDWR | O_CREAT, 0600)
+         or die "could not open $path: $!";
+       binmode($fh);
+       defined(sysseek($fh, $offset, SEEK_SET))
+         or die "could not seek to $offset in $path: $!";
+       my $written = syswrite($fh, $zeros);
+       die "could not overwrite torn half-page in $path: $!"
+         unless defined($written) && $written == length($zeros);
+       close($fh) or die "could not close $path: $!";
+
+       return ($path, $offset);
+}
+
+sub setup_node
+{
+       my ($name, $fpw) = @_;
+       my $node = PostgreSQL::Test::Cluster->new($name);
+
+       $node->init();
+       $node->append_conf(
+               'postgresql.conf', qq[
+autovacuum = off
+full_page_writes = $fpw
+shared_buffers = '256MB'
+max_wal_size = '4GB'
+min_wal_size = '1GB'
+checkpoint_timeout = '1h'
+]);
+       $node->start();
+       return $node;
+}
+
+sub prepare_and_update_table
+{
+       my ($node) = @_;
+
+       $node->safe_psql('postgres', q[
+CREATE TABLE umb_torn_page_t(id bigint, payload text)
+  WITH (fillfactor = 70);
+INSERT INTO umb_torn_page_t
+SELECT g, repeat('x', 80)
+FROM generate_series(1, 200000) AS g;
+CHECKPOINT;
+]);
+
+       my $relinfo = $node->safe_psql('postgres', q[
+SELECT (CASE WHEN c.reltablespace = 0
+             THEN d.dattablespace
+             ELSE c.reltablespace
+        END)::text || '/' ||
+       d.oid::text || '/' ||
+       pg_relation_filenode(c.oid)::text || '|' ||
+       pg_relation_filepath(c.oid) || '|' ||
+       current_setting('block_size')
+FROM pg_class c
+JOIN pg_database d ON d.datname = current_database()
+WHERE c.oid = 'umb_torn_page_t'::regclass;
+]);
+       my ($locator, $relpath, $block_size) = split /\|/, $relinfo;
+
+       my $start_lsn =
+         $node->safe_psql('postgres', q[SELECT pg_current_wal_lsn();]);
+
+       $node->safe_psql('postgres', q[
+UPDATE umb_torn_page_t
+SET payload = md5(id::text) || repeat('u', 48)
+WHERE id <= 100000;
+]);
+
+       my $before = $node->safe_psql('postgres', relation_signature_sql());
+
+       my $end_lsn =
+         $node->safe_psql('postgres', q[SELECT pg_current_wal_lsn();]);
+
+       my ($dump_stdout, $dump_stderr) = run_command(
+               [
+                       'pg_waldump', '-b', '-p', $node->data_dir . '/pg_wal',
+                       '--start', $start_lsn,
+                       '--end',   $end_lsn
+               ]);
+       $dump_stderr =~
+         s/^pg_waldump: first record is after [^\n]+, at [^\n]+, skipping over 
\d+ bytes\n?//m;
+       is($dump_stderr, '',
+               'pg_waldump block dump completed without unexpected stderr');
+
+       return ($locator, $relpath, $block_size, $before, $dump_stdout);
+}
+
+sub relation_signature_sql
+{
+       return q[
+SELECT count(*) || ',' ||
+       md5(string_agg(md5(id::text || ':' || payload), '' ORDER BY id))
+FROM umb_torn_page_t;
+];
+}
+
+sub find_umbra_remap
+{
+       my ($locator, $dump_stdout) = @_;
+
+       foreach my $blkref (split /(?=blkref #\d+:)/, $dump_stdout)
+       {
+               next
+                 unless $blkref =~ /blkref #\d+: rel \Q$locator\E fork main 
blk (\d+)/;
+               my $lblk = $1;
+               next
+                 unless $blkref =~
+                 /; remap: old_pblk (\d+) new_pblk (\d+) logical_nblocks \d+ 
next_free_pblk \d+/;
+
+               my ($old, $new) = ($1, $2);
+               next if $old == 4294967295;
+               next if $old == $new;
+
+               return ($lblk, $old, $new);
+       }
+
+       return;
+}
+
+sub find_md_heap_block
+{
+       my ($locator, $dump_stdout) = @_;
+
+       foreach my $blkref (split /(?=blkref #\d+:)/, $dump_stdout)
+       {
+               next
+                 unless $blkref =~ /blkref #\d+: rel \Q$locator\E fork main 
blk (\d+)/;
+               my $lblk = $1;
+               next if $lblk == 0;
+               return $lblk;
+       }
+
+       return;
+}
+
+sub verify_table_contents
+{
+       my ($node, $before) = @_;
+
+       my $after = $node->safe_psql('postgres', relation_signature_sql());
+
+       is($after, $before,
+               'recovery restores relation contents after torn new physical 
block');
+
+       is($node->safe_psql(
+                       'postgres',
+                       q[SELECT count(*) FROM umb_torn_page_t
+                          WHERE id <= 100000
+                            AND left(payload, 8) = left(md5(id::text), 8);]),
+               '100000',
+               'updated rows are visible after recovery');
+
+       is($node->safe_psql(
+                       'postgres',
+                       q[SELECT count(*) FROM umb_torn_page_t
+                          WHERE id > 100000
+                            AND payload = repeat('x', 80);]),
+               '100000',
+               'unmodified rows remain visible after recovery');
+}
+
+if ($use_umbra)
+{
+       my $node = setup_node('umbra_torn_page_remap', 'on');
+       my ($locator, $relpath, $block_size, $before, $dump_stdout) =
+         prepare_and_update_table($node);
+
+       my ($target_lblk, $old_pblk, $new_pblk) =
+         find_umbra_remap($locator, $dump_stdout);
+
+       ok(defined($new_pblk),
+               'update WAL contains a heap remap header with a new physical 
block');
+       BAIL_OUT('could not locate a concrete Umbra remap block for test 
relation')
+               unless defined($new_pblk);
+       cmp_ok($new_pblk, '!=', $old_pblk,
+               'selected WAL remap moves the heap page to a different physical 
block');
+
+       $node->stop('immediate');
+
+       my ($corrupt_path, $corrupt_offset) =
+         overwrite_half_physical_block($node, $relpath, $block_size, 
$new_pblk);
+       ok(-e $corrupt_path,
+               'new physical block segment exists after torn-write injection');
+       ok($corrupt_offset >= 0,
+               'torn-write injection targeted a concrete physical offset');
+
+       $node->start();
+       verify_table_contents($node, $before);
+}
+else
+{
+       my $node = setup_node('md_torn_page_fpw_off', 'off');
+       my ($locator, $relpath, $block_size, $before, $dump_stdout) =
+         prepare_and_update_table($node);
+
+       my $target_lblk = find_md_heap_block($locator, $dump_stdout);
+       ok(defined($target_lblk),
+               'update WAL contains a heap block reference for md negative 
control');
+       BAIL_OUT('could not locate a concrete md heap block for test relation')
+               unless defined($target_lblk);
+
+       $node->stop('immediate');
+
+       my ($corrupt_path, $corrupt_offset) =
+         overwrite_half_physical_block($node, $relpath, $block_size, 
$target_lblk);
+       ok(-e $corrupt_path,
+               'md heap segment exists after torn-write injection');
+       ok($corrupt_offset >= 0,
+               'md torn-write injection targeted a concrete physical offset');
+
+       my $started = $node->start(fail_ok => 1);
+       if (!$started)
+       {
+               pass('md with full_page_writes=off cannot restart from the torn 
page');
+       }
+       else
+       {
+               my ($ret, $stdout, $stderr) =
+                 $node->psql('postgres', relation_signature_sql());
+               ok($ret != 0 || $stdout ne $before,
+                       'md with full_page_writes=off does not recover correct 
data from the torn page');
+       }
+}
+
+done_testing();
-- 
2.50.1 (Apple Git-155)



Reply via email to