On Mon, Jun 17, 2024 at 04:58:54PM -0700, Noah Misch wrote:
> attached v2 patch stack.

Rebased.  This applies on top of three patches from
https://postgr.es/m/20240629024251.03.nmi...@google.com.  I'm attaching those
to placate cfbot, but this thread is for review of the last patch only.
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Warn if LOCKTAG_TUPLE is held at commit, under debug_assertions.
    
    The current use always releases this locktag.  A planned use will
    continue that intent.  It will involve more areas of code, making it
    easier to omit an unlock.  Warn under debug_assertions, like we do for
    various resource leaks.  Back-patch to v12 (all supported versions), as
    planned for the commit of the new use.
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/20240512232923.aa.nmi...@google.com
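
A minimal standalone sketch of the leak check this commit adds, written
outside PostgreSQL.  Every Toy*/toy_* name is hypothetical and stands in
for the LOCALLOCK machinery.  It assumes, as the warning text implies, that
allLocks is false on the commit path.  Unlike the patch, the toy checks
unconditionally rather than only under USE_ASSERT_CHECKING, and it returns
a warning count so the behavior can be tested.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for lock tag types; not PostgreSQL's definitions. */
typedef enum ToyLockTagType
{
    TOY_TAG_RELATION,
    TOY_TAG_TUPLE
} ToyLockTagType;

/* Hypothetical stand-in for a backend-local lock table entry. */
typedef struct ToyLocalLock
{
    ToyLockTagType type;
    int         nowners;        /* 0 means the lock was already released */
} ToyLocalLock;

/*
 * Release every lock still held.  When all_locks is false (modeling a
 * commit), warn about each tuple-level lock that the caller forgot to
 * release.  Returns the number of warnings emitted.
 */
int
toy_release_all(ToyLocalLock *locks, size_t n, bool all_locks)
{
    int         warnings = 0;

    assert(locks != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        if (locks[i].nowners == 0)
            continue;           /* released earlier; nothing to report */

        if (locks[i].type == TOY_TAG_TUPLE && !all_locks)
        {
            fprintf(stderr, "WARNING: tuple lock held at commit\n");
            warnings++;
        }

        locks[i].nowners = 0;   /* release it regardless */
    }
    return warnings;
}
```

The lock is still released after the warning, so the check only reports the
omission and does not change what commit does.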

diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 0400a50..461d925 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -2256,6 +2256,11 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
                                locallock->numLockOwners = 0;
                }
 
+#ifdef USE_ASSERT_CHECKING
+               if (LOCALLOCK_LOCKTAG(*locallock) == LOCKTAG_TUPLE && !allLocks)
+                       elog(WARNING, "tuple lock held at commit");
+#endif
+
                /*
                 * If the lock or proclock pointers are NULL, this lock was taken via
                 * the relation fast-path (and is not known to have been transferred).
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Fix data loss at inplace update after heap_update().
    
    As previously-added tests demonstrated, heap_inplace_update() could
    instead update an unrelated tuple of the same catalog.  It could lose
    the update.  Losing relhasindex=t was a source of index corruption.
    Inplace-updating commands like VACUUM will now wait for heap_update()
    commands like GRANT TABLE and GRANT DATABASE.  That isn't ideal, but a
    long-running GRANT already hurts VACUUM progress more just by keeping an
    XID running.  The VACUUM will behave like a DELETE or UPDATE waiting for
    the uncommitted change.
    
    For implementation details, start at the heap_inplace_update_scan()
    header comment and README.tuplock.  Back-patch to v12 (all supported
    versions).  In back branches, retain a deprecated heap_inplace_update(),
    for extensions.
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/CAMp+ueZQz3yDk7qg42hk6-9gxniYbp-=bg2mgqecerqr5gg...@mail.gmail.com
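
A toy model of the scan/finish/cancel flow that the
heap_inplace_update_scan() header comment calls the standard flow.  This is
a sketch under stated assumptions, not the patch's implementation: every
Toy*/toy_* name is hypothetical, a bool stands in for the exclusive buffer
lock, and the retry loop, WAL logging and cache invalidation are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define TOY_ROW_LEN 8

/* Hypothetical fixed-size row; buffer_locked models the exclusive lock. */
typedef struct ToyRow
{
    char        data[TOY_ROW_LEN];
    bool        live;
    bool        buffer_locked;
} ToyRow;

/* Hypothetical opaque state handed from the scan to finish/cancel. */
typedef struct ToyInplaceState
{
    ToyRow     *row;
} ToyInplaceState;

/* Phase 1: find the row, lock it, and hand back a copy to mutate. */
bool
toy_inplace_scan(ToyRow *row, char *copy, ToyInplaceState *state)
{
    if (row == NULL || !row->live)
        return false;           /* like *oldtupcopy = NULL */
    assert(!row->buffer_locked);
    row->buffer_locked = true;
    memcpy(copy, row->data, TOY_ROW_LEN);
    state->row = row;
    return true;
}

/* Phase 2a: overwrite the row with the same-size copy, then unlock. */
void
toy_inplace_finish(ToyInplaceState *state, const char *newdata)
{
    assert(state->row != NULL && state->row->buffer_locked);
    memcpy(state->row->data, newdata, TOY_ROW_LEN);
    state->row->buffer_locked = false;
    state->row = NULL;
}

/* Phase 2b: nothing to write; just unlock. */
void
toy_inplace_cancel(ToyInplaceState *state)
{
    assert(state->row != NULL && state->row->buffer_locked);
    state->row->buffer_locked = false;
    state->row = NULL;
}

/*
 * Caller following the flow: set byte 0 to "flag" if it differs.
 * Returns 1 if written, 0 if already set, -1 if no live row was found.
 */
int
toy_set_flag(ToyRow *row, char flag)
{
    char        copy[TOY_ROW_LEN];
    ToyInplaceState state;

    if (!toy_inplace_scan(row, copy, &state))
        return -1;
    if (copy[0] != flag)
    {
        copy[0] = flag;
        toy_inplace_finish(&state, copy);
        return 1;
    }
    toy_inplace_cancel(&state);
    return 0;
}
```

Whichever branch the caller takes, exactly one of finish or cancel runs once
the scan has succeeded, so the lock never outlives the call.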

diff --git a/src/backend/access/heap/README.tuplock b/src/backend/access/heap/README.tuplock
index 6441e8b..dbfa2b7 100644
--- a/src/backend/access/heap/README.tuplock
+++ b/src/backend/access/heap/README.tuplock
@@ -153,3 +153,56 @@ The following infomask bits are applicable:
 
 We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
 is set.
+
+Locking to write inplace-updated tables
+---------------------------------------
+
+[This is the plan, but LOCKTAG_TUPLE acquisition is not yet here.]
+
+If IsInplaceUpdateRelation() returns true for a table, the table is a system
+catalog that receives heap_inplace_update_scan() calls.  Preparing a
+heap_update() of these tables follows additional locking rules, to ensure we
+don't lose the effects of an inplace update.  In particular, consider a moment
+when a backend has fetched the old tuple to modify, not yet having called
+heap_update().  Another backend's inplace update starting then can't conclude
+until the heap_update() places its new tuple in a buffer.  We enforce that
+using locktags as follows.  While DDL code is the main audience, the executor
+follows these rules to make e.g. "MERGE INTO pg_class" safer.  Locking rules
+are per-catalog:
+
+  pg_class heap_inplace_update_scan() callers: before the call, acquire
+  LOCKTAG_RELATION in mode ShareLock (CREATE INDEX), ShareUpdateExclusiveLock
+  (VACUUM), or a mode with strictly more conflicts.  If the update targets a
+  row of RELKIND_INDEX (but not RELKIND_PARTITIONED_INDEX), that lock must be
+  on the table.  Locking the index rel is optional.  (This allows VACUUM to
+  overwrite per-index pg_class while holding a lock on the table alone.)  We
+  could allow weaker locks, in which case the next paragraph would simply call
+  for stronger locks for its class of commands.  heap_inplace_update_scan()
+  acquires and releases LOCKTAG_TUPLE in InplaceUpdateTupleLock, an alias for
+  ExclusiveLock, on each tuple it overwrites.
+
+  pg_class heap_update() callers: before copying the tuple to modify, take a
+  lock that conflicts with at least one of those from the preceding paragraph.
+  SearchSysCacheLocked1() is one convenient way to acquire LOCKTAG_TUPLE.
+  After heap_update(), release any LOCKTAG_TUPLE.  Most of these callers opt
+  to acquire just the LOCKTAG_RELATION.
+
+  pg_database: before copying the tuple to modify, all updaters of pg_database
+  rows acquire LOCKTAG_TUPLE.  (Few updaters acquire LOCKTAG_OBJECT on the
+  database OID, so it wasn't worth extending that as a second option.)
+
+Ideally, DDL might want to perform permissions checks before LockTuple(), as
+we do with RangeVarGetRelidExtended() callbacks.  We typically don't bother.
+LOCKTAG_TUPLE acquirers release it after each row, so the potential
+inconvenience is lower.
+
+Reading inplace-updated columns
+-------------------------------
+
+Inplace updates create an exception to the rule that tuple data won't change
+under a reader holding a pin.  A reader of a heap_fetch() result tuple may
+witness a torn read.  Current inplace-updated fields are aligned and are no
+wider than four bytes, and current readers don't need consistency across
+fields.  Hence, they get by with just fetching each field once.  XXX such a
+caller may also read a value that has not reached WAL; see
+heap_inplace_update_finish().
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 91b2014..107507e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -76,6 +76,9 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
                                                                  Buffer newbuf, HeapTuple oldtup,
                                                                  HeapTuple newtup, HeapTuple old_key_tuple,
                                                                  bool all_visible_cleared, bool new_all_visible_cleared);
+#ifdef USE_ASSERT_CHECKING
+static void check_inplace_rel_lock(HeapTuple oldtup);
+#endif
 static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
                                                                                   Bitmapset *interesting_cols,
                                                                                   Bitmapset *external_cols,
@@ -97,6 +100,7 @@ static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
 static TM_Result heap_lock_updated_tuple(Relation rel, HeapTuple tuple,
                                                                                 ItemPointer ctid, TransactionId xid,
                                                                                 LockTupleMode mode);
+static bool inplace_xmax_lock(SysScanDesc scan);
 static void GetMultiXactIdHintBits(MultiXactId multi, uint16 *new_infomask,
                                                                   uint16 *new_infomask2);
 static TransactionId MultiXactIdGetUpdateXid(TransactionId xmax,
@@ -4072,6 +4076,45 @@ l2:
        return TM_Ok;
 }
 
+#ifdef USE_ASSERT_CHECKING
+/*
+ * Confirm adequate relation lock held, per rules from README.tuplock section
+ * "Locking to write inplace-updated tables".
+ */
+static void
+check_inplace_rel_lock(HeapTuple oldtup)
+{
+       Form_pg_class classForm = (Form_pg_class) GETSTRUCT(oldtup);
+       Oid                     relid = classForm->oid;
+       Oid                     dbid;
+       LOCKTAG         tag;
+
+       if (IsSharedRelation(relid))
+               dbid = InvalidOid;
+       else
+               dbid = MyDatabaseId;
+
+       if (classForm->relkind == RELKIND_INDEX)
+       {
+               Relation        irel = index_open(relid, AccessShareLock);
+
+               SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+               index_close(irel, AccessShareLock);
+       }
+       else
+               SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+       if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock, true))
+               elog(WARNING,
+                        "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+                        NameStr(classForm->relname),
+                        relid,
+                        classForm->relkind,
+                        ItemPointerGetBlockNumber(&oldtup->t_self),
+                        ItemPointerGetOffsetNumber(&oldtup->t_self));
+}
+#endif
+
 /*
  * Check if the specified attribute's values are the same.  Subroutine for
  * HeapDetermineColumnsInfo.
@@ -6041,34 +6084,45 @@ heap_abort_speculative(Relation relation, ItemPointer tid)
 }
 
 /*
- * heap_inplace_update - update a tuple "in place" (ie, overwrite it)
+ * heap_inplace_update_scan - update a row "in place" (ie, overwrite it)
  *
- * Overwriting violates both MVCC and transactional safety, so the uses
- * of this function in Postgres are extremely limited.  Nonetheless we
- * find some places to use it.
+ * Overwriting violates both MVCC and transactional safety, so the uses of
+ * this function in Postgres are extremely limited.  Nonetheless we find some
+ * places to use it.  See README.tuplock section "Locking to write
+ * inplace-updated tables" and later sections for expectations of readers and
+ * writers of a table that gets inplace updates.  Standard flow:
  *
- * The tuple cannot change size, and therefore it's reasonable to assume
- * that its null bitmap (if any) doesn't change either.  So we just
- * overwrite the data portion of the tuple without touching the null
- * bitmap or any of the header fields.
+ * ... [any slow preparation not requiring oldtup] ...
+ * heap_inplace_update_scan([...], &tup, &inplace_state);
+ * if (!HeapTupleIsValid(tup))
+ *     elog(ERROR, [...]);
+ * ... [buffer is exclusive-locked; mutate "tup"] ...
+ * if (dirty)
+ *     heap_inplace_update_finish(inplace_state, tup);
+ * else
+ *     heap_inplace_update_cancel(inplace_state);
  *
- * tuple is an in-memory tuple structure containing the data to be written
- * over the target tuple.  Also, tuple->t_self identifies the target tuple.
+ * Since this is intended for system catalogs and SERIALIZABLE doesn't cover
+ * DDL, this skips some predicate locks.
  *
- * Note that the tuple updated here had better not come directly from the
- * syscache if the relation has a toast relation as this tuple could
- * include toast values that have been expanded, causing a failure here.
+ * The first several params duplicate the systable_beginscan() param list.
+ * "oldtupcopy" is an output parameter, assigned NULL if the key ceases to
+ * find a live tuple.  (In PROC_IN_VACUUM, that is a low-probability transient
+ * condition.)  If "oldtupcopy" gets non-NULL, you must pass output parameter
+ * "state" to heap_inplace_update_finish() or heap_inplace_update_cancel().
  */
 void
-heap_inplace_update(Relation relation, HeapTuple tuple)
+heap_inplace_update_scan(Relation relation,
+                                                Oid indexId,
+                                                bool indexOK,
+                                                Snapshot snapshot,
+                                                int nkeys, const ScanKeyData *key,
+                                                HeapTuple *oldtupcopy, void **state)
 {
-       Buffer          buffer;
-       Page            page;
-       OffsetNumber offnum;
-       ItemId          lp = NULL;
-       HeapTupleHeader htup;
-       uint32          oldlen;
-       uint32          newlen;
+       ScanKey         mutable_key = palloc(sizeof(ScanKeyData) * nkeys);
+       int                     retries = 0;
+       SysScanDesc scan;
+       HeapTuple       oldtup;
 
        /*
         * For now, we don't allow parallel updates.  Unlike a regular update,
@@ -6081,21 +6135,70 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
                                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                                 errmsg("cannot update tuples during a parallel operation")));
 
-       INJECTION_POINT("inplace-before-pin");
-       buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self)));
-       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-       page = (Page) BufferGetPage(buffer);
+       /*
+        * Accept a snapshot argument, for symmetry, but this function advances
+        * its snapshot as needed to reach the tail of the updated tuple chain.
+        */
+       Assert(snapshot == NULL);
 
-       offnum = ItemPointerGetOffsetNumber(&(tuple->t_self));
-       if (PageGetMaxOffsetNumber(page) >= offnum)
-               lp = PageGetItemId(page, offnum);
+       Assert(IsInplaceUpdateRelation(relation) || !IsSystemRelation(relation));
 
-       if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-               elog(ERROR, "invalid lp");
+       /* Loop for an exclusive-locked buffer of a non-updated tuple. */
+       do
+       {
+               CHECK_FOR_INTERRUPTS();
 
-       htup = (HeapTupleHeader) PageGetItem(page, lp);
+               /*
+                * Processes issuing heap_update (e.g. GRANT) at maximum speed could
+                * drive us to this error.  A hostile table owner has stronger ways to
+                * damage their own table, so that's minor.
+                */
+               if (retries++ > 10000)
+                       elog(ERROR, "giving up after too many tries to overwrite row");
 
-       oldlen = ItemIdGetLength(lp) - htup->t_hoff;
+               memcpy(mutable_key, key, sizeof(ScanKeyData) * nkeys);
+               INJECTION_POINT("inplace-before-pin");
+               scan = systable_beginscan(relation, indexId, indexOK, snapshot,
+                                                                 nkeys, mutable_key);
+               oldtup = systable_getnext(scan);
+               if (!HeapTupleIsValid(oldtup))
+               {
+                       systable_endscan(scan);
+                       *oldtupcopy = NULL;
+                       return;
+               }
+
+#ifdef USE_ASSERT_CHECKING
+               if (RelationGetRelid(relation) == RelationRelationId)
+                       check_inplace_rel_lock(oldtup);
+#endif
+       } while (!inplace_xmax_lock(scan));
+
+       *oldtupcopy = heap_copytuple(oldtup);
+       *state = scan;
+}
+
+/*
+ * heap_inplace_update_finish - second phase of heap_inplace_update_scan()
+ *
+ * The tuple cannot change size, and therefore its header fields and null
+ * bitmap (if any) don't change either.
+ */
+void
+heap_inplace_update_finish(void *state, HeapTuple tuple)
+{
+       SysScanDesc scan = (SysScanDesc) state;
+       TupleTableSlot *slot = scan->slot;
+       BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       HeapTuple       oldtup = bslot->base.tuple;
+       HeapTupleHeader htup = oldtup->t_data;
+       Buffer          buffer = bslot->buffer;
+       Relation        relation = scan->heap_rel;
+       uint32          oldlen;
+       uint32          newlen;
+
+       Assert(ItemPointerEquals(&oldtup->t_self, &tuple->t_self));
+       oldlen = oldtup->t_len - htup->t_hoff;
        newlen = tuple->t_len - tuple->t_data->t_hoff;
        if (oldlen != newlen || htup->t_hoff != tuple->t_data->t_hoff)
                elog(ERROR, "wrong tuple length");
@@ -6107,6 +6210,19 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
                   (char *) tuple->t_data + tuple->t_data->t_hoff,
                   newlen);
 
+       /*----------
+        * XXX A crash here can allow datfrozenxid to get ahead of relfrozenxid:
+        *
+        * ["D" is a VACUUM (ONLY_DATABASE_STATS)]
+        * ["R" is a VACUUM tbl]
+        * D: vac_update_datfrozenxid() -> systable_beginscan(pg_class)
+        * D: systable_getnext() returns pg_class tuple of tbl
+        * R: memcpy() into pg_class tuple of tbl
+        * D: raise pg_database.datfrozenxid, XLogInsert(), finish
+        * [crash]
+        * [recovery restores datfrozenxid w/o relfrozenxid]
+        */
+
        MarkBufferDirty(buffer);
 
        /* XLOG stuff */
@@ -6127,23 +6243,188 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 
                recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
 
-               PageSetLSN(page, recptr);
+               PageSetLSN(BufferGetPage(buffer), recptr);
        }
 
        END_CRIT_SECTION();
 
-       UnlockReleaseBuffer(buffer);
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       systable_endscan(scan);
 
        /*
         * Send out shared cache inval if necessary.  Note that because we only
         * pass the new version of the tuple, this mustn't be used for any
         * operations that could change catcache lookup keys.  But we aren't
         * bothering with index updates either, so that's true a fortiori.
+        *
+        * XXX ROLLBACK discards the invalidation.  See test inplace-inval.spec.
         */
        if (!IsBootstrapProcessingMode())
                CacheInvalidateHeapTuple(relation, tuple, NULL);
 }
 
+/*
+ * heap_inplace_update_cancel - abandon a heap_inplace_update_scan()
+ *
+ * This is an alternative to making a no-op update.
+ */
+void
+heap_inplace_update_cancel(void *state)
+{
+       SysScanDesc scan = (SysScanDesc) state;
+       TupleTableSlot *slot = scan->slot;
+       BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       Buffer          buffer = bslot->buffer;
+
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       systable_endscan(scan);
+}
+
+/*
+ * inplace_xmax_lock - protect inplace update from concurrent heap_update()
+ *
+ * This operates on the last tuple that systable_getnext() returned.  Evaluate
+ * whether the tuple's state is compatible with a no-key update.  Current
+ * transaction rowmarks are fine, as is KEY SHARE from any transaction.  If
+ * compatible, return true with the buffer exclusive-locked.  Otherwise,
+ * return false after blocking transactions, if any, have ended.
+ *
+ * One could modify this to return true for tuples with delete in progress.
+ * All inplace updaters take a lock that conflicts with DROP.  If a delete does
+ * happen somehow, we'll wait for it like we would an update.
+ *
+ * Readers of inplace-updated fields expect changes to those fields are
+ * durable.  For example, vac_truncate_clog() reads datfrozenxid from
+ * pg_database tuples via catalog snapshots.  A future snapshot must not
+ * return a lower datfrozenxid for the same database OID (lower in the
+ * FullTransactionIdPrecedes() sense).  We achieve that since no update of a
+ * tuple can start while we hold a lock on its buffer.  In cases like
+ * BEGIN;GRANT;CREATE INDEX;COMMIT we're inplace-updating a tuple visible only
+ * to this transaction.  ROLLBACK then is one case where it's okay to lose
+ * inplace updates.  (Restoring relhasindex=false on ROLLBACK is fine, since
+ * any concurrent CREATE INDEX would have blocked, then inplace-updated the
+ * committed tuple.)
+ *
+ * In principle, we could avoid waiting by overwriting every tuple in the
+ * updated tuple chain.  Reader expectations permit updating a tuple only if
+ * it's aborted, is the tail of the chain, or we already updated the tuple
+ * referenced in its t_ctid.  Hence, we would need to overwrite the tuples in
+ * order from tail to head.  That would tolerate either (a) mutating all
+ * tuples in one critical section or (b) accepting a chance of partial
+ * completion.  Partial completion of a relfrozenxid update would have the
+ * weird consequence that the table's next VACUUM could see the table's
+ * relfrozenxid move forward between vacuum_get_cutoffs() and finishing.
+ */
+static bool
+inplace_xmax_lock(SysScanDesc scan)
+{
+       TupleTableSlot *slot = scan->slot;
+       BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       HeapTupleData oldtup = *bslot->base.tuple;
+       Buffer          buffer = bslot->buffer;
+       TM_Result       result;
+       bool            ret;
+
+       Assert(TTS_IS_BUFFERTUPLE(slot));
+       Assert(BufferIsValid(buffer));
+
+       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+       /*----------
+        * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
+        *
+        * - wait unconditionally
+        * - no tuple locks
+        * - don't recheck header after wait: simpler to defer to next iteration
+        * - don't try to continue even if the updater aborts: likewise
+        * - no crosscheck
+        */
+       result = HeapTupleSatisfiesUpdate(&oldtup, GetCurrentCommandId(false),
+                                                                         buffer);
+
+       if (result == TM_Invisible)
+       {
+               /* no known way this can happen */
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg_internal("attempted to overwrite invisible tuple")));
+       }
+       else if (result == TM_SelfModified)
+       {
+               /*
+                * CREATE INDEX might reach this if an expression is silly enough to
+                * call e.g. SELECT ... FROM pg_class FOR SHARE.
+                */
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("tuple to be updated was already modified by an operation triggered by the current command")));
+       }
+       else if (result == TM_BeingModified)
+       {
+               TransactionId xwait;
+               uint16          infomask;
+               Relation        relation;
+
+               xwait = HeapTupleHeaderGetRawXmax(oldtup.t_data);
+               infomask = oldtup.t_data->t_infomask;
+               relation = scan->heap_rel;
+
+               if (infomask & HEAP_XMAX_IS_MULTI)
+               {
+                       LockTupleMode lockmode = LockTupleNoKeyExclusive;
+                       MultiXactStatus mxact_status = MultiXactStatusNoKeyUpdate;
+                       int                     remain;
+                       bool            current_is_member;
+
+                       if (DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
+                                                                               lockmode, &current_is_member))
+                       {
+                               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                               systable_endscan(scan);
+                               ret = false;
+                               MultiXactIdWait((MultiXactId) xwait, mxact_status, infomask,
+                                                               relation, &oldtup.t_self, XLTW_Update,
+                                                               &remain);
+                       }
+                       else
+                               ret = true;
+               }
+               else if (TransactionIdIsCurrentTransactionId(xwait))
+                       ret = true;
+               else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
+                       ret = true;
+               else
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       systable_endscan(scan);
+                       ret = false;
+                       XactLockTableWait(xwait, relation, &oldtup.t_self,
+                                                         XLTW_Update);
+               }
+       }
+       else
+       {
+               ret = (result == TM_Ok);
+               if (!ret)
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       systable_endscan(scan);
+               }
+       }
+
+       /*
+        * GetCatalogSnapshot() relies on invalidation messages to know when to
+        * take a new snapshot.  COMMIT of xwait is responsible for sending the
+        * invalidation.  We're not acquiring heavyweight locks sufficient to
+        * block if not yet sent, so we must take a new snapshot to avoid spinning
+        * that ends with a "too many tries" error.  While we don't need this if
+        * xwait aborted, don't bother optimizing that.
+        */
+       if (!ret)
+               InvalidateCatalogSnapshot();
+       return ret;
+}
+
 #define                FRM_NOOP                                0x0001
 #define                FRM_INVALIDATE_XMAX             0x0002
 #define                FRM_RETURN_IS_XID               0x0004
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index a819b41..b4b68b1 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2784,7 +2784,9 @@ index_update_stats(Relation rel,
 {
        Oid                     relid = RelationGetRelid(rel);
        Relation        pg_class;
+       ScanKeyData key[1];
        HeapTuple       tuple;
+       void       *state;
        Form_pg_class rd_rel;
        bool            dirty;
 
@@ -2818,33 +2820,12 @@ index_update_stats(Relation rel,
 
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       /*
-        * Make a copy of the tuple to update.  Normally we use the syscache, but
-        * we can't rely on that during bootstrap or while reindexing pg_class
-        * itself.
-        */
-       if (IsBootstrapProcessingMode() ||
-               ReindexIsProcessingHeap(RelationRelationId))
-       {
-               /* don't assume syscache will work */
-               TableScanDesc pg_class_scan;
-               ScanKeyData key[1];
-
-               ScanKeyInit(&key[0],
-                                       Anum_pg_class_oid,
-                                       BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(relid));
-
-               pg_class_scan = table_beginscan_catalog(pg_class, 1, key);
-               tuple = heap_getnext(pg_class_scan, ForwardScanDirection);
-               tuple = heap_copytuple(tuple);
-               table_endscan(pg_class_scan);
-       }
-       else
-       {
-               /* normal case, use syscache */
-               tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
-       }
+       ScanKeyInit(&key[0],
+                               Anum_pg_class_oid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(relid));
+       heap_inplace_update_scan(pg_class, ClassOidIndexId, true, NULL, 1, key,
+                                                        &tuple, &state);
 
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for relation %u", relid);
@@ -2907,11 +2888,12 @@ index_update_stats(Relation rel,
         */
        if (dirty)
        {
-               heap_inplace_update(pg_class, tuple);
+               heap_inplace_update_finish(state, tuple);
                /* the above sends a cache inval message */
        }
        else
        {
+               heap_inplace_update_cancel(state);
                /* no need to change tuple, but force relcache inval anyway */
                CacheInvalidateRelcacheByTuple(tuple);
        }
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 738bc46..c882f3c 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -29,6 +29,7 @@
 #include "catalog/toasting.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "utils/fmgroids.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
@@ -333,21 +334,36 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
         */
        class_rel = table_open(RelationRelationId, RowExclusiveLock);
 
-       reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
-       if (!HeapTupleIsValid(reltup))
-               elog(ERROR, "cache lookup failed for relation %u", relOid);
-
-       ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
-
        if (!IsBootstrapProcessingMode())
        {
                /* normal case, use a transactional update */
+               reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+               if (!HeapTupleIsValid(reltup))
+                       elog(ERROR, "cache lookup failed for relation %u", relOid);
+
+               ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
+
                CatalogTupleUpdate(class_rel, &reltup->t_self, reltup);
        }
        else
        {
                /* While bootstrapping, we cannot UPDATE, so overwrite in-place */
-               heap_inplace_update(class_rel, reltup);
+
+               ScanKeyData key[1];
+               void       *state;
+
+               ScanKeyInit(&key[0],
+                                       Anum_pg_class_oid,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(relOid));
+               heap_inplace_update_scan(class_rel, ClassOidIndexId, true,
+                                                                NULL, 1, key, &reltup, &state);
+               if (!HeapTupleIsValid(reltup))
+                       elog(ERROR, "cache lookup failed for relation %u", relOid);
+
+               ((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid = toast_relid;
+
+               heap_inplace_update_finish(state, reltup);
        }
 
        heap_freetuple(reltup);
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index be629ea..da4d2b7 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1637,6 +1637,8 @@ dropdb(const char *dbname, bool missing_ok, bool force)
        bool            db_istemplate;
        Relation        pgdbrel;
        HeapTuple       tup;
+       ScanKeyData key[1];
+       void       *inplace_state;
        Form_pg_database datform;
        int                     notherbackends;
        int                     npreparedxacts;
@@ -1774,11 +1776,6 @@ dropdb(const char *dbname, bool missing_ok, bool force)
         */
        pgstat_drop_database(db_id);
 
-       tup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
-       if (!HeapTupleIsValid(tup))
-               elog(ERROR, "cache lookup failed for database %u", db_id);
-       datform = (Form_pg_database) GETSTRUCT(tup);
-
        /*
         * Except for the deletion of the catalog row, subsequent actions are not
         * transactional (consider DropDatabaseBuffers() discarding modified
@@ -1790,8 +1787,17 @@ dropdb(const char *dbname, bool missing_ok, bool force)
         * modification is durable before performing irreversible filesystem
         * operations.
         */
+       ScanKeyInit(&key[0],
+                               Anum_pg_database_oid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(db_id));
+       heap_inplace_update_scan(pgdbrel, DatabaseOidIndexId, true,
+                                                        NULL, 1, key, &tup, &inplace_state);
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR, "cache lookup failed for database %u", db_id);
+       datform = (Form_pg_database) GETSTRUCT(tup);
        datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
-       heap_inplace_update(pgdbrel, tup);
+       heap_inplace_update_finish(inplace_state, tup);
        XLogFlush(XactLastRecEnd);
 
        /*
@@ -1799,6 +1805,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
         * the row will be gone, but if we fail, dropdb() can be invoked again.
         */
        CatalogTupleDelete(pgdbrel, &tup->t_self);
+       heap_freetuple(tup);
 
        /*
         * Drop db-specific replication slots.
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 7a5ed6b..22d0ce7 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -946,25 +946,18 @@ EventTriggerOnLogin(void)
                {
                        Relation        pg_db = table_open(DatabaseRelationId, RowExclusiveLock);
                        HeapTuple       tuple;
+                       void       *state;
                        Form_pg_database db;
                        ScanKeyData key[1];
-                       SysScanDesc scan;
 
-                       /*
-                        * Get the pg_database tuple to scribble on.  Note that this does
-                        * not directly rely on the syscache to avoid issues with
-                        * flattened toast values for the in-place update.
-                        */
+                       /* Fetch a copy of the tuple to scribble on */
                        ScanKeyInit(&key[0],
                                                Anum_pg_database_oid,
                                                BTEqualStrategyNumber, F_OIDEQ,
                                                ObjectIdGetDatum(MyDatabaseId));
 
-                       scan = systable_beginscan(pg_db, DatabaseOidIndexId, true,
-                                                                         NULL, 1, key);
-                       tuple = systable_getnext(scan);
-                       tuple = heap_copytuple(tuple);
-                       systable_endscan(scan);
+                       heap_inplace_update_scan(pg_db, DatabaseOidIndexId, true,
+                                                                        NULL, 1, key, &tuple, &state);
 
                        if (!HeapTupleIsValid(tuple))
                                elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
@@ -980,13 +973,15 @@ EventTriggerOnLogin(void)
                                 * that avoids possible waiting on the row-level lock. Second,
                                 * that avoids dealing with TOAST.
                                 *
-                                * It's known that changes made by heap_inplace_update() may
-                                * be lost due to concurrent normal updates.  However, we are
-                                * OK with that.  The subsequent connections will still have a
-                                * chance to set "dathasloginevt" to false.
+                                * Changes made by inplace update may be lost due to
+                                * concurrent normal updates; see inplace-inval.spec. However,
+                                * we are OK with that.  The subsequent connections will still
+                                * have a chance to set "dathasloginevt" to false.
                                 */
-                               heap_inplace_update(pg_db, tuple);
+                               heap_inplace_update_finish(state, tuple);
                        }
+                       else
+                               heap_inplace_update_cancel(state);
                        table_close(pg_db, RowExclusiveLock);
                        heap_freetuple(tuple);
                }
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 48f8eab..d299a25 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1405,7 +1405,9 @@ vac_update_relstats(Relation relation,
 {
        Oid                     relid = RelationGetRelid(relation);
        Relation        rd;
+       ScanKeyData key[1];
        HeapTuple       ctup;
+       void       *inplace_state;
        Form_pg_class pgcform;
        bool            dirty,
                                futurexid,
@@ -1416,7 +1418,12 @@ vac_update_relstats(Relation relation,
        rd = table_open(RelationRelationId, RowExclusiveLock);
 
        /* Fetch a copy of the tuple to scribble on */
-       ctup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
+       ScanKeyInit(&key[0],
+                               Anum_pg_class_oid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(relid));
+       heap_inplace_update_scan(rd, ClassOidIndexId, true,
+                                                        NULL, 1, key, &ctup, &inplace_state);
        if (!HeapTupleIsValid(ctup))
                elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
                         relid);
@@ -1524,7 +1531,9 @@ vac_update_relstats(Relation relation,
 
        /* If anything changed, write out the tuple. */
        if (dirty)
-               heap_inplace_update(rd, ctup);
+               heap_inplace_update_finish(inplace_state, ctup);
+       else
+               heap_inplace_update_cancel(inplace_state);
 
        table_close(rd, RowExclusiveLock);
 
@@ -1576,6 +1585,7 @@ vac_update_datfrozenxid(void)
        bool            bogus = false;
        bool            dirty = false;
        ScanKeyData key[1];
+       void       *inplace_state;
 
        /*
         * Restrict this task to one backend per database.  This avoids race
@@ -1699,20 +1709,18 @@ vac_update_datfrozenxid(void)
        relation = table_open(DatabaseRelationId, RowExclusiveLock);
 
        /*
-        * Get the pg_database tuple to scribble on.  Note that this does not
-        * directly rely on the syscache to avoid issues with flattened toast
-        * values for the in-place update.
+        * Fetch a copy of the tuple to scribble on.  We could check the syscache
+        * tuple first.  If that concluded !dirty, we'd avoid waiting on
+        * concurrent heap_update() and would avoid exclusive-locking the buffer.
+        * For now, don't optimize that.
         */
        ScanKeyInit(&key[0],
                                Anum_pg_database_oid,
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(MyDatabaseId));
 
-       scan = systable_beginscan(relation, DatabaseOidIndexId, true,
-                                                         NULL, 1, key);
-       tuple = systable_getnext(scan);
-       tuple = heap_copytuple(tuple);
-       systable_endscan(scan);
+       heap_inplace_update_scan(relation, DatabaseOidIndexId, true,
+                                                        NULL, 1, key, &tuple, &inplace_state);
 
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for database %u", MyDatabaseId);
@@ -1746,7 +1754,9 @@ vac_update_datfrozenxid(void)
                newMinMulti = dbform->datminmxid;
 
        if (dirty)
-               heap_inplace_update(relation, tuple);
+               heap_inplace_update_finish(inplace_state, tuple);
+       else
+               heap_inplace_update_cancel(inplace_state);
 
        heap_freetuple(tuple);
        table_close(relation, RowExclusiveLock);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec8..2e13fb9 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -336,7 +336,14 @@ extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
                                                                 bool follow_updates,
                                                                 Buffer *buffer, struct TM_FailureData *tmfd);
 
-extern void heap_inplace_update(Relation relation, HeapTuple tuple);
+extern void heap_inplace_update_scan(Relation relation,
+                                                                        Oid indexId,
+                                                                        bool indexOK,
+                                                                        Snapshot snapshot,
+                                                                        int nkeys, const ScanKeyData *key,
+                                                                        HeapTuple *oldtupcopy, void **state);
+extern void heap_inplace_update_finish(void *state, HeapTuple tuple);
+extern void heap_inplace_update_cancel(void *state);
 extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
                                                                          const struct VacuumCutoffs *cutoffs,
                                                                          HeapPageFreeze *pagefrz,
diff --git a/src/test/isolation/expected/intra-grant-inplace-db.out b/src/test/isolation/expected/intra-grant-inplace-db.out
index 432ece5..a91402c 100644
--- a/src/test/isolation/expected/intra-grant-inplace-db.out
+++ b/src/test/isolation/expected/intra-grant-inplace-db.out
@@ -9,20 +9,20 @@ step b1: BEGIN;
 step grant1: 
        GRANT TEMP ON DATABASE isolation_regression TO regress_temp_grantee;
 
-step vac2: VACUUM (FREEZE);
+step vac2: VACUUM (FREEZE); <waiting ...>
 step snap3: 
        INSERT INTO frozen_witness
        SELECT datfrozenxid FROM pg_database WHERE datname = current_catalog;
 
 step c1: COMMIT;
+step vac2: <... completed>
 step cmp3: 
        SELECT 'datfrozenxid retreated'
        FROM pg_database
        WHERE datname = current_catalog
                AND age(datfrozenxid) > (SELECT min(age(x)) FROM frozen_witness);
 
-?column?              
-----------------------
-datfrozenxid retreated
-(1 row)
+?column?
+--------
+(0 rows)
 
diff --git a/src/test/isolation/expected/intra-grant-inplace.out b/src/test/isolation/expected/intra-grant-inplace.out
index cc1e47a..c2a9841 100644
--- a/src/test/isolation/expected/intra-grant-inplace.out
+++ b/src/test/isolation/expected/intra-grant-inplace.out
@@ -14,15 +14,16 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step c1: COMMIT;
+step addk2: <... completed>
 step read2: 
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
 
 relhasindex
 -----------
-f          
+t          
 (1 row)
 
 
@@ -58,8 +59,9 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
+step addk2: <... completed>
 
 starting permutation: b2 sfnku2 addk2 c2
 step b2: BEGIN;
@@ -122,17 +124,18 @@ relhasindex
 f          
 (1 row)
 
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
 step grant1: <... completed>
 step c1: COMMIT;
+step addk2: <... completed>
 step read2: 
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
 
 relhasindex
 -----------
-f          
+t          
 (1 row)
 
 
diff --git a/src/test/isolation/specs/intra-grant-inplace-db.spec b/src/test/isolation/specs/intra-grant-inplace-db.spec
index bbecd5d..9de40ec 100644
--- a/src/test/isolation/specs/intra-grant-inplace-db.spec
+++ b/src/test/isolation/specs/intra-grant-inplace-db.spec
@@ -42,5 +42,4 @@ step cmp3     {
 }
 
 
-# XXX extant bug
 permutation snap3 b1 grant1 vac2(c1) snap3 c1 cmp3
diff --git a/src/test/isolation/specs/intra-grant-inplace.spec b/src/test/isolation/specs/intra-grant-inplace.spec
index 3cd696b..eed0b52 100644
--- a/src/test/isolation/specs/intra-grant-inplace.spec
+++ b/src/test/isolation/specs/intra-grant-inplace.spec
@@ -73,7 +73,7 @@ step keyshr5  {
 teardown       { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned post-bugfix behavior
+# XXX extant bugs: permutation comments refer to planned future LockTuple()
 
 permutation
        b1
diff --git a/src/test/modules/injection_points/expected/inplace.out b/src/test/modules/injection_points/expected/inplace.out
index 123f45a..db7dab6 100644
--- a/src/test/modules/injection_points/expected/inplace.out
+++ b/src/test/modules/injection_points/expected/inplace.out
@@ -40,4 +40,301 @@ step read1:
        SELECT reltuples = -1 AS reltuples_unknown
        FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
 
-ERROR:  could not create unique index "pg_class_oid_index"
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 r2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step r2: ROLLBACK;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 revoke2 grant2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: vac1 begin2 grant2 revoke2 mkrels3 c2 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step c2: COMMIT;
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 r2 grant2 revoke2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step r2: ROLLBACK;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
+
+starting permutation: begin2 grant2 vac1 c2 revoke2 vac3 mkrels3 read1
+mkrels
+------
+      
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step begin2: BEGIN;
+step grant2: GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC;
+step vac1: VACUUM vactest.orig50;  -- wait during inplace update <waiting ...>
+step c2: COMMIT;
+step revoke2: REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC;
+step vac3: VACUUM pg_class;
+step mkrels3: 
+       SELECT vactest.mkrels('intruder', 1, 100);  -- repopulate LP_UNUSED
+       SELECT injection_points_detach('inplace-before-pin');
+       SELECT injection_points_wakeup('inplace-before-pin');
+
+mkrels
+------
+      
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step vac1: <... completed>
+step read1: 
+       REINDEX TABLE pg_class;  -- look for duplicates
+       SELECT reltuples = -1 AS reltuples_unknown
+       FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
+
+reltuples_unknown
+-----------------
+f                
+(1 row)
+
diff --git a/src/test/modules/injection_points/specs/inplace.spec b/src/test/modules/injection_points/specs/inplace.spec
index e957713..86539a5 100644
--- a/src/test/modules/injection_points/specs/inplace.spec
+++ b/src/test/modules/injection_points/specs/inplace.spec
@@ -32,12 +32,9 @@ setup
        CREATE TABLE vactest.orig50 ();
        SELECT vactest.mkrels('orig', 51, 100);
 }
-
-# XXX DROP causes an assertion failure; adopt DROP once fixed
 teardown
 {
-       --DROP SCHEMA vactest CASCADE;
-       DO $$BEGIN EXECUTE 'ALTER SCHEMA vactest RENAME TO schema' || oid FROM pg_namespace where nspname = 'vactest'; END$$;
+       DROP SCHEMA vactest CASCADE;
        DROP EXTENSION injection_points;
 }
 
@@ -56,11 +53,13 @@ step read1  {
        FROM pg_class WHERE oid = 'vactest.orig50'::regclass;
 }
 
-
 # Transactional updates of the tuple vac1 is waiting to inplace-update.
 session s2
 step grant2            { GRANT SELECT ON TABLE vactest.orig50 TO PUBLIC; }
-
+step revoke2   { REVOKE SELECT ON TABLE vactest.orig50 FROM PUBLIC; }
+step begin2            { BEGIN; }
+step c2                        { COMMIT; }
+step r2                        { ROLLBACK; }
 
 # Non-blocking actions.
 session s3
@@ -74,10 +73,69 @@ step mkrels3        {
 }
 
 
-# XXX extant bug
+# target gains a successor at the last moment
 permutation
        vac1(mkrels3)   # reads pg_class tuple T0 for vactest.orig50, xmax invalid
        grant2                  # T0 becomes eligible for pruning, T1 is successor
        vac3                    # T0 becomes LP_UNUSED
-       mkrels3                 # T0 reused; vac1 wakes and overwrites the reused T0
+       mkrels3                 # vac1 wakes, scans to T1
        read1
+
+# target already has a successor, which commits
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1
+       vac1(mkrels3)   # reads T0 for vactest.orig50
+       c2                              # T0 becomes eligible for pruning
+       vac3                    # T0 becomes LP_UNUSED
+       mkrels3                 # vac1 wakes, scans to T1
+       read1
+
+# target already has a successor, which becomes LP_UNUSED at the last moment
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1
+       vac1(mkrels3)   # reads T0 for vactest.orig50
+       r2                              # T1 becomes eligible for pruning
+       vac3                    # T1 becomes LP_UNUSED
+       mkrels3                 # reuse T1; vac1 scans to T0
+       read1
+
+# target already has a successor, which becomes LP_REDIRECT at the last moment
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       vac1(mkrels3)   # reads T0
+       c2
+       revoke2                 # HOT update to T2
+       grant2                  # HOT update to T3
+       vac3                    # T1 becomes LP_REDIRECT
+       mkrels3                 # reuse T2; vac1 scans to T3
+       read1
+
+# waiting for updater to end
+permutation
+       vac1(c2)                # reads pg_class tuple T0 for vactest.orig50, xmax invalid
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       revoke2                 # HOT update to T2
+       mkrels3                 # vac1 awakes briefly, then waits for s2
+       c2
+       read1
+
+# Another LP_UNUSED.  This time, do change the live tuple.  Final live tuple
+# body is identical to original, at a different TID.
+permutation
+       begin2
+       grant2                  # T0.t_ctid = T1, non-HOT due to filled page
+       vac1(mkrels3)   # reads T0
+       r2                              # T1 becomes eligible for pruning
+       grant2                  # T0.t_ctid = T2; T0 becomes eligible for pruning
+       revoke2                 # T2.t_ctid = T3; T2 becomes eligible for pruning
+       vac3                    # T0, T1 & T2 become LP_UNUSED
+       mkrels3                 # reuse T0, T1 & T2; vac1 scans to T3
+       read1
+
+# Another LP_REDIRECT.  Compared to the earlier test, omit the last grant2.
+# Hence, final live tuple body is identical to original, at a different TID.
+permutation begin2 grant2 vac1(mkrels3) c2 revoke2 vac3 mkrels3 read1
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Make heap_update() callers wait for inplace update.
    
    The previous commit fixed some ways of losing an inplace update.  It
    remained possible to lose one when a backend working toward a
    heap_update() copied a tuple into memory just before inplace update of
    that tuple.  In catalogs eligible for inplace update, use LOCKTAG_TUPLE
    to govern admission to the steps of copying an old tuple, modifying it,
    and issuing heap_update().  This includes UPDATE and MERGE commands.  To
    avoid changing most of the pg_class DDL, don't require LOCKTAG_TUPLE
    when holding a relation lock sufficient to exclude inplace updaters.
    Back-patch to v12 (all supported versions).
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/20231027214946.79.nmi...@google.com

diff --git a/src/backend/access/heap/README.tuplock b/src/backend/access/heap/README.tuplock
index dbfa2b7..fb06ff2 100644
--- a/src/backend/access/heap/README.tuplock
+++ b/src/backend/access/heap/README.tuplock
@@ -157,8 +157,6 @@ is set.
 Locking to write inplace-updated tables
 ---------------------------------------
 
-[This is the plan, but LOCKTAG_TUPLE acquisition is not yet here.]
-
 If IsInplaceUpdateRelation() returns true for a table, the table is a system
 catalog that receives heap_inplace_update_scan() calls.  Preparing a
 heap_update() of these tables follows additional locking rules, to ensure we
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 107507e..797bddf 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -51,6 +51,8 @@
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_database_d.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -77,6 +79,9 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
                                                                  HeapTuple newtup, HeapTuple old_key_tuple,
                                                                  bool all_visible_cleared, bool new_all_visible_cleared);
 #ifdef USE_ASSERT_CHECKING
+static void check_lock_if_inplace_updateable_rel(Relation relation,
+                                                                                                ItemPointer otid,
+                                                                                                HeapTuple newtup);
 static void check_inplace_rel_lock(HeapTuple oldtup);
 #endif
 static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
@@ -126,6 +131,8 @@ static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool ke
  * heavyweight lock mode and MultiXactStatus values to use for any particular
  * tuple lock strength.
  *
+ * These interact with InplaceUpdateTupleLock, an alias for ExclusiveLock.
+ *
  * Don't look at lockstatus/updstatus directly!  Use get_mxact_status_for_lock
  * instead.
  */
@@ -3212,6 +3219,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
                                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                                 errmsg("cannot update tuples during a parallel operation")));
 
+#ifdef USE_ASSERT_CHECKING
+       check_lock_if_inplace_updateable_rel(relation, otid, newtup);
+#endif
+
        /*
         * Fetch the list of attributes to be checked for various operations.
         *
@@ -4078,6 +4089,89 @@ l2:
 
 #ifdef USE_ASSERT_CHECKING
 /*
+ * Confirm adequate lock held during heap_update(), per rules from
+ * README.tuplock section "Locking to write inplace-updated tables".
+ */
+static void
+check_lock_if_inplace_updateable_rel(Relation relation,
+                                                                        ItemPointer otid,
+                                                                        HeapTuple newtup)
+{
+       /* LOCKTAG_TUPLE acceptable for any catalog */
+       switch (RelationGetRelid(relation))
+       {
+               case RelationRelationId:
+               case DatabaseRelationId:
+                       {
+                               LOCKTAG         tuptag;
+
+                               SET_LOCKTAG_TUPLE(tuptag,
+                                                                 relation->rd_lockInfo.lockRelId.dbId,
+                                                                 relation->rd_lockInfo.lockRelId.relId,
+                                                                 ItemPointerGetBlockNumber(otid),
+                                                                 ItemPointerGetOffsetNumber(otid));
+                               if (LockHeldByMe(&tuptag, InplaceUpdateTupleLock, false))
+                                       return;
+                       }
+                       break;
+               default:
+                       Assert(!IsInplaceUpdateRelation(relation));
+                       return;
+       }
+
+       switch (RelationGetRelid(relation))
+       {
+               case RelationRelationId:
+                       {
+                               /* LOCKTAG_TUPLE or LOCKTAG_RELATION ok */
+                               Form_pg_class classForm = (Form_pg_class) GETSTRUCT(newtup);
+                               Oid                     relid = classForm->oid;
+                               Oid                     dbid;
+                               LOCKTAG         tag;
+
+                               if (IsSharedRelation(relid))
+                                       dbid = InvalidOid;
+                               else
+                                       dbid = MyDatabaseId;
+
+                               if (classForm->relkind == RELKIND_INDEX)
+                               {
+                                       Relation        irel = index_open(relid, AccessShareLock);
+
+                                       SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+                                       index_close(irel, AccessShareLock);
+                               }
+                               else
+                                       SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+                               if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock, false) &&
+                                       !LockHeldByMe(&tag, ShareRowExclusiveLock, true))
+                                       elog(WARNING,
+                                                "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+                                                NameStr(classForm->relname),
+                                                relid,
+                                                classForm->relkind,
+                                                ItemPointerGetBlockNumber(otid),
+                                                ItemPointerGetOffsetNumber(otid));
+                       }
+                       break;
+               case DatabaseRelationId:
+                       {
+                               /* LOCKTAG_TUPLE required */
+                               Form_pg_database dbForm = (Form_pg_database) GETSTRUCT(newtup);
+
+                               elog(WARNING,
+                                        "missing lock on database \"%s\" (OID %u) @ TID (%u,%u)",
+                                        NameStr(dbForm->datname),
+                                        dbForm->oid,
+                                        ItemPointerGetBlockNumber(otid),
+                                        ItemPointerGetOffsetNumber(otid));
+                       }
+                       break;
+       }
+}
+
+/*
  * Confirm adequate relation lock held, per rules from README.tuplock section
  * "Locking to write inplace-updated tables".
  */
@@ -6123,6 +6217,7 @@ heap_inplace_update_scan(Relation relation,
        int                     retries = 0;
        SysScanDesc scan;
        HeapTuple       oldtup;
+       ItemPointerData locked;
 
        /*
         * For now, we don't allow parallel updates.  Unlike a regular update,
@@ -6144,6 +6239,7 @@ heap_inplace_update_scan(Relation relation,
        Assert(IsInplaceUpdateRelation(relation) || !IsSystemRelation(relation));
 
        /* Loop for an exclusive-locked buffer of a non-updated tuple. */
+       ItemPointerSetInvalid(&locked);
        do
        {
                CHECK_FOR_INTERRUPTS();
@@ -6163,6 +6259,8 @@ heap_inplace_update_scan(Relation relation,
                oldtup = systable_getnext(scan);
                if (!HeapTupleIsValid(oldtup))
                {
+                       if (ItemPointerIsValid(&locked))
+                               UnlockTuple(relation, &locked, InplaceUpdateTupleLock);
                        systable_endscan(scan);
                        *oldtupcopy = NULL;
                        return;
@@ -6172,6 +6270,15 @@ heap_inplace_update_scan(Relation relation,
                if (RelationGetRelid(relation) == RelationRelationId)
                        check_inplace_rel_lock(oldtup);
 #endif
+
+               if (!(ItemPointerIsValid(&locked) &&
+                         ItemPointerEquals(&locked, &oldtup->t_self)))
+               {
+                       if (ItemPointerIsValid(&locked))
+                               UnlockTuple(relation, &locked, InplaceUpdateTupleLock);
+                       LockTuple(relation, &oldtup->t_self, InplaceUpdateTupleLock);
+               }
+               locked = oldtup->t_self;
        } while (!inplace_xmax_lock(scan));
 
        *oldtupcopy = heap_copytuple(oldtup);
@@ -6183,6 +6290,8 @@ heap_inplace_update_scan(Relation relation,
  *
  * The tuple cannot change size, and therefore its header fields and null
  * bitmap (if any) don't change either.
+ *
+ * Since we hold LOCKTAG_TUPLE, no updater has a local copy of this tuple.
  */
 void
 heap_inplace_update_finish(void *state, HeapTuple tuple)
@@ -6249,6 +6358,7 @@ heap_inplace_update_finish(void *state, HeapTuple tuple)
        END_CRIT_SECTION();
 
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
        systable_endscan(scan);
 
        /*
@@ -6274,9 +6384,12 @@ heap_inplace_update_cancel(void *state)
        SysScanDesc scan = (SysScanDesc) state;
        TupleTableSlot *slot = scan->slot;
        BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+       HeapTuple       oldtup = bslot->base.tuple;
        Buffer          buffer = bslot->buffer;
+       Relation        relation = scan->heap_rel;
 
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       UnlockTuple(relation, &oldtup->t_self, InplaceUpdateTupleLock);
        systable_endscan(scan);
 }
 
@@ -6334,7 +6447,7 @@ inplace_xmax_lock(SysScanDesc scan)
         * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
         *
         * - wait unconditionally
-        * - no tuple locks
+        * - caller handles tuple lock, since inplace needs it unconditionally
         * - don't recheck header after wait: simpler to defer to next iteration
         * - don't try to continue even if the updater aborts: likewise
         * - no crosscheck
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index a44ccee..bc0e259 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -75,6 +75,7 @@
 #include "nodes/makefuncs.h"
 #include "parser/parse_func.h"
 #include "parser/parse_type.h"
+#include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/aclchk_internal.h"
 #include "utils/builtins.h"
@@ -1848,7 +1849,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                HeapTuple       tuple;
                ListCell   *cell_colprivs;
 
-               tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+               tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relOid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for relation %u", relOid);
                pg_class_tuple = (Form_pg_class) GETSTRUCT(tuple);
@@ -2060,6 +2061,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                                                                                 values, nulls, replaces);
 
                        CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+                       UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
                        /* Update initial privileges for extensions */
                        recordExtensionInitPriv(relOid, RelationRelationId, 0, new_acl);
@@ -2072,6 +2074,8 @@ ExecGrant_Relation(InternalGrant *istmt)
 
                        pfree(new_acl);
                }
+               else
+                       UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
                /*
                 * Handle column-level privileges, if any were specified or implied.
@@ -2185,7 +2189,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, AclMode default_privs,
                Oid                *oldmembers;
                Oid                *newmembers;
 
-               tuple = SearchSysCache1(cacheid, ObjectIdGetDatum(objectid));
+               tuple = SearchSysCacheLocked1(cacheid, ObjectIdGetDatum(objectid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for %s %u", get_object_class_descr(classid), objectid);
 
@@ -2261,6 +2265,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, AclMode default_privs,
                                                                         nulls, replaces);
 
                CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+               UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
                /* Update initial privileges for extensions */
                recordExtensionInitPriv(objectid, classid, 0, new_acl);
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 6c39434..8aefbcd 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -138,6 +138,15 @@ IsCatalogRelationOid(Oid relid)
 /*
  * IsInplaceUpdateRelation
  *             True iff core code performs inplace updates on the relation.
+ *
+ *             This is used for assertions and for making the executor follow the
+ *             locking protocol described at README.tuplock section "Locking to write
+ *             inplace-updated tables".  Extensions may inplace-update other heap
+ *             tables, but concurrent SQL UPDATE on the same table may overwrite
+ *             those modifications.
+ *
+ *             The executor can assume these are not partitions or partitioned and
+ *             have no triggers.
  */
 bool
 IsInplaceUpdateRelation(Relation relation)
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index da4d2b7..fd48022 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1864,6 +1864,7 @@ RenameDatabase(const char *oldname, const char *newname)
 {
        Oid                     db_id;
        HeapTuple       newtup;
+       ItemPointerData otid;
        Relation        rel;
        int                     notherbackends;
        int                     npreparedxacts;
@@ -1935,11 +1936,13 @@ RenameDatabase(const char *oldname, const char *newname)
                                 errdetail_busy_db(notherbackends, npreparedxacts)));
 
        /* rename */
-       newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
+       newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
        if (!HeapTupleIsValid(newtup))
                elog(ERROR, "cache lookup failed for database %u", db_id);
+       otid = newtup->t_self;
        namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
-       CatalogTupleUpdate(rel, &newtup->t_self, newtup);
+       CatalogTupleUpdate(rel, &otid, newtup);
+       UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2188,6 +2191,7 @@ movedb(const char *dbname, const char *tblspcname)
                        ereport(ERROR,
                                        (errcode(ERRCODE_UNDEFINED_DATABASE),
                                         errmsg("database \"%s\" does not exist", dbname)));
+               LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
                new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
                new_record_repl[Anum_pg_database_dattablespace - 1] = true;
@@ -2196,6 +2200,7 @@ movedb(const char *dbname, const char *tblspcname)
                                                                         new_record,
                                                                         new_record_nulls, new_record_repl);
                CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
+               UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
                InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2426,6 +2431,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", stmt->dbname)));
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        datform = (Form_pg_database) GETSTRUCT(tuple);
        dboid = datform->oid;
@@ -2475,6 +2481,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
        newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
                                                                 new_record_nulls, new_record_repl);
        CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
 
@@ -2524,6 +2531,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
        if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
                                           stmt->dbname);
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
        oldversion = isnull ? NULL : TextDatumGetCString(datum);
@@ -2552,6 +2560,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
                bool            nulls[Natts_pg_database] = {0};
                bool            replaces[Natts_pg_database] = {0};
                Datum           values[Natts_pg_database] = {0};
+               HeapTuple       newtuple;
 
                ereport(NOTICE,
                                (errmsg("changing version from %s to %s",
@@ -2560,14 +2569,15 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
                values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
                replaces[Anum_pg_database_datcollversion - 1] = true;
 
-               tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
-                                                                 values, nulls, replaces);
-               CatalogTupleUpdate(rel, &tuple->t_self, tuple);
-               heap_freetuple(tuple);
+               newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
+                                                                        values, nulls, replaces);
+               CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+               heap_freetuple(newtuple);
        }
        else
                ereport(NOTICE,
                                (errmsg("version has not changed")));
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2679,6 +2689,8 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
                                        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                         errmsg("permission denied to change owner of database")));
 
+               LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
+
                repl_repl[Anum_pg_database_datdba - 1] = true;
                repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
 
@@ -2700,6 +2712,7 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
 
                newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
                CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
+               UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
                heap_freetuple(newtuple);
 
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 22d0ce7..36d82bd 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -388,6 +388,7 @@ SetDatabaseHasLoginEventTriggers(void)
        /* Set dathasloginevt flag in pg_database */
        Form_pg_database db;
        Relation        pg_db = table_open(DatabaseRelationId, RowExclusiveLock);
+       ItemPointerData otid;
        HeapTuple       tuple;
 
        /*
@@ -399,16 +400,18 @@ SetDatabaseHasLoginEventTriggers(void)
         */
        LockSharedObject(DatabaseRelationId, MyDatabaseId, 0, AccessExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
+       tuple = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for database %u", MyDatabaseId);
+       otid = tuple->t_self;
        db = (Form_pg_database) GETSTRUCT(tuple);
        if (!db->dathasloginevt)
        {
                db->dathasloginevt = true;
-               CatalogTupleUpdate(pg_db, &tuple->t_self, tuple);
+               CatalogTupleUpdate(pg_db, &otid, tuple);
                CommandCounterIncrement();
        }
+       UnlockTuple(pg_db, &otid, InplaceUpdateTupleLock);
        table_close(pg_db, RowExclusiveLock);
        heap_freetuple(tuple);
 }
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 2caab88..8d04ca0 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -4409,14 +4409,17 @@ update_relispartition(Oid relationId, bool newval)
 {
        HeapTuple       tup;
        Relation        classRel;
+       ItemPointerData otid;
 
        classRel = table_open(RelationRelationId, RowExclusiveLock);
-       tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
+       tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
        if (!HeapTupleIsValid(tup))
                elog(ERROR, "cache lookup failed for relation %u", relationId);
+       otid = tup->t_self;
        Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
        ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
-       CatalogTupleUpdate(classRel, &tup->t_self, tup);
+       CatalogTupleUpdate(classRel, &otid, tup);
+       UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
        heap_freetuple(tup);
        table_close(classRel, RowExclusiveLock);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 8fcb188..7fa80a5 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -3609,6 +3609,7 @@ SetRelationTableSpace(Relation rel,
 {
        Relation        pg_class;
        HeapTuple       tuple;
+       ItemPointerData otid;
        Form_pg_class rd_rel;
        Oid                     reloid = RelationGetRelid(rel);
 
@@ -3617,9 +3618,10 @@ SetRelationTableSpace(Relation rel,
        /* Get a modifiable copy of the relation's pg_class row. */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+       tuple = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(reloid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", reloid);
+       otid = tuple->t_self;
        rd_rel = (Form_pg_class) GETSTRUCT(tuple);
 
        /* Update the pg_class row. */
@@ -3627,7 +3629,8 @@ SetRelationTableSpace(Relation rel,
                InvalidOid : newTableSpaceId;
        if (RelFileNumberIsValid(newRelFilenumber))
                rd_rel->relfilenode = newRelFilenumber;
-       CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+       CatalogTupleUpdate(pg_class, &otid, tuple);
+       UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
 
        /*
         * Record dependency on tablespace.  This is only required for relations
@@ -4121,6 +4124,7 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
 {
        Relation        targetrelation;
        Relation        relrelation;    /* for RELATION relation */
+       ItemPointerData otid;
        HeapTuple       reltup;
        Form_pg_class relform;
        Oid                     namespaceId;
@@ -4143,7 +4147,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
         */
        relrelation = table_open(RelationRelationId, RowExclusiveLock);
 
-       reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(myrelid));
+       reltup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(myrelid));
        if (!HeapTupleIsValid(reltup))  /* shouldn't happen */
                elog(ERROR, "cache lookup failed for relation %u", myrelid);
+       otid = reltup->t_self;
        relform = (Form_pg_class) GETSTRUCT(reltup);
@@ -4170,7 +4175,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
         */
        namestrcpy(&(relform->relname), newrelname);
 
-       CatalogTupleUpdate(relrelation, &reltup->t_self, reltup);
+       CatalogTupleUpdate(relrelation, &otid, reltup);
+       UnlockTuple(relrelation, &otid, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHookArg(RelationRelationId, myrelid, 0,
                                                                 InvalidOid, is_internal);
@@ -14917,7 +14923,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
 
        /* Fetch heap tuple */
        relid = RelationGetRelid(rel);
-       tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+       tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", relid);
 
@@ -15021,6 +15027,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
                                                                 repl_val, repl_null, repl_repl);
 
        CatalogTupleUpdate(pgclass, &newtuple->t_self, newtuple);
+       UnlockTuple(pgclass, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), 0);
 
@@ -17170,7 +17177,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
        ObjectAddress thisobj;
        bool            already_done = false;
 
-       classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+       /* no rel lock for relkind=c so use LOCKTAG_TUPLE */
+       classTup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relOid));
        if (!HeapTupleIsValid(classTup))
                elog(ERROR, "cache lookup failed for relation %u", relOid);
        classForm = (Form_pg_class) GETSTRUCT(classTup);
@@ -17189,6 +17197,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
        already_done = object_address_present(&thisobj, objsMoved);
        if (!already_done && oldNspOid != newNspOid)
        {
+               ItemPointerData otid = classTup->t_self;
+
                /* check for duplicate name (more friendly than unique-index failure) */
                if (get_relname_relid(NameStr(classForm->relname),
                                                          newNspOid) != InvalidOid)
@@ -17201,7 +17211,9 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
                /* classTup is a copy, so OK to scribble on */
                classForm->relnamespace = newNspOid;
 
-               CatalogTupleUpdate(classRel, &classTup->t_self, classTup);
+               CatalogTupleUpdate(classRel, &otid, classTup);
+               UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
+
 
                /* Update dependency on schema if caller said so */
                if (hasDependEntry &&
@@ -17213,6 +17225,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
                        elog(ERROR, "could not change schema dependency for relation \"%s\"",
                                 NameStr(classForm->relname));
        }
+       else
+               UnlockTuple(classRel, &classTup->t_self, InplaceUpdateTupleLock);
        if (!already_done)
        {
                add_exact_object_address(&thisobj, objsMoved);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4d7c92d..321ad47 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1209,6 +1209,8 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
        resultRelInfo->ri_NumIndices = 0;
        resultRelInfo->ri_IndexRelationDescs = NULL;
        resultRelInfo->ri_IndexRelationInfo = NULL;
+       resultRelInfo->ri_needLockTagTuple =
+               IsInplaceUpdateRelation(resultRelationDesc);
        /* make a copy so as not to depend on relcache info not changing... */
        resultRelInfo->ri_TrigDesc = CopyTriggerDesc(resultRelationDesc->trigdesc);
        if (resultRelInfo->ri_TrigDesc)
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd..f18efdb 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -559,8 +559,12 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
        Relation        rel = resultRelInfo->ri_RelationDesc;
        ItemPointer tid = &(searchslot->tts_tid);
 
-       /* For now we support only tables. */
+       /*
+        * We support only non-system tables, with
+        * check_publication_add_relation() accountable.
+        */
        Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+       Assert(!IsCatalogRelation(rel));
 
        CheckCmdReplicaIdentity(rel, CMD_UPDATE);
 
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index a2442b7..b70d2f6 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -2320,6 +2320,8 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
        }
        else
        {
+               ItemPointerData lockedtid;
+
                /*
                 * If we generate a new candidate tuple after EvalPlanQual testing, we
                 * must loop back here to try again.  (We don't need to redo triggers,
@@ -2328,6 +2330,7 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
                 * to do them again.)
                 */
 redo_act:
+               lockedtid = *tupleid;
                result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot,
                                                           canSetTag, &updateCxt);
 
@@ -2421,6 +2424,14 @@ redo_act:
                                                                ExecInitUpdateProjection(context->mtstate,
                                                                                                                 resultRelInfo);
 
+                                                       if (resultRelInfo->ri_needLockTagTuple)
+                                                       {
+                                                               UnlockTuple(resultRelationDesc,
+                                                                                       &lockedtid, InplaceUpdateTupleLock);
+                                                               LockTuple(resultRelationDesc,
+                                                                                 tupleid, InplaceUpdateTupleLock);
+                                                       }
+
                                                        /* Fetch the most recent version of old tuple. */
                                                        oldSlot = resultRelInfo->ri_oldTupleSlot;
                                                        if (!table_tuple_fetch_row_version(resultRelationDesc,
@@ -2525,6 +2536,14 @@ ExecOnConflictUpdate(ModifyTableContext *context,
        TransactionId xmin;
        bool            isnull;
 
+       /*
+        * Parse analysis should have blocked ON CONFLICT for all system
+        * relations, which includes these.  There's no fundamental obstacle to
+        * supporting this; we'd just need to handle LOCKTAG_TUPLE like the other
+        * ExecUpdate() caller.
+        */
+       Assert(!resultRelInfo->ri_needLockTagTuple);
+
        /* Determine lock mode to use */
        lockmode = ExecUpdateLockMode(context->estate, resultRelInfo);
 
@@ -2850,6 +2869,7 @@ ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 {
        ModifyTableState *mtstate = context->mtstate;
        List      **mergeActions = resultRelInfo->ri_MergeActions;
+       ItemPointerData lockedtid;
        List       *actionStates;
        TupleTableSlot *newslot = NULL;
        TupleTableSlot *rslot = NULL;
@@ -2886,17 +2906,33 @@ ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
         * target wholerow junk attr.
         */
        Assert(tupleid != NULL || oldtuple != NULL);
+       ItemPointerSetInvalid(&lockedtid);
        if (oldtuple != NULL)
        {
                Assert(resultRelInfo->ri_TrigDesc);
+               Assert(!resultRelInfo->ri_needLockTagTuple);
                ExecForceStoreHeapTuple(oldtuple, resultRelInfo->ri_oldTupleSlot,
                                                                false);
        }
-       else if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
-                                                                                       tupleid,
-                                                                                       SnapshotAny,
-                                                                                       resultRelInfo->ri_oldTupleSlot))
-               elog(ERROR, "failed to fetch the target tuple");
+       else
+       {
+               if (resultRelInfo->ri_needLockTagTuple)
+               {
+                       /*
+                        * This locks even tuples that don't match mas_whenqual, which
+                        * isn't ideal.  MERGE on system catalogs is a minor use case, so
+                        * don't bother doing better.
+                        */
+                       LockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                                         InplaceUpdateTupleLock);
+                       lockedtid = *tupleid;
+               }
+               if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
+                                                                                  tupleid,
+                                                                                  SnapshotAny,
+                                                                                  resultRelInfo->ri_oldTupleSlot))
+                       elog(ERROR, "failed to fetch the target tuple");
+       }
 
        /*
         * Test the join condition.  If it's satisfied, perform a MATCHED action.
@@ -2968,7 +3004,7 @@ lmerge_matched:
                                                                                tupleid, NULL, newslot, &result))
                                {
                                        if (result == TM_Ok)
-                                               return NULL;    /* "do nothing" */
+                                               goto out;       /* "do nothing" */
 
                                        break;          /* concurrent update/delete */
                                }
@@ -2979,7 +3015,7 @@ lmerge_matched:
                                {
                                        if (!ExecIRUpdateTriggers(estate, resultRelInfo,
                                                                                  oldtuple, newslot))
-                                               return NULL;    /* "do nothing" */
+                                               goto out;       /* "do nothing" */
                                }
                                else
                                {
@@ -2999,7 +3035,8 @@ lmerge_matched:
                                        if (updateCxt.crossPartUpdate)
                                        {
                                                mtstate->mt_merge_updated += 1;
-                                               return context->cpUpdateReturningSlot;
+                                               rslot = context->cpUpdateReturningSlot;
+                                               goto out;
                                        }
                                }
 
@@ -3017,7 +3054,7 @@ lmerge_matched:
                                                                                NULL, NULL, &result))
                                {
                                        if (result == TM_Ok)
-                                               return NULL;    /* "do nothing" */
+                                               goto out;       /* "do nothing" */
 
                                        break;          /* concurrent update/delete */
                                }
@@ -3028,7 +3065,7 @@ lmerge_matched:
                                {
                                        if (!ExecIRDeleteTriggers(estate, resultRelInfo,
                                                                                  oldtuple))
-                                               return NULL;    /* "do nothing" */
+                                               goto out;       /* "do nothing" */
                                }
                                else
                                        result = ExecDeleteAct(context, resultRelInfo, tupleid,
@@ -3109,7 +3146,7 @@ lmerge_matched:
                                 * let caller handle it under NOT MATCHED [BY TARGET] clauses.
                                 */
                                *matched = false;
-                               return NULL;
+                               goto out;
 
                        case TM_Updated:
                                {
@@ -3183,7 +3220,7 @@ lmerge_matched:
                                                                 * more to do.
                                                                 */
                                                                if (TupIsNull(epqslot))
-                                                                       return NULL;
+                                                                       goto out;
 
                                                                /*
                                                                 * If we got a NULL ctid from the subplan, the
@@ -3201,6 +3238,15 @@ lmerge_matched:
                                                                 * we need to switch to the NOT MATCHED BY
                                                                 * SOURCE case.
                                                                 */
+                                                               if (resultRelInfo->ri_needLockTagTuple)
+                                                               {
+                                                                       if (ItemPointerIsValid(&lockedtid))
+                                                                               UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                                                                                       InplaceUpdateTupleLock);
+                                                                       LockTuple(resultRelInfo->ri_RelationDesc, &context->tmfd.ctid,
+                                                                                         InplaceUpdateTupleLock);
+                                                                       lockedtid = context->tmfd.ctid;
+                                                               }
                                                                if (!table_tuple_fetch_row_version(resultRelationDesc,
                                                                                                                                   &context->tmfd.ctid,
                                                                                                                                   SnapshotAny,
@@ -3229,7 +3275,7 @@ lmerge_matched:
                                                         * MATCHED [BY TARGET] actions
                                                         */
                                                        *matched = false;
-                                                       return NULL;
+                                                       goto out;
 
                                                case TM_SelfModified:
 
@@ -3257,13 +3303,13 @@ lmerge_matched:
 
                                                        /* This shouldn't happen */
                                                        elog(ERROR, "attempted to update or delete invisible tuple");
-                                                       return NULL;
+                                                       goto out;
 
                                                default:
                                                        /* see table_tuple_lock call in ExecDelete() */
                                                        elog(ERROR, "unexpected table_tuple_lock status: %u",
                                                                 result);
-                                                       return NULL;
+                                                       goto out;
                                        }
                                }
 
@@ -3310,6 +3356,10 @@ lmerge_matched:
        /*
         * Successfully executed an action or no qualifying action was found.
         */
+out:
+       if (ItemPointerIsValid(&lockedtid))
+               UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                       InplaceUpdateTupleLock);
        return rslot;
 }
 
@@ -3761,6 +3811,7 @@ ExecModifyTable(PlanState *pstate)
        HeapTupleData oldtupdata;
        HeapTuple       oldtuple;
        ItemPointer tupleid;
+       bool            tuplock;
 
        CHECK_FOR_INTERRUPTS();
 
@@ -4073,6 +4124,8 @@ ExecModifyTable(PlanState *pstate)
                                break;
 
                        case CMD_UPDATE:
+                               tuplock = false;
+
                                /* Initialize projection info if first time for this table */
                                if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
                                        ExecInitUpdateProjection(node, resultRelInfo);
@@ -4084,6 +4137,7 @@ ExecModifyTable(PlanState *pstate)
                                oldSlot = resultRelInfo->ri_oldTupleSlot;
                                if (oldtuple != NULL)
                                {
+                                       Assert(!resultRelInfo->ri_needLockTagTuple);
                                        /* Use the wholerow junk attr as the old tuple. */
                                        ExecForceStoreHeapTuple(oldtuple, oldSlot, false);
                                }
@@ -4092,6 +4146,11 @@ ExecModifyTable(PlanState *pstate)
                                        /* Fetch the most recent version of old tuple. */
                                        Relation        relation = resultRelInfo->ri_RelationDesc;
 
+                                       if (resultRelInfo->ri_needLockTagTuple)
+                                       {
+                                               LockTuple(relation, tupleid, InplaceUpdateTupleLock);
+                                               tuplock = true;
+                                       }
                                        if (!table_tuple_fetch_row_version(relation, tupleid,
                                                                                                           SnapshotAny,
                                                                                                           oldSlot))
@@ -4103,6 +4162,9 @@ ExecModifyTable(PlanState *pstate)
                                /* Now apply the update. */
                                slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
                                                                  slot, node->canSetTag);
+                               if (tuplock)
+                                       UnlockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                                                               InplaceUpdateTupleLock);
                                break;
 
                        case CMD_DELETE:
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 930cc03..3f1e8ce 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -3770,6 +3770,7 @@ RelationSetNewRelfilenumber(Relation relation, char persistence)
 {
        RelFileNumber newrelfilenumber;
        Relation        pg_class;
+       ItemPointerData otid;
        HeapTuple       tuple;
        Form_pg_class classform;
        MultiXactId minmulti = InvalidMultiXactId;
@@ -3812,11 +3813,12 @@ RelationSetNewRelfilenumber(Relation relation, char persistence)
         */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(RELOID,
-                                                               ObjectIdGetDatum(RelationGetRelid(relation)));
+       tuple = SearchSysCacheLockedCopy1(RELOID,
+                                                                         ObjectIdGetDatum(RelationGetRelid(relation)));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for relation %u",
                         RelationGetRelid(relation));
+       otid = tuple->t_self;
        classform = (Form_pg_class) GETSTRUCT(tuple);
 
        /*
@@ -3936,9 +3938,10 @@ RelationSetNewRelfilenumber(Relation relation, char persistence)
                classform->relminmxid = minmulti;
                classform->relpersistence = persistence;
 
-               CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+               CatalogTupleUpdate(pg_class, &otid, tuple);
        }
 
+       UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
        heap_freetuple(tuple);
 
        table_close(pg_class, RowExclusiveLock);
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index 3e03dfc..50c9440 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -30,7 +30,10 @@
 #include "catalog/pg_shseclabel_d.h"
 #include "common/int.h"
 #include "lib/qunique.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
 #include "utils/catcache.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -269,6 +272,98 @@ ReleaseSysCache(HeapTuple tuple)
 }
 
 /*
+ * SearchSysCacheLocked1
+ *
+ * Combine SearchSysCache1() with acquiring a LOCKTAG_TUPLE at mode
+ * InplaceUpdateTupleLock.  This is a tool for complying with the
+ * README.tuplock section "Locking to write inplace-updated tables".  After
+ * the caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock)
+ * and ReleaseSysCache().
+ *
+ * The returned tuple may be the subject of an uncommitted update, so this
+ * doesn't prevent the "tuple concurrently updated" error.
+ */
+HeapTuple
+SearchSysCacheLocked1(int cacheId,
+                                         Datum key1)
+{
+       ItemPointerData tid;
+       LOCKTAG         tag;
+       Oid                     dboid =
+               SysCache[cacheId]->cc_relisshared ? InvalidOid : MyDatabaseId;
+       Oid                     reloid = cacheinfo[cacheId].reloid;
+
+       /*----------
+        * Since inplace updates may happen just before our LockTuple(), we must
+        * return content acquired after LockTuple() of the TID we return.  If we
+        * just fetched twice instead of looping, the following sequence would
+        * defeat our locking:
+        *
+        * GRANT:   SearchSysCache1() = TID (1,5)
+        * GRANT:   LockTuple(pg_class, (1,5))
+        * [no more inplace update of (1,5) until we release the lock]
+        * CLUSTER: SearchSysCache1() = TID (1,5)
+        * CLUSTER: heap_update() = TID (1,8)
+        * CLUSTER: COMMIT
+        * GRANT:   SearchSysCache1() = TID (1,8)
+        * GRANT:   return (1,8) from SearchSysCacheLocked1()
+        * VACUUM:  SearchSysCache1() = TID (1,8)
+        * VACUUM:  LockTuple(pg_class, (1,8))  # two TIDs now locked for one rel
+        * VACUUM:  inplace update
+        * GRANT:   heap_update() = (1,9)  # lose inplace update
+        *
+        * In the happy case, this takes two fetches, one to determine the TID to
+        * lock and another to get the content and confirm the TID didn't change.
+        *
+        * This is valid even if the row gets updated to a new TID, the old TID
+        * becomes LP_UNUSED, and the row gets updated back to its old TID.  We'd
+        * still hold the right LOCKTAG_TUPLE and a copy of the row captured after
+        * the LOCKTAG_TUPLE.
+        */
+       ItemPointerSetInvalid(&tid);
+       for (;;)
+       {
+               HeapTuple       tuple;
+               LOCKMODE        lockmode = InplaceUpdateTupleLock;
+
+               tuple = SearchSysCache1(cacheId, key1);
+               if (ItemPointerIsValid(&tid))
+               {
+                       if (!HeapTupleIsValid(tuple))
+                       {
+                               LockRelease(&tag, lockmode, false);
+                               return tuple;
+                       }
+                       if (ItemPointerEquals(&tid, &tuple->t_self))
+                               return tuple;
+                       LockRelease(&tag, lockmode, false);
+               }
+               else if (!HeapTupleIsValid(tuple))
+                       return tuple;
+
+               tid = tuple->t_self;
+               ReleaseSysCache(tuple);
+               /* like: LockTuple(rel, &tid, lockmode) */
+               SET_LOCKTAG_TUPLE(tag, dboid, reloid,
+                                                 ItemPointerGetBlockNumber(&tid),
+                                                 ItemPointerGetOffsetNumber(&tid));
+               (void) LockAcquire(&tag, lockmode, false, false);
+
+               /*
+                * If an inplace update just finished, ensure we process the syscache
+                * inval.  XXX this is insufficient: the inplace updater may not yet
+                * have reached AtEOXact_Inval().  See test at inplace-inval.spec.
+                *
+                * If a heap_update() call just released its LOCKTAG_TUPLE, we'll
+                * probably find the old tuple and reach "tuple concurrently updated".
+                * If that heap_update() aborts, our LOCKTAG_TUPLE blocks inplace
+                * updates while our caller works.
+                */
+               AcceptInvalidationMessages();
+       }
+}
+
+/*
  * SearchSysCacheCopy
  *
  * A convenience routine that does SearchSysCache and (if successful)
@@ -295,6 +390,28 @@ SearchSysCacheCopy(int cacheId,
 }
 
 /*
+ * SearchSysCacheLockedCopy1
+ *
+ * Meld SearchSysCacheLocked1() with SearchSysCacheCopy().  After the
+ * caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock) and
+ * heap_freetuple().
+ */
+HeapTuple
+SearchSysCacheLockedCopy1(int cacheId,
+                                                 Datum key1)
+{
+       HeapTuple       tuple,
+                               newtuple;
+
+       tuple = SearchSysCacheLocked1(cacheId, key1);
+       if (!HeapTupleIsValid(tuple))
+               return tuple;
+       newtuple = heap_copytuple(tuple);
+       ReleaseSysCache(tuple);
+       return newtuple;
+}
+
+/*
  * SearchSysCacheExists
  *
  * A convenience routine that just probes to see if a tuple can be found.
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index b62c96f..eab0add 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -482,6 +482,9 @@ typedef struct ResultRelInfo
        /* Have the projection and the slots above been initialized? */
        bool            ri_projectNewInfoValid;
 
+       /* updates do LockTuple() before oldtup read; see README.tuplock */
+       bool            ri_needLockTagTuple;
+
        /* triggers to be fired, if any */
        TriggerDesc *ri_TrigDesc;
 
diff --git a/src/include/storage/lockdefs.h b/src/include/storage/lockdefs.h
index 934ba84..810b297 100644
--- a/src/include/storage/lockdefs.h
+++ b/src/include/storage/lockdefs.h
@@ -47,6 +47,8 @@ typedef int LOCKMODE;
 
 #define MaxLockMode                            8       /* highest standard lock mode */
 
+/* See README.tuplock section "Locking to write inplace-updated tables" */
+#define InplaceUpdateTupleLock ExclusiveLock
 
 /* WAL representation of an AccessExclusiveLock on a table */
 typedef struct xl_standby_lock
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index 03a27dd..b541911 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -43,9 +43,14 @@ extern HeapTuple SearchSysCache4(int cacheId,
 
 extern void ReleaseSysCache(HeapTuple tuple);
 
+extern HeapTuple SearchSysCacheLocked1(int cacheId,
+                                                                          Datum key1);
+
 /* convenience routines */
 extern HeapTuple SearchSysCacheCopy(int cacheId,
                                                                        Datum key1, Datum key2, Datum key3, Datum key4);
+extern HeapTuple SearchSysCacheLockedCopy1(int cacheId,
+                                                                                  Datum key1);
 extern bool SearchSysCacheExists(int cacheId,
                                                                 Datum key1, Datum key2, Datum key3, Datum key4);
 extern Oid     GetSysCacheOid(int cacheId, AttrNumber oidcol,
diff --git a/src/test/isolation/expected/intra-grant-inplace.out b/src/test/isolation/expected/intra-grant-inplace.out
index c2a9841..b5fe8b0 100644
--- a/src/test/isolation/expected/intra-grant-inplace.out
+++ b/src/test/isolation/expected/intra-grant-inplace.out
@@ -154,9 +154,11 @@ step b1: BEGIN;
 step grant1: 
        GRANT SELECT ON intra_grant_inplace TO PUBLIC;
  <waiting ...>
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
-step c2: COMMIT;
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
+step addk2: <... completed>
+ERROR:  deadlock detected
 step grant1: <... completed>
+step c2: COMMIT;
 step c1: COMMIT;
 step read2: 
        SELECT relhasindex FROM pg_class
@@ -194,9 +196,8 @@ relhasindex
 f          
 (1 row)
 
-s4: WARNING:  got: tuple concurrently updated
-step revoke4: <... completed>
 step r3: ROLLBACK;
+step revoke4: <... completed>
 
 starting permutation: b1 drop1 b3 sfu3 revoke4 c1 r3
 step b1: BEGIN;
@@ -223,6 +224,6 @@ relhasindex
 -----------
 (0 rows)
 
-s4: WARNING:  got: tuple concurrently deleted
+s4: WARNING:  got: cache lookup failed for relation REDACTED
 step revoke4: <... completed>
 step r3: ROLLBACK;
diff --git a/src/test/isolation/specs/eval-plan-qual.spec b/src/test/isolation/specs/eval-plan-qual.spec
index 3a74406..07307e6 100644
--- a/src/test/isolation/specs/eval-plan-qual.spec
+++ b/src/test/isolation/specs/eval-plan-qual.spec
@@ -194,7 +194,7 @@ step simplepartupdate_noroute {
        update parttbl set b = 2 where c = 1 returning *;
 }
 
-# test system class updates
+# test system class LockTuple()
 
 step sys1      {
        UPDATE pg_class SET reltuples = 123 WHERE oid = 'accounts'::regclass;
diff --git a/src/test/isolation/specs/intra-grant-inplace.spec b/src/test/isolation/specs/intra-grant-inplace.spec
index eed0b52..2992c85 100644
--- a/src/test/isolation/specs/intra-grant-inplace.spec
+++ b/src/test/isolation/specs/intra-grant-inplace.spec
@@ -14,6 +14,7 @@ teardown
 
 # heap_update()
 session s1
+setup  { SET deadlock_timeout = '100s'; }
 step b1        { BEGIN; }
 step grant1    {
        GRANT SELECT ON intra_grant_inplace TO PUBLIC;
@@ -25,6 +26,7 @@ step c1       { COMMIT; }
 
 # inplace update
 session s2
+setup  { SET deadlock_timeout = '10ms'; }
 step read2     {
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
@@ -73,8 +75,6 @@ step keyshr5  {
 teardown       { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned future LockTuple()
-
 permutation
        b1
        grant1
@@ -126,8 +126,8 @@ permutation
        b2
        sfnku2
        b1
-       grant1(c2)              # acquire LockTuple(), await sfnku2 xmax
-       addk2                   # block in LockTuple() behind grant1 = deadlock
+       grant1(addk2)   # acquire LockTuple(), await sfnku2 xmax
+       addk2(*)                # block in LockTuple() behind grant1 = deadlock
        c2
        c1
        read2
@@ -138,7 +138,7 @@ permutation
        grant1
        b3
        sfu3(c1)        # acquire LockTuple(), await grant1 xmax
-       revoke4(sfu3)   # block in LockTuple() behind sfu3
+       revoke4(r3)     # block in LockTuple() behind sfu3
        c1
        r3                      # revoke4 unlocks old tuple and finds new
 
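For anyone reviewing the SearchSysCacheLocked1() retry loop in isolation, here
is a minimal standalone sketch of the lock-then-refetch idea.  Everything in it
(ToyTid, ToyCatalog, toy_lock(), toy_search_locked()) is hypothetical
scaffolding, not PostgreSQL code.  The "concurrent heap_update()" is a single
scripted move that lands just before we obtain our first lock, and the
tuple-not-found paths are left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical toy types; not ItemPointerData or the real lock manager. */
typedef struct { unsigned block, offset; } ToyTid;

typedef struct
{
    ToyTid  current;        /* TID that a lookup returns right now */
    int     content;        /* row content stored at that TID */
    ToyTid  locked[4];      /* TIDs on which we hold a tuple lock */
    int     nlocked;
    bool    pending_move;   /* scripted concurrent update, fires once */
    ToyTid  move_to;
} ToyCatalog;

static bool
tid_equal(ToyTid a, ToyTid b)
{
    return a.block == b.block && a.offset == b.offset;
}

/* Take the lock; the scripted update may land just before we get it. */
static void
toy_lock(ToyCatalog *cat, ToyTid tid)
{
    if (cat->pending_move)
    {
        cat->current = cat->move_to;
        cat->content++;
        cat->pending_move = false;
    }
    assert(cat->nlocked < 4);
    cat->locked[cat->nlocked++] = tid;
}

static void
toy_unlock(ToyCatalog *cat, ToyTid tid)
{
    for (int i = 0; i < cat->nlocked; i++)
    {
        if (tid_equal(cat->locked[i], tid))
        {
            cat->locked[i] = cat->locked[--cat->nlocked];
            return;
        }
    }
    assert(false);          /* unlocking a TID we never locked */
}

/*
 * Loop until a fetch made after locking returns the TID we locked.
 * Returns the number of fetches performed.
 */
static int
toy_search_locked(ToyCatalog *cat, ToyTid *tid_out, int *content_out)
{
    ToyTid  locked_tid = {0, 0};
    bool    have_lock = false;
    int     fetches = 0;

    for (;;)
    {
        ToyTid  seen = cat->current;
        int     content = cat->content;

        fetches++;
        if (have_lock)
        {
            if (tid_equal(locked_tid, seen))
            {
                *tid_out = seen;
                *content_out = content;
                return fetches;
            }
            toy_unlock(cat, locked_tid);    /* row moved; lock is stale */
        }
        locked_tid = seen;
        toy_lock(cat, locked_tid);
        have_lock = true;
    }
}
```

With no concurrent move, the loop returns after two fetches.  With one scripted
move, it takes three fetches and ends up holding only the lock on the new TID,
with content read after that lock was taken.
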
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    For inplace update, send nontransactional invalidations.
    
    The inplace update survives ROLLBACK.  The inval didn't, so another
    backend's DDL could then update the row without incorporating the
    inplace update.  In the test this fixes, a mix of CREATE INDEX and ALTER
    TABLE resulted in a table with an index, yet relhasindex=f.  That is a
    source of index corruption.
    
    Core code no longer needs XLOG_INVALIDATIONS, but this leaves removing
    it for future work.  Extensions could be relying on that mechanism, so
    that removal would not be back-patch material.  Back-patch to v12 (all
    supported versions).
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/20240523000548.58.nmi...@google.com

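To make the failure mode in the commit message concrete, here is a toy model of
one catalog row, one other backend's cached copy of it, and two ways of
delivering the inval: queued until commit, or sent immediately.  ToyWorld and
the toy_* functions are made up for illustration and do not correspond to any
PostgreSQL API.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct
{
    bool    row_relhasindex;    /* shared catalog row, updated in place */
    bool    cache_valid;        /* another backend's cached copy */
    bool    cache_relhasindex;
    bool    queued_inval;       /* transactional inval awaiting commit */
} ToyWorld;

/* The other backend reads through its cache, reloading if invalidated. */
static bool
toy_cache_read(ToyWorld *w)
{
    if (!w->cache_valid)
    {
        w->cache_relhasindex = w->row_relhasindex;
        w->cache_valid = true;
    }
    return w->cache_relhasindex;
}

/* Inplace update: the row changes now, whatever happens to the xact. */
static void
toy_inplace_update(ToyWorld *w, bool immediate_inval)
{
    w->row_relhasindex = true;
    if (immediate_inval)
        w->cache_valid = false;     /* nontransactional inval */
    else
        w->queued_inval = true;     /* delivered only at commit */
}

/* ROLLBACK discards queued invals but leaves the inplace change alone. */
static void
toy_rollback(ToyWorld *w)
{
    w->queued_inval = false;
}

/* Does the other backend still see a stale row after our ROLLBACK? */
static bool
toy_stale_after_rollback(bool immediate_inval)
{
    ToyWorld w = {false, false, false, false};

    (void) toy_cache_read(&w);      /* other backend caches relhasindex=f */
    toy_inplace_update(&w, immediate_inval);
    toy_rollback(&w);
    return toy_cache_read(&w) != w.row_relhasindex;
}
```

In this model, toy_stale_after_rollback(false) is true: the queued inval is
discarded and the other backend keeps relhasindex=f.  With the immediate inval,
toy_stale_after_rollback(true) is false.
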
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 797bddf..d7e417f 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -6305,6 +6305,9 @@ heap_inplace_update_finish(void *state, HeapTuple tuple)
        Relation        relation = scan->heap_rel;
        uint32          oldlen;
        uint32          newlen;
+       int                     nmsgs = 0;
+       SharedInvalidationMessage *invalMessages = NULL;
+       bool            RelcacheInitFileInval = false;
 
        Assert(ItemPointerEquals(&oldtup->t_self, &tuple->t_self));
        oldlen = oldtup->t_len - htup->t_hoff;
@@ -6312,6 +6315,29 @@ heap_inplace_update_finish(void *state, HeapTuple tuple)
        if (oldlen != newlen || htup->t_hoff != tuple->t_data->t_hoff)
                elog(ERROR, "wrong tuple length");
 
+       /*
+        * Construct shared cache inval if necessary.  Note that because we only
+        * pass the new version of the tuple, this mustn't be used for any
+        * operations that could change catcache lookup keys.  But we aren't
+        * bothering with index updates either, so that's true a fortiori.
+        */
+       CacheInvalidateHeapTupleInplace(relation, tuple, NULL);
+
+       /* Like RecordTransactionCommit(), log only if needed */
+       if (XLogStandbyInfoActive())
+               nmsgs = inplaceGetInvalidationMessages(&invalMessages,
+                                                                                          &RelcacheInitFileInval);
+
+       /*
+        * Unlink relcache init files as needed.  If unlinking, acquire
+        * RelCacheInitLock until after associated invalidations.  By doing this
+        * in advance, if we checkpoint and then crash between inplace
+        * XLogInsert() and inval, we don't rely on StartupXLOG() ->
+        * RelationCacheInitFileRemove().  That uses elevel==LOG, so replay would
+        * neglect to PANIC on EIO.
+        */
+       PreInplace_Inval();
+
        /* NO EREPORT(ERROR) from here till changes are logged */
        START_CRIT_SECTION();
 
@@ -6341,9 +6367,16 @@ heap_inplace_update_finish(void *state, HeapTuple tuple)
                XLogRecPtr      recptr;
 
                xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
+               xlrec.dbId = MyDatabaseId;
+               xlrec.tsId = MyDatabaseTableSpace;
+               xlrec.relcacheInitFileInval = RelcacheInitFileInval;
+               xlrec.nmsgs = nmsgs;
 
                XLogBeginInsert();
-               XLogRegisterData((char *) &xlrec, SizeOfHeapInplace);
+               XLogRegisterData((char *) &xlrec, MinSizeOfHeapInplace);
+               if (nmsgs != 0)
+                       XLogRegisterData((char *) invalMessages,
+                                                        nmsgs * sizeof(SharedInvalidationMessage));
 
                XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
                XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen);
@@ -6355,22 +6388,23 @@ heap_inplace_update_finish(void *state, HeapTuple tuple)
                PageSetLSN(BufferGetPage(buffer), recptr);
        }
 
-       END_CRIT_SECTION();
-
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+       /*
+        * Send invalidations to shared queue.  SearchSysCacheLocked1() assumes we
+        * do this before UnlockTuple().
+        *
+        * If we're mutating a tuple visible only to this transaction, there's an
+        * equivalent transactional inval from the action that created the tuple,
+        * and this inval is superfluous.
+        */
+       AtInplace_Inval();
+
+       END_CRIT_SECTION();
        UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
        systable_endscan(scan);
 
-       /*
-        * Send out shared cache inval if necessary.  Note that because we only
-        * pass the new version of the tuple, this mustn't be used for any
-        * operations that could change catcache lookup keys.  But we aren't
-        * bothering with index updates either, so that's true a fortiori.
-        *
-        * XXX ROLLBACK discards the invalidation.  See test inplace-inval.spec.
-        */
-       if (!IsBootstrapProcessingMode())
-               CacheInvalidateHeapTuple(relation, tuple, NULL);
+       AcceptInvalidationMessages();   /* local processing of just-sent inval */
 }
 
 /*
@@ -10268,6 +10302,12 @@ heap_xlog_inplace(XLogReaderState *record)
        }
        if (BufferIsValid(buffer))
                UnlockReleaseBuffer(buffer);
+
+       ProcessCommittedInvalidationMessages(xlrec->msgs,
+                                                                                xlrec->nmsgs,
+                                                                                xlrec->relcacheInitFileInval,
+                                                                                xlrec->dbId,
+                                                                                xlrec->tsId);
 }
 
 void
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index 5f5673e..f31cc3a 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -16,6 +16,7 @@
 
 #include "access/heapam_xlog.h"
 #include "access/rmgrdesc_utils.h"
+#include "storage/standbydefs.h"
 
 /*
  * NOTE: "keyname" argument cannot have trailing spaces or punctuation
@@ -253,6 +254,9 @@ heap_desc(StringInfo buf, XLogReaderState *record)
                xl_heap_inplace *xlrec = (xl_heap_inplace *) rec;
 
                appendStringInfo(buf, "off: %u", xlrec->offnum);
+               standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs,
+                                                                  xlrec->dbId, xlrec->tsId,
+                                                                  xlrec->relcacheInitFileInval);
        }
 }
 
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 25f870b..32e509a 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -96,11 +96,7 @@ standby_identify(uint8 info)
        return id;
 }
 
-/*
- * This routine is used by both standby_desc and xact_desc, because
- * transaction commits and XLOG_INVALIDATIONS messages contain invalidations;
- * it seems pointless to duplicate the code.
- */
+/* also used by non-standby records having analogous invalidation fields */
 void
 standby_desc_invalidations(StringInfo buf,
                                                   int nmsgs, SharedInvalidationMessage *msgs,
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9bda1aa..30285bd 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1358,14 +1358,23 @@ RecordTransactionCommit(void)
 
                /*
                 * Transactions without an assigned xid can contain invalidation
-                * messages (e.g. explicit relcache invalidations or catcache
-                * invalidations for inplace updates); standbys need to process those.
-                * We can't emit a commit record without an xid, and we don't want to
-                * force assigning an xid, because that'd be problematic for e.g.
-                * vacuum.  Hence we emit a bespoke record for the invalidations. We
-                * don't want to use that in case a commit record is emitted, so they
-                * happen synchronously with commits (besides not wanting to emit more
-                * WAL records).
+                * messages.  While inplace updates formerly did so, they now send
+                * immediate invalidations.  Extensions might still do so, and
+                * standbys may need to process those invals.  We can't emit a commit
+                * record without an xid, and we don't want to force assigning an xid,
+                * because that'd be problematic for e.g. vacuum.  Hence we emit a
+                * bespoke record for the invalidations. We don't want to use that in
+                * case a commit record is emitted, so they happen synchronously with
+                * commits (besides not wanting to emit more WAL records).
+                *
+                * XXX Every known use of this capability is a defect.  Since an XID
+                * isn't controlling visibility of the change that prompted invals,
+                * other sessions need the inval even if this transaction aborts.
+                *
+                * ON COMMIT DELETE ROWS does a nontransactional index_build(), which
+                * queues a relcache inval, including in transactions without an xid
+                * that had read the (empty) table.  Standbys don't need any ON COMMIT
+                * DELETE ROWS invals, but we've not done the work to withhold them.
                 */
                if (nmsgs != 0)
                {
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index b4b68b1..ba576c6 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2884,19 +2884,21 @@ index_update_stats(Relation rel,
        }
 
        /*
-        * If anything changed, write out the tuple
+        * If anything changed, write out the tuple and immediate invals
         */
        if (dirty)
-       {
                heap_inplace_update_finish(state, tuple);
-               /* the above sends a cache inval message */
-       }
        else
-       {
                heap_inplace_update_cancel(state);
-               /* no need to change tuple, but force relcache inval anyway */
-               CacheInvalidateRelcacheByTuple(tuple);
-       }
+
+       /*
+        * Queue a transactional relcache inval.  CREATE INDEX needs an immediate
+        * inval for the relhasindex change, but it also needs a transactional
+        * inval for when the new index's rows become visible.  Other CREATE INDEX
+        * and REINDEX code happens to also queue a transactional inval, but keep
+        * this in case rare callers rely on this part of our API contract.
+        */
+       CacheInvalidateRelcacheByTuple(tuple);
 
        heap_freetuple(tuple);
 
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 36d82bd..5d4173a 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -975,11 +975,6 @@ EventTriggerOnLogin(void)
                                 * this instead of regular updates serves two purposes. First,
                                 * that avoids possible waiting on the row-level lock. Second,
                                 * that avoids dealing with TOAST.
-                                *
-                                * Changes made by inplace update may be lost due to
-                                * concurrent normal updates; see inplace-inval.spec. However,
-                                * we are OK with that.  The subsequent connections will still
-                                * have a chance to set "dathasloginevt" to false.
                                 */
                                heap_inplace_update_finish(state, tuple);
                        }
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8ec5adf..b2cf14e 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -508,23 +508,18 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
                        /*
                         * Inplace updates are only ever performed on catalog tuples and
-                        * can, per definition, not change tuple visibility.  Since we
-                        * don't decode catalog tuples, we're not interested in the
-                        * record's contents.
+                        * can, per definition, not change tuple visibility.  Inplace
+                        * updates don't affect storage or interpretation of table rows,
+                        * so they don't affect logicalrep_write_tuple() outcomes.  Hence,
+                        * we don't process invalidations from the original operation.  If
+                        * inplace updates did affect those things, invalidations wouldn't
+                        * make it work, since there are no snapshot-specific versions of
+                        * inplace-updated values.  Since we also don't decode catalog
+                        * tuples, we're not interested in the record's contents.
                         *
-                        * In-place updates can be used either by XID-bearing transactions
-                        * (e.g.  in CREATE INDEX CONCURRENTLY) or by XID-less
-                        * transactions (e.g.  VACUUM).  In the former case, the commit
-                        * record will include cache invalidations, so we mark the
-                        * transaction as catalog modifying here. Currently that's
-                        * redundant because the commit will do that as well, but once we
-                        * support decoding in-progress relations, this will be important.
+                        * Older WAL may contain commit-time invals for inplace updates.
+                        * Excess invalidation is safe.
                         */
-                       if (!TransactionIdIsValid(xid))
-                               break;
-
-                       (void) SnapBuildProcessChange(builder, xid, buf->origptr);
-                       ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
                        break;
 
                case XLOG_HEAP_CONFIRM:
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index 111d8a2..ea8ca0e 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -2288,7 +2288,8 @@ void
 PrepareToInvalidateCacheTuple(Relation relation,
                                                          HeapTuple tuple,
                                                          HeapTuple newtuple,
-                                                         void (*function) (int, uint32, Oid))
+                                                         void (*function) (int, uint32, Oid, void *),
+                                                         void *context)
 {
        slist_iter      iter;
        Oid                     reloid;
@@ -2329,7 +2330,7 @@ PrepareToInvalidateCacheTuple(Relation relation,
                hashvalue = CatalogCacheComputeTupleHashValue(ccp, ccp->cc_nkeys, tuple);
                dbid = ccp->cc_relisshared ? (Oid) 0 : MyDatabaseId;
 
-               (*function) (ccp->id, hashvalue, dbid);
+               (*function) (ccp->id, hashvalue, dbid, context);
 
                if (newtuple)
                {
@@ -2338,7 +2339,7 @@ PrepareToInvalidateCacheTuple(Relation relation,
                        newhashvalue = CatalogCacheComputeTupleHashValue(ccp, ccp->cc_nkeys, newtuple);
 
                        if (newhashvalue != hashvalue)
-                               (*function) (ccp->id, newhashvalue, dbid);
+                               (*function) (ccp->id, newhashvalue, dbid, context);
                }
        }
 }
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 603aa41..b3d3adb 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -94,6 +94,10 @@
  *     worth trying to avoid sending such inval traffic in the future, if those
  *     problems can be overcome cheaply.
  *
+ *     When making a nontransactional change to a cacheable object, we must
+ *     likewise send the invalidation immediately, before ending the change's
+ *     critical section.  This includes inplace heap updates, relmap, and smgr.
+ *
  *     When wal_level=logical, write invalidations into WAL at each command end to
  *     support the decoding of the in-progress transactions.  See
  *     CommandEndInvalidationMessages.
@@ -130,13 +134,15 @@
 
 /*
  * Pending requests are stored as ready-to-send SharedInvalidationMessages.
- * We keep the messages themselves in arrays in TopTransactionContext
- * (there are separate arrays for catcache and relcache messages).  Control
- * information is kept in a chain of TransInvalidationInfo structs, also
- * allocated in TopTransactionContext.  (We could keep a subtransaction's
- * TransInvalidationInfo in its CurTransactionContext; but that's more
- * wasteful not less so, since in very many scenarios it'd be the only
- * allocation in the subtransaction's CurTransactionContext.)
+ * We keep the messages themselves in arrays in TopTransactionContext (there
+ * are separate arrays for catcache and relcache messages).  For transactional
+ * messages, control information is kept in a chain of TransInvalidationInfo
+ * structs, also allocated in TopTransactionContext.  (We could keep a
+ * subtransaction's TransInvalidationInfo in its CurTransactionContext; but
+ * that's more wasteful not less so, since in very many scenarios it'd be the
+ * only allocation in the subtransaction's CurTransactionContext.)  For
+ * inplace update messages, control information appears in an
+ * InvalidationInfo, allocated in CurrentMemoryContext.
  *
  * We can store the message arrays densely, and yet avoid moving data around
  * within an array, because within any one subtransaction we need only
@@ -147,7 +153,9 @@
  * struct.  Similarly, we need distinguish messages of prior subtransactions
  * from those of the current subtransaction only until the subtransaction
  * completes, after which we adjust the array indexes in the parent's
- * TransInvalidationInfo to include the subtransaction's messages.
+ * TransInvalidationInfo to include the subtransaction's messages.  Inplace
+ * invalidations don't need a concept of command or subtransaction boundaries,
+ * since we send them during the WAL insertion critical section.
  *
  * The ordering of the individual messages within a command's or
  * subtransaction's output is not considered significant, although this
@@ -200,7 +208,7 @@ typedef struct InvalidationMsgsGroup
 
 
 /*----------------
- * Invalidation messages are divided into two groups:
+ * Transactional invalidation messages are divided into two groups:
  *     1) events so far in current command, not yet reflected to caches.
  *     2) events in previous commands of current transaction; these have
  *        been reflected to local caches, and must be either broadcast to
@@ -216,26 +224,36 @@ typedef struct InvalidationMsgsGroup
  *----------------
  */
 
-typedef struct TransInvalidationInfo
+/* fields common to both transactional and inplace invalidation */
+typedef struct InvalidationInfo
 {
-       /* Back link to parent transaction's info */
-       struct TransInvalidationInfo *parent;
-
-       /* Subtransaction nesting depth */
-       int                     my_level;
-
        /* Events emitted by current command */
        InvalidationMsgsGroup CurrentCmdInvalidMsgs;
 
-       /* Events emitted by previous commands of this (sub)transaction */
-       InvalidationMsgsGroup PriorCmdInvalidMsgs;
-
        /* init file must be invalidated? */
        bool            RelcacheInitFileInval;
+} InvalidationInfo;
+
+/* subclass adding fields specific to transactional invalidation */
+typedef struct TransInvalidationInfo
+{
+       /* Base class */
+       struct InvalidationInfo ii;
+
+       /* Events emitted by previous commands of this (sub)transaction */
+       InvalidationMsgsGroup PriorCmdInvalidMsgs;
+
+       /* Back link to parent transaction's info */
+       struct TransInvalidationInfo *parent;
+
+       /* Subtransaction nesting depth */
+       int                     my_level;
 } TransInvalidationInfo;
 
 static TransInvalidationInfo *transInvalInfo = NULL;
 
+static InvalidationInfo *inplaceInvalInfo = NULL;
+
 /* GUC storage */
 int                    debug_discard_caches = 0;
 
@@ -543,9 +561,12 @@ ProcessInvalidationMessagesMulti(InvalidationMsgsGroup *group,
 static void
 RegisterCatcacheInvalidation(int cacheId,
                                                         uint32 hashValue,
-                                                        Oid dbId)
+                                                        Oid dbId,
+                                                        void *context)
 {
-       AddCatcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
+       InvalidationInfo *info = (InvalidationInfo *) context;
+
+       AddCatcacheInvalidationMessage(&info->CurrentCmdInvalidMsgs,
                                                                   cacheId, hashValue, dbId);
 }
 
@@ -555,10 +576,9 @@ RegisterCatcacheInvalidation(int cacheId,
  * Register an invalidation event for all catcache entries from a catalog.
  */
 static void
-RegisterCatalogInvalidation(Oid dbId, Oid catId)
+RegisterCatalogInvalidation(InvalidationInfo *info, Oid dbId, Oid catId)
 {
-       AddCatalogInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                                                 dbId, catId);
+       AddCatalogInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, catId);
 }
 
 /*
@@ -567,10 +587,9 @@ RegisterCatalogInvalidation(Oid dbId, Oid catId)
  * As above, but register a relcache invalidation event.
  */
 static void
-RegisterRelcacheInvalidation(Oid dbId, Oid relId)
+RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 {
-       AddRelcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                                                  dbId, relId);
+       AddRelcacheInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
 
        /*
         * Most of the time, relcache invalidation is associated with system
@@ -587,7 +606,7 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId)
         * as well.  Also zap when we are invalidating whole relcache.
         */
        if (relId == InvalidOid || RelationIdIsInInitFile(relId))
-               transInvalInfo->RelcacheInitFileInval = true;
+               info->RelcacheInitFileInval = true;
 }
 
 /*
@@ -597,24 +616,27 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId)
  * Only needed for catalogs that don't have catcaches.
  */
 static void
-RegisterSnapshotInvalidation(Oid dbId, Oid relId)
+RegisterSnapshotInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 {
-       AddSnapshotInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                                                  dbId, relId);
+       AddSnapshotInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
 }
 
 /*
  * PrepareInvalidationState
  *             Initialize inval data for the current (sub)transaction.
  */
-static void
+static InvalidationInfo *
 PrepareInvalidationState(void)
 {
        TransInvalidationInfo *myInfo;
 
+       Assert(IsTransactionState());
+       /* Can't queue transactional message while collecting inplace messages. */
+       Assert(inplaceInvalInfo == NULL);
+
        if (transInvalInfo != NULL &&
                transInvalInfo->my_level == GetCurrentTransactionNestLevel())
-               return;
+               return (InvalidationInfo *) transInvalInfo;
 
        myInfo = (TransInvalidationInfo *)
                MemoryContextAllocZero(TopTransactionContext,
@@ -637,7 +659,7 @@ PrepareInvalidationState(void)
                 * counter.  This is a convenient place to check for that, as well as
                 * being important to keep management of the message arrays simple.
                 */
-               if (NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs) != 0)
+               if (NumMessagesInGroup(&transInvalInfo->ii.CurrentCmdInvalidMsgs) != 0)
                        elog(ERROR, "cannot start a subtransaction when there are unprocessed inval messages");
 
                /*
@@ -646,8 +668,8 @@ PrepareInvalidationState(void)
                 * to update them to follow whatever is already in the arrays.
                 */
                SetGroupToFollow(&myInfo->PriorCmdInvalidMsgs,
-                                                &transInvalInfo->CurrentCmdInvalidMsgs);
-               SetGroupToFollow(&myInfo->CurrentCmdInvalidMsgs,
+                                                &transInvalInfo->ii.CurrentCmdInvalidMsgs);
+               SetGroupToFollow(&myInfo->ii.CurrentCmdInvalidMsgs,
                                                 &myInfo->PriorCmdInvalidMsgs);
        }
        else
@@ -663,6 +685,41 @@ PrepareInvalidationState(void)
        }
 
        transInvalInfo = myInfo;
+       return (InvalidationInfo *) myInfo;
+}
+
+/*
+ * PrepareInplaceInvalidationState
+ *             Initialize inval data for an inplace update.
+ *
+ * See previous function for more background.
+ */
+static InvalidationInfo *
+PrepareInplaceInvalidationState(void)
+{
+       InvalidationInfo *myInfo;
+
+       Assert(IsTransactionState());
+       /* limit of one inplace update under assembly */
+       Assert(inplaceInvalInfo == NULL);
+
+       /* gone after WAL insertion CritSection ends, so use current context */
+       myInfo = (InvalidationInfo *) palloc0(sizeof(InvalidationInfo));
+
+       /* Stash our messages past end of the transactional messages, if any. */
+       if (transInvalInfo != NULL)
+               SetGroupToFollow(&myInfo->CurrentCmdInvalidMsgs,
+                                                &transInvalInfo->ii.CurrentCmdInvalidMsgs);
+       else
+       {
+               InvalMessageArrays[CatCacheMsgs].msgs = NULL;
+               InvalMessageArrays[CatCacheMsgs].maxmsgs = 0;
+               InvalMessageArrays[RelCacheMsgs].msgs = NULL;
+               InvalMessageArrays[RelCacheMsgs].maxmsgs = 0;
+       }
+
+       inplaceInvalInfo = myInfo;
+       return myInfo;
 }
 
 /* ----------------------------------------------------------------
@@ -902,7 +959,7 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
         * after we send the SI messages.  However, we need not do anything unless
         * we committed.
         */
-       *RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval;
+       *RelcacheInitFileInval = transInvalInfo->ii.RelcacheInitFileInval;
 
        /*
         * Collect all the pending messages into a single contiguous array of
@@ -913,7 +970,7 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
         * not new ones.
         */
        nummsgs = NumMessagesInGroup(&transInvalInfo->PriorCmdInvalidMsgs) +
-               NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs);
+               NumMessagesInGroup(&transInvalInfo->ii.CurrentCmdInvalidMsgs);
 
        *msgs = msgarray = (SharedInvalidationMessage *)
                MemoryContextAlloc(CurTransactionContext,
@@ -926,7 +983,7 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
                                                                                msgs,
                                                                                n * sizeof(SharedInvalidationMessage)),
                                                                 nmsgs += n));
-       ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+       ProcessMessageSubGroupMulti(&transInvalInfo->ii.CurrentCmdInvalidMsgs,
                                                                CatCacheMsgs,
                                                                (memcpy(msgarray + nmsgs,
                                                                                msgs,
@@ -938,7 +995,51 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
                                                                                msgs,
                                                                                n * sizeof(SharedInvalidationMessage)),
                                                                 nmsgs += n));
-       ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+       ProcessMessageSubGroupMulti(&transInvalInfo->ii.CurrentCmdInvalidMsgs,
+                                                               RelCacheMsgs,
+                                                               (memcpy(msgarray + nmsgs,
+                                                                               msgs,
+                                                                               n * sizeof(SharedInvalidationMessage)),
+                                                                nmsgs += n));
+       Assert(nmsgs == nummsgs);
+
+       return nmsgs;
+}
+
+/*
+ * inplaceGetInvalidationMessages() is called by the inplace update to collect
+ * invalidation messages to add to its WAL record.  Like the previous
+ * function, we might still fail.
+ */
+int
+inplaceGetInvalidationMessages(SharedInvalidationMessage **msgs,
+                                                          bool *RelcacheInitFileInval)
+{
+       SharedInvalidationMessage *msgarray;
+       int                     nummsgs;
+       int                     nmsgs;
+
+       /* Quick exit if we haven't done anything with invalidation messages. */
+       if (inplaceInvalInfo == NULL)
+       {
+               *RelcacheInitFileInval = false;
+               *msgs = NULL;
+               return 0;
+       }
+
+       *RelcacheInitFileInval = inplaceInvalInfo->RelcacheInitFileInval;
+       nummsgs = NumMessagesInGroup(&inplaceInvalInfo->CurrentCmdInvalidMsgs);
+       *msgs = msgarray = (SharedInvalidationMessage *)
+               palloc(nummsgs * sizeof(SharedInvalidationMessage));
+
+       nmsgs = 0;
+       ProcessMessageSubGroupMulti(&inplaceInvalInfo->CurrentCmdInvalidMsgs,
+                                                               CatCacheMsgs,
+                                                               (memcpy(msgarray + nmsgs,
+                                                                               msgs,
+                                                                               n * sizeof(SharedInvalidationMessage)),
+                                                                nmsgs += n));
+       ProcessMessageSubGroupMulti(&inplaceInvalInfo->CurrentCmdInvalidMsgs,
                                                                RelCacheMsgs,
                                                                (memcpy(msgarray + nmsgs,
                                                                                msgs,
@@ -1038,16 +1139,16 @@ AtEOXact_Inval(bool isCommit)
                 * after we send the SI messages.  However, we need not do anything
                 * unless we committed.
                 */
-               if (transInvalInfo->RelcacheInitFileInval)
+               if (transInvalInfo->ii.RelcacheInitFileInval)
                        RelationCacheInitFilePreInvalidate();
 
                AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
-                                                                  &transInvalInfo->CurrentCmdInvalidMsgs);
+                                                                  &transInvalInfo->ii.CurrentCmdInvalidMsgs);
 
                ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
                                                                                 SendSharedInvalidMessages);
 
-               if (transInvalInfo->RelcacheInitFileInval)
+               if (transInvalInfo->ii.RelcacheInitFileInval)
                        RelationCacheInitFilePostInvalidate();
        }
        else
@@ -1058,6 +1159,45 @@ AtEOXact_Inval(bool isCommit)
 
        /* Need not free anything explicitly */
        transInvalInfo = NULL;
+       inplaceInvalInfo = NULL;
+}
+
+/*
+ * PreInplace_Inval
+ *             Process queued-up invalidation before inplace update critical section.
+ *
+ * Tasks belong here if they are safe even if the inplace update does not
+ * complete.  Currently, this just unlinks a cache file, which can fail.  The
+ * sum of this and AtInplace_Inval() mirrors AtEOXact_Inval(isCommit=true).
+ */
+void
+PreInplace_Inval(void)
+{
+       Assert(CritSectionCount == 0);
+
+       if (inplaceInvalInfo && inplaceInvalInfo->RelcacheInitFileInval)
+               RelationCacheInitFilePreInvalidate();
+}
+
+/*
+ * AtInplace_Inval
+ *             Process queued-up invalidations after inplace update buffer mutation.
+ */
+void
+AtInplace_Inval(void)
+{
+       Assert(CritSectionCount > 0);
+
+       if (inplaceInvalInfo == NULL)
+               return;
+
+       ProcessInvalidationMessagesMulti(&inplaceInvalInfo->CurrentCmdInvalidMsgs,
+                                                                        SendSharedInvalidMessages);
+
+       if (inplaceInvalInfo->RelcacheInitFileInval)
+               RelationCacheInitFilePostInvalidate();
+
+       inplaceInvalInfo = NULL;
 }
 
 /*
@@ -1125,18 +1265,21 @@ AtEOSubXact_Inval(bool isCommit)
                                                                   &myInfo->PriorCmdInvalidMsgs);
 
                /* Must readjust parent's CurrentCmdInvalidMsgs indexes now */
-               SetGroupToFollow(&myInfo->parent->CurrentCmdInvalidMsgs,
+               SetGroupToFollow(&myInfo->parent->ii.CurrentCmdInvalidMsgs,
                                                 &myInfo->parent->PriorCmdInvalidMsgs);
 
                /* Pending relcache inval becomes parent's problem too */
-               if (myInfo->RelcacheInitFileInval)
-                       myInfo->parent->RelcacheInitFileInval = true;
+               if (myInfo->ii.RelcacheInitFileInval)
+                       myInfo->parent->ii.RelcacheInitFileInval = true;
 
                /* Pop the transaction state stack */
                transInvalInfo = myInfo->parent;
 
                /* Need not free anything else explicitly */
                pfree(myInfo);
+
+               /* Successful inplace update must clear this. */
+               Assert(inplaceInvalInfo == NULL);
        }
        else
        {
@@ -1148,6 +1291,9 @@ AtEOSubXact_Inval(bool isCommit)
 
                /* Need not free anything else explicitly */
                pfree(myInfo);
+
+               /* Reset from aborted inplace update. */
+               inplaceInvalInfo = NULL;
        }
 }
 
@@ -1177,7 +1323,7 @@ CommandEndInvalidationMessages(void)
        if (transInvalInfo == NULL)
                return;
 
-       ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
+       ProcessInvalidationMessages(&transInvalInfo->ii.CurrentCmdInvalidMsgs,
                                                                LocalExecuteInvalidationMessage);
 
        /* WAL Log per-command invalidation messages for wal_level=logical */
@@ -1185,26 +1331,21 @@ CommandEndInvalidationMessages(void)
                LogLogicalInvalidations();
 
        AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
-                                                          &transInvalInfo->CurrentCmdInvalidMsgs);
+                                                          &transInvalInfo->ii.CurrentCmdInvalidMsgs);
 }
 
 
 /*
- * CacheInvalidateHeapTuple
- *             Register the given tuple for invalidation at end of command
- *             (ie, current command is creating or outdating this tuple).
- *             Also, detect whether a relcache invalidation is implied.
- *
- * For an insert or delete, tuple is the target tuple and newtuple is NULL.
- * For an update, we are called just once, with tuple being the old tuple
- * version and newtuple the new version.  This allows avoidance of duplicate
- * effort during an update.
+ * CacheInvalidateHeapTupleCommon
+ *             Common logic for end-of-command and inplace variants.
  */
-void
-CacheInvalidateHeapTuple(Relation relation,
-                                                HeapTuple tuple,
-                                                HeapTuple newtuple)
+static void
+CacheInvalidateHeapTupleCommon(Relation relation,
+                                                          HeapTuple tuple,
+                                                          HeapTuple newtuple,
+                                                          InvalidationInfo *(*prepare_callback) (void))
 {
+       InvalidationInfo *info;
        Oid                     tupleRelId;
        Oid                     databaseId;
        Oid                     relationId;
@@ -1228,11 +1369,8 @@ CacheInvalidateHeapTuple(Relation relation,
        if (IsToastRelation(relation))
                return;
 
-       /*
-        * If we're not prepared to queue invalidation messages for this
-        * subtransaction level, get ready now.
-        */
-       PrepareInvalidationState();
+       /* Allocate any required resources. */
+       info = prepare_callback();
 
        /*
         * First let the catcache do its thing
@@ -1241,11 +1379,12 @@ CacheInvalidateHeapTuple(Relation relation,
        if (RelationInvalidatesSnapshotsOnly(tupleRelId))
        {
                databaseId = IsSharedRelation(tupleRelId) ? InvalidOid : MyDatabaseId;
-               RegisterSnapshotInvalidation(databaseId, tupleRelId);
+               RegisterSnapshotInvalidation(info, databaseId, tupleRelId);
        }
        else
                PrepareToInvalidateCacheTuple(relation, tuple, newtuple,
-                                                                         RegisterCatcacheInvalidation);
+                                                                         RegisterCatcacheInvalidation,
+                                                                         (void *) info);
 
        /*
         * Now, is this tuple one of the primary definers of a relcache entry? See
@@ -1318,7 +1457,44 @@ CacheInvalidateHeapTuple(Relation relation,
        /*
         * Yes.  We need to register a relcache invalidation event.
         */
-       RegisterRelcacheInvalidation(databaseId, relationId);
+       RegisterRelcacheInvalidation(info, databaseId, relationId);
+}
+
+/*
+ * CacheInvalidateHeapTuple
+ *             Register the given tuple for invalidation at end of command
+ *             (ie, current command is creating or outdating this tuple) and end of
+ *             transaction.  Also, detect whether a relcache invalidation is implied.
+ *
+ * For an insert or delete, tuple is the target tuple and newtuple is NULL.
+ * For an update, we are called just once, with tuple being the old tuple
+ * version and newtuple the new version.  This allows avoidance of duplicate
+ * effort during an update.
+ */
+void
+CacheInvalidateHeapTuple(Relation relation,
+                                                HeapTuple tuple,
+                                                HeapTuple newtuple)
+{
+       CacheInvalidateHeapTupleCommon(relation, tuple, newtuple,
+                                                                  PrepareInvalidationState);
+}
+
+/*
+ * CacheInvalidateHeapTupleInplace
+ *             Register the given tuple for nontransactional invalidation pertaining
+ *             to an inplace update.  Also, detect whether a relcache invalidation is
+ *             implied.
+ *
+ * Like CacheInvalidateHeapTuple(), but for inplace updates.
+ */
+void
+CacheInvalidateHeapTupleInplace(Relation relation,
+                                                               HeapTuple tuple,
+                                                               HeapTuple newtuple)
+{
+       CacheInvalidateHeapTupleCommon(relation, tuple, newtuple,
+                                                                  PrepareInplaceInvalidationState);
 }
 
 /*
@@ -1337,14 +1513,13 @@ CacheInvalidateCatalog(Oid catalogId)
 {
        Oid                     databaseId;
 
-       PrepareInvalidationState();
-
        if (IsSharedRelation(catalogId))
                databaseId = InvalidOid;
        else
                databaseId = MyDatabaseId;
 
-       RegisterCatalogInvalidation(databaseId, catalogId);
+       RegisterCatalogInvalidation(PrepareInvalidationState(),
+                                                               databaseId, catalogId);
 }
 
 /*
@@ -1362,15 +1537,14 @@ CacheInvalidateRelcache(Relation relation)
        Oid                     databaseId;
        Oid                     relationId;
 
-       PrepareInvalidationState();
-
        relationId = RelationGetRelid(relation);
        if (relation->rd_rel->relisshared)
                databaseId = InvalidOid;
        else
                databaseId = MyDatabaseId;
 
-       RegisterRelcacheInvalidation(databaseId, relationId);
+       RegisterRelcacheInvalidation(PrepareInvalidationState(),
+                                                                databaseId, relationId);
 }
 
 /*
@@ -1383,9 +1557,8 @@ CacheInvalidateRelcache(Relation relation)
 void
 CacheInvalidateRelcacheAll(void)
 {
-       PrepareInvalidationState();
-
-       RegisterRelcacheInvalidation(InvalidOid, InvalidOid);
+       RegisterRelcacheInvalidation(PrepareInvalidationState(),
+                                                                InvalidOid, InvalidOid);
 }
 
 /*
@@ -1399,14 +1572,13 @@ CacheInvalidateRelcacheByTuple(HeapTuple classTuple)
        Oid                     databaseId;
        Oid                     relationId;
 
-       PrepareInvalidationState();
-
        relationId = classtup->oid;
        if (classtup->relisshared)
                databaseId = InvalidOid;
        else
                databaseId = MyDatabaseId;
-       RegisterRelcacheInvalidation(databaseId, relationId);
+       RegisterRelcacheInvalidation(PrepareInvalidationState(),
+                                                                databaseId, relationId);
 }
 
 /*
@@ -1420,8 +1592,6 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 {
        HeapTuple       tup;
 
-       PrepareInvalidationState();
-
        tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
        if (!HeapTupleIsValid(tup))
                elog(ERROR, "cache lookup failed for relation %u", relid);
@@ -1611,7 +1781,7 @@ LogLogicalInvalidations(void)
        if (transInvalInfo == NULL)
                return;
 
-       group = &transInvalInfo->CurrentCmdInvalidMsgs;
+       group = &transInvalInfo->ii.CurrentCmdInvalidMsgs;
        nmsgs = NumMessagesInGroup(group);
 
        if (nmsgs > 0)
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index 50c9440..f41b1c2 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -351,8 +351,7 @@ SearchSysCacheLocked1(int cacheId,
 
                /*
                 * If an inplace update just finished, ensure we process the syscache
-                * inval.  XXX this is insufficient: the inplace updater may not yet
-                * have reached AtEOXact_Inval().  See test at inplace-inval.spec.
+                * inval.
                 *
                 * If a heap_update() call just released its LOCKTAG_TUPLE, we'll
                 * probably find the old tuple and reach "tuple concurrently updated".
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 42736f3..4591e9a 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -20,6 +20,7 @@
 #include "storage/buf.h"
 #include "storage/bufpage.h"
 #include "storage/relfilelocator.h"
+#include "storage/sinval.h"
 #include "utils/relcache.h"
 
 
@@ -425,9 +426,14 @@ typedef struct xl_heap_confirm
 typedef struct xl_heap_inplace
 {
        OffsetNumber offnum;            /* updated tuple's offset on page */
+       Oid                     dbId;                   /* MyDatabaseId */
+       Oid                     tsId;                   /* MyDatabaseTableSpace */
+       bool            relcacheInitFileInval;  /* invalidate relcache init files */
+       int                     nmsgs;                  /* number of shared inval msgs */
+       SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
 } xl_heap_inplace;
 
-#define SizeOfHeapInplace      (offsetof(xl_heap_inplace, offnum) + sizeof(OffsetNumber))
+#define MinSizeOfHeapInplace   (offsetof(xl_heap_inplace, nmsgs) + sizeof(int))
 
 /*
  * This is what we need to know about setting a visibility map bit
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 8f5744b..c812237 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -144,6 +144,8 @@ extern void ProcessCatchupInterrupt(void);
 
 extern int     xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
                                                                                                 bool *RelcacheInitFileInval);
+extern int     inplaceGetInvalidationMessages(SharedInvalidationMessage **msgs,
+                                                                                  bool *RelcacheInitFileInval);
 extern void ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs,
                                                                                                 int nmsgs, bool RelcacheInitFileInval,
                                                                                                 Oid dbid, Oid tsid);
diff --git a/src/include/utils/catcache.h b/src/include/utils/catcache.h
index 3fb9647..8f04bb8 100644
--- a/src/include/utils/catcache.h
+++ b/src/include/utils/catcache.h
@@ -225,6 +225,7 @@ extern void CatCacheInvalidate(CatCache *cache, uint32 hashValue);
 extern void PrepareToInvalidateCacheTuple(Relation relation,
                                                                                   HeapTuple tuple,
                                                                                   HeapTuple newtuple,
-                                                                                  void (*function) (int, uint32, Oid));
+                                                                                  void (*function) (int, uint32, Oid, void *),
+                                                                                  void *context);
 
 #endif                                                 /* CATCACHE_H */
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 24695fa..3390e7a 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -28,6 +28,9 @@ extern void AcceptInvalidationMessages(void);
 
 extern void AtEOXact_Inval(bool isCommit);
 
+extern void PreInplace_Inval(void);
+extern void AtInplace_Inval(void);
+
 extern void AtEOSubXact_Inval(bool isCommit);
 
 extern void PostPrepare_Inval(void);
@@ -37,6 +40,9 @@ extern void CommandEndInvalidationMessages(void);
 extern void CacheInvalidateHeapTuple(Relation relation,
                                                                          HeapTuple tuple,
                                                                          HeapTuple newtuple);
+extern void CacheInvalidateHeapTupleInplace(Relation relation,
+                                                                                        HeapTuple tuple,
+                                                                                        HeapTuple newtuple);
 
 extern void CacheInvalidateCatalog(Oid catalogId);
 
diff --git a/src/test/isolation/expected/inplace-inval.out b/src/test/isolation/expected/inplace-inval.out
index e68eca5..c35895a 100644
--- a/src/test/isolation/expected/inplace-inval.out
+++ b/src/test/isolation/expected/inplace-inval.out
@@ -1,6 +1,6 @@
 Parsed test spec with 3 sessions
 
-starting permutation: cachefill3 cir1 cic2 ddl3
+starting permutation: cachefill3 cir1 cic2 ddl3 read1
 step cachefill3: TABLE newly_indexed;
 c
 -
@@ -9,6 +9,14 @@ c
 step cir1: BEGIN; CREATE INDEX i1 ON newly_indexed (c); ROLLBACK;
 step cic2: CREATE INDEX i2 ON newly_indexed (c);
 step ddl3: ALTER TABLE newly_indexed ADD extra int;
+step read1: 
+       SELECT relhasindex FROM pg_class WHERE oid = 'newly_indexed'::regclass;
+
+relhasindex
+-----------
+t          
+(1 row)
+
 
 starting permutation: cir1 cic2 ddl3 read1
 step cir1: BEGIN; CREATE INDEX i1 ON newly_indexed (c); ROLLBACK;
diff --git a/src/test/isolation/specs/inplace-inval.spec b/src/test/isolation/specs/inplace-inval.spec
index 96954fd..b99112d 100644
--- a/src/test/isolation/specs/inplace-inval.spec
+++ b/src/test/isolation/specs/inplace-inval.spec
@@ -1,7 +1,7 @@
-# If a heap_update() caller retrieves its oldtup from a cache, it's possible
-# for that cache entry to predate an inplace update, causing loss of that
-# inplace update.  This arises because the transaction may abort before
-# sending the inplace invalidation message to the shared queue.
+# An inplace update had been able to abort before sending the inplace
+# invalidation message to the shared queue.  If a heap_update() caller then
+# retrieved its oldtup from a cache, the heap_update() could revert the
+# inplace update.
 
 setup
 {
@@ -27,14 +27,12 @@ step cachefill3     { TABLE newly_indexed; }
 step ddl3              { ALTER TABLE newly_indexed ADD extra int; }
 
 
-# XXX shows an extant bug.  Adding step read1 at the end would usually print
-# relhasindex=f (not wanted).  This does not reach the unwanted behavior under
-# -DCATCACHE_FORCE_RELEASE and friends.
 permutation
        cachefill3      # populates the pg_class row in the catcache
        cir1    # sets relhasindex=true; rollback discards cache inval
        cic2    # sees relhasindex=true, skips changing it (so no inval)
        ddl3    # cached row as the oldtup of an update, losing relhasindex
+       read1   # observe damage
 
 # without cachefill3, no bug
 permutation cir1 cic2 ddl3 read1
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 82b3b41..c3c5d97 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1252,6 +1252,7 @@ Interval
 IntervalAggState
 IntoClause
 InvalMessageArray
+InvalidationInfo
 InvalidationMsgsGroup
 IpcMemoryId
 IpcMemoryKey
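
For readers following the heapam_xlog.h hunk: xl_heap_inplace now ends in a
flexible array member, and MinSizeOfHeapInplace names the fixed part that ends
right after nmsgs.  Below is a minimal standalone sketch of how such a record
could be sized.  Everything named demo_* or Demo* is a hypothetical stand-in,
not PostgreSQL code, and the sketch assumes the message type needs no stricter
alignment than int, so the array starts immediately after nmsgs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for SharedInvalidationMessage; not the real type. */
typedef struct DemoMsg
{
	int			id;
} DemoMsg;

/* Hypothetical mirror of the new xl_heap_inplace field layout. */
typedef struct demo_inplace
{
	unsigned short offnum;		/* stand-in for OffsetNumber */
	unsigned int dbId;			/* stand-in for Oid */
	unsigned int tsId;			/* stand-in for Oid */
	bool		relcacheInitFileInval;
	int			nmsgs;			/* number of trailing messages */
	DemoMsg		msgs[];			/* C99 flexible array member */
} demo_inplace;

/* Fixed part ends right after nmsgs, in the style of MinSizeOfHeapInplace. */
#define MinSizeOfDemoInplace (offsetof(demo_inplace, nmsgs) + sizeof(int))

/* Bytes needed for a record that carries nmsgs trailing messages. */
static size_t
demo_inplace_size(int nmsgs)
{
	assert(nmsgs >= 0);
	return MinSizeOfDemoInplace + (size_t) nmsgs * sizeof(DemoMsg);
}
```

With zero messages the size is just the fixed part.  Each additional message
adds sizeof(DemoMsg) bytes.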
