From 0359b2a0db7a195e797851b56c380a61ac549ecc Mon Sep 17 00:00:00 2001
From: Junwang Zhao <zhjwpku@gmail.com>
Date: Thu, 19 Feb 2026 18:06:34 +0900
Subject: [PATCH v3 1/2] Add fast path for foreign key constraint checks

Add a fast-path optimization for foreign key checks that bypasses SPI
by directly probing the unique index on the referenced table.

The fast path applies when the referenced table is not partitioned and
the constraint does not involve temporal semantics.  ri_FastPathCheck()
extracts the FK values, builds scan keys, performs an index scan, and
locks the matching tuple with LockTupleKeyShare via ri_LockPKTuple(),
which handles the RI-specific subset of table_tuple_lock() results.

If the locked tuple was reached by chasing an update chain
(tmfd.traversed), recheck_matched_pk_tuple() verifies that the key
is still the same, emulating EvalPlanQual.

For REPEATABLE READ / SERIALIZABLE, a second index probe with
GetTransactionSnapshot() replicates the SPI path's crosscheck_snapshot
behavior: a PK row visible to the latest snapshot but not to the
transaction snapshot is rejected.

The ri_CheckPermissions() function performs schema USAGE and table
SELECT checks, matching what the SPI path does implicitly.

ri_HashCompareOp() is adjusted to handle cross-type equality operators
(e.g. int48eq for int4 PK / int8 FK) which can appear in conpfeqop.
The original code asserted same-type operators only.

Per-key metadata (compare entries, operator procedures, strategy
numbers) is cached in RI_ConstraintInfo via
ri_populate_fastpath_metadata() on first use, eliminating repeated
calls to ri_HashCompareOp() and get_op_opfamily_properties().
conindid and pk_is_partitioned are also cached at constraint load
time, avoiding per-invocation syscache lookups and the need to open
pk_rel before deciding whether the fast path applies.

Author: Junwang Zhao <zhjwpku@gmail.com>
Author: Amit Langote <amitlangote09@gmail.com>
Discussion: https://postgr.es/m/
---
 src/backend/utils/adt/ri_triggers.c           | 465 +++++++++++++++++-
 .../expected/fk-concurrent-pk-upd.out         |  58 +++
 src/test/isolation/isolation_schedule         |   1 +
 .../isolation/specs/fk-concurrent-pk-upd.spec |  42 ++
 src/test/regress/expected/foreign_key.out     |  47 ++
 src/test/regress/sql/foreign_key.sql          |  64 +++
 6 files changed, 665 insertions(+), 12 deletions(-)
 create mode 100644 src/test/isolation/expected/fk-concurrent-pk-upd.out
 create mode 100644 src/test/isolation/specs/fk-concurrent-pk-upd.spec

diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index d22b8ef7f3c..45cc742fa19 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -24,12 +24,15 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/skey.h"
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/index.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_namespace.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
@@ -91,6 +94,7 @@
 #define RI_TRIGTYPE_UPDATE 2
 #define RI_TRIGTYPE_DELETE 3
 
+struct RI_CompareHashEntry;
 
 /*
  * RI_ConstraintInfo
@@ -132,6 +136,16 @@ typedef struct RI_ConstraintInfo
 	Oid			period_intersect_oper;	/* anyrange * anyrange (or
 										 * multiranges) */
 	dlist_node	valid_link;		/* Link in list of valid entries */
+
+	Oid			conindid;
+	bool		pk_is_partitioned;
+
+	/* Fast-path metadata for RI checks on foreign tables */
+	bool		fpmeta_valid;	/* is fast-path metadata valid? */
+	struct RI_CompareHashEntry *compare_entries[RI_MAX_NUMKEYS];
+	RegProcedure regops[RI_MAX_NUMKEYS];
+	Oid			subtypes[RI_MAX_NUMKEYS];
+	int			strats[RI_MAX_NUMKEYS];
 } RI_ConstraintInfo;
 
 /*
@@ -233,6 +247,19 @@ static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 							TupleTableSlot *oldslot, TupleTableSlot *newslot,
 							bool is_restrict,
 							bool detectNewRows, int expect_OK);
+static bool ri_FastPathCheck(const RI_ConstraintInfo *riinfo,
+							 Relation fk_rel, TupleTableSlot *newslot);
+static bool ri_LockPKTuple(Relation pk_rel, TupleTableSlot *slot, Snapshot snap,
+						   bool *concurrently_updated);
+static bool ri_fastpath_is_applicable(const RI_ConstraintInfo *riinfo);
+static void ri_CheckPermissions(Relation query_rel);
+static bool recheck_matched_pk_tuple(Relation idxrel, ScanKeyData *skeys,
+									 TupleTableSlot *new_slot);
+static void build_index_scankeys(const RI_ConstraintInfo *riinfo,
+								 Relation idx_rel, Datum *pk_vals,
+								 char *pk_nulls, ScanKey skeys);
+static void ri_populate_fastpath_metadata(RI_ConstraintInfo *riinfo,
+										  Relation fk_rel, Relation idx_rel);
 static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 							 const RI_ConstraintInfo *riinfo, bool rel_is_pk,
 							 Datum *vals, char *nulls);
@@ -276,14 +303,7 @@ RI_FKey_check(TriggerData *trigdata)
 	if (!table_tuple_satisfies_snapshot(trigdata->tg_relation, newslot, SnapshotSelf))
 		return PointerGetDatum(NULL);
 
-	/*
-	 * Get the relation descriptors of the FK and PK tables.
-	 *
-	 * pk_rel is opened in RowShareLock mode since that's what our eventual
-	 * SELECT FOR KEY SHARE will get on it.
-	 */
 	fk_rel = trigdata->tg_relation;
-	pk_rel = table_open(riinfo->pk_relid, RowShareLock);
 
 	switch (ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false))
 	{
@@ -293,7 +313,6 @@ RI_FKey_check(TriggerData *trigdata)
 			 * No further check needed - an all-NULL key passes every type of
 			 * foreign key constraint.
 			 */
-			table_close(pk_rel, RowShareLock);
 			return PointerGetDatum(NULL);
 
 		case RI_KEYS_SOME_NULL:
@@ -318,7 +337,6 @@ RI_FKey_check(TriggerData *trigdata)
 							 errdetail("MATCH FULL does not allow mixing of null and nonnull key values."),
 							 errtableconstraint(fk_rel,
 												NameStr(riinfo->conname))));
-					table_close(pk_rel, RowShareLock);
 					return PointerGetDatum(NULL);
 
 				case FKCONSTR_MATCH_SIMPLE:
@@ -327,7 +345,6 @@ RI_FKey_check(TriggerData *trigdata)
 					 * MATCH SIMPLE - if ANY column is null, the key passes
 					 * the constraint.
 					 */
-					table_close(pk_rel, RowShareLock);
 					return PointerGetDatum(NULL);
 
 #ifdef NOT_USED
@@ -352,8 +369,42 @@ RI_FKey_check(TriggerData *trigdata)
 			break;
 	}
 
+	/*
+	 * Fast path: probe the PK unique index directly, bypassing SPI.
+	 *
+	 * Note: pk_rel is NOT opened above.  ri_fastpath_is_applicable() uses
+	 * cached metadata (pk_is_partitioned) rather than an open Relation, and
+	 * ri_FastPathCheck() opens it internally.
+	 */
+	if (ri_fastpath_is_applicable(riinfo))
+	{
+		bool		found = ri_FastPathCheck(riinfo, fk_rel, newslot);
+
+		if (found)
+			return PointerGetDatum(NULL);
+
+		/*
+		 * ri_FastPathCheck opens pk_rel internally; we need it for
+		 * ri_ReportViolation.  Re-open briefly.
+		 */
+		pk_rel = table_open(riinfo->pk_relid, RowShareLock);
+		ri_ReportViolation(riinfo, pk_rel, fk_rel,
+						   newslot, NULL,
+						   RI_PLAN_CHECK_LOOKUPPK, false, false);
+	}
+
+	/* Fall back to SPI */
+	elog(DEBUG1, "RI fastpath: constraint \"%s\" falling back to SPI",
+		 NameStr(riinfo->conname));
+
 	SPI_connect();
 
+	/*
+	 * pk_rel is opened in RowShareLock mode since that's what our eventual
+	 * SELECT FOR KEY SHARE will get on it.
+	 */
+	pk_rel = table_open(riinfo->pk_relid, RowShareLock);
+
 	/* Fetch or prepare a saved plan for the real check */
 	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
 
@@ -2355,6 +2406,11 @@ ri_LoadConstraintInfo(Oid constraintOid)
 	dclist_push_tail(&ri_constraint_cache_valid_list, &riinfo->valid_link);
 
 	riinfo->valid = true;
+	riinfo->fpmeta_valid = false;
+
+	riinfo->conindid = conForm->conindid;
+	riinfo->pk_is_partitioned =
+		(get_rel_relkind(riinfo->pk_relid) == RELKIND_PARTITIONED_TABLE);
 
 	return riinfo;
 }
@@ -2617,6 +2673,383 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 	return SPI_processed != 0;
 }
 
+/*
+ * ri_FastPathCheck
+ *		Perform FK existence check via direct index probe, bypassing SPI.
+ *
+ * Returns true if the PK row was found (constraint satisfied).
+ * Returns false if no matching PK row exists; caller should call
+ * ri_ReportViolation().
+ */
+static bool
+ri_FastPathCheck(const RI_ConstraintInfo *riinfo,
+				 Relation fk_rel, TupleTableSlot *newslot)
+{
+	Relation	pk_rel;
+	Relation	idx_rel;
+	IndexScanDesc scandesc;
+	TupleTableSlot *slot;
+	Datum		pk_vals[INDEX_MAX_KEYS];
+	char		pk_nulls[INDEX_MAX_KEYS];
+	ScanKeyData skey[INDEX_MAX_KEYS];
+	bool		found = false;
+	Oid			saved_userid;
+	int			saved_sec_context;
+	Snapshot	snapshot;
+
+	/*
+	 * Advance the command counter so the snapshot sees the effects of prior
+	 * triggers in this statement.  Mirrors what the SPI path does in
+	 * ri_PerformCheck().
+	 */
+	CommandCounterIncrement();
+	snapshot = RegisterSnapshot(GetLatestSnapshot());
+
+	pk_rel = table_open(riinfo->pk_relid, RowShareLock);
+	idx_rel = index_open(riinfo->conindid, AccessShareLock);
+
+	slot = table_slot_create(pk_rel, NULL);
+	scandesc = index_beginscan(pk_rel, idx_rel,
+							   snapshot, NULL,
+							   riinfo->nkeys, 0);
+
+	if (!riinfo->fpmeta_valid)
+		ri_populate_fastpath_metadata((RI_ConstraintInfo *) riinfo,
+									  fk_rel, idx_rel);
+	Assert(riinfo->fpmeta_valid);
+
+	GetUserIdAndSecContext(&saved_userid, &saved_sec_context);
+	SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
+						   saved_sec_context |
+						   SECURITY_LOCAL_USERID_CHANGE |
+						   SECURITY_NOFORCE_RLS);
+	ri_CheckPermissions(pk_rel);
+
+	ri_ExtractValues(fk_rel, newslot, riinfo, false, pk_vals, pk_nulls);
+	build_index_scankeys(riinfo, idx_rel, pk_vals, pk_nulls, skey);
+
+	index_rescan(scandesc, skey, riinfo->nkeys, NULL, 0);
+
+	if (index_getnext_slot(scandesc, ForwardScanDirection, slot))
+	{
+		bool		concurrently_updated;
+
+		if (ri_LockPKTuple(pk_rel, slot, snapshot,
+						   &concurrently_updated))
+		{
+			if (concurrently_updated)
+				found = recheck_matched_pk_tuple(idx_rel, skey, slot);
+			else
+				found = true;
+		}
+	}
+
+	/*--------
+	 * Crosscheck for REPEATABLE READ / SERIALIZABLE:
+	 *
+	 * The latest snapshot can see PK rows committed after our transaction
+	 * started. But the FK check must only succeed if the key also exists in a
+	 * version visible to our transaction snapshot. We can't just do
+	 * table_tuple_satisfies_snapshot on the locked tuple, because a non-key
+	 * update creates a new version invisible to our snapshot even though the
+	 * key hasn't changed.
+	 *
+	 * Instead, do a second index probe with the transaction snapshot. This
+	 * correctly handles both cases:
+	 *	- Newly inserted PK row: not found -> reject
+	 *	- Non-key update of existing row: old version found -> accept
+	 *
+	 * This matches the crosscheck_snapshot behavior in the SPI path's
+	 * ri_PerformCheck().
+	 */
+	if (found && IsolationUsesXactSnapshot())
+	{
+		IndexScanDesc xact_scan;
+		TupleTableSlot *xact_slot;
+		Snapshot	xact_snap = GetTransactionSnapshot();
+
+		xact_slot = table_slot_create(pk_rel, NULL);
+		xact_scan = index_beginscan(pk_rel,
+									idx_rel,
+									xact_snap, NULL,
+									riinfo->nkeys, 0);
+		index_rescan(xact_scan, skey, riinfo->nkeys, NULL, 0);
+
+		if (!index_getnext_slot(xact_scan, ForwardScanDirection, xact_slot))
+			found = false;
+
+		index_endscan(xact_scan);
+		ExecDropSingleTupleTableSlot(xact_slot);
+	}
+
+	index_endscan(scandesc);
+	index_close(idx_rel, NoLock);
+	table_close(pk_rel, NoLock);
+	ExecDropSingleTupleTableSlot(slot);
+
+	UnregisterSnapshot(snapshot);
+
+	SetUserIdAndSecContext(saved_userid, saved_sec_context);
+
+	return found;
+}
+
+/*
+ * ri_LockPKTuple
+ *		Lock a PK tuple found by the fast-path index scan.
+ *
+ * Calls table_tuple_lock() directly with handling specific to RI checks.
+ * Returns true if the tuple was successfully locked.
+ *
+ * Sets *concurrently_updated to true if the locked tuple was reached
+ * by following an update chain (tmfd.traversed), indicating the caller
+ * should recheck the key.
+ */
+static bool
+ri_LockPKTuple(Relation pk_rel, TupleTableSlot *slot, Snapshot snap,
+			   bool *concurrently_updated)
+{
+	TM_FailureData tmfd;
+	TM_Result	result;
+	int			lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+
+	*concurrently_updated = false;
+
+	if (!IsolationUsesXactSnapshot())
+		lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+	result = table_tuple_lock(pk_rel, &slot->tts_tid, snap,
+							  slot, GetCurrentCommandId(false),
+							  LockTupleKeyShare, LockWaitBlock,
+							  lockflags, &tmfd);
+
+	switch (result)
+	{
+		case TM_Ok:
+			if (tmfd.traversed)
+				*concurrently_updated = true;
+			return true;
+
+		case TM_Deleted:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			return false;
+
+		case TM_Updated:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+
+			/*
+			 * In READ COMMITTED, FIND_LAST_VERSION should have chased the
+			 * chain and returned TM_Ok.  Getting here means something
+			 * unexpected -- fall through to error.
+			 */
+			elog(ERROR, "unexpected table_tuple_lock status: %u", result);
+			break;
+
+		case TM_SelfModified:
+
+			/*
+			 * The current command or a later command in this transaction
+			 * modified the PK row.  This shouldn't normally happen during an
+			 * FK check (we're not modifying pk_rel), but handle it safely by
+			 * treating the tuple as not found.
+			 */
+			return false;
+
+		case TM_Invisible:
+			elog(ERROR, "attempted to lock invisible tuple");
+			break;
+
+		default:
+			elog(ERROR, "unrecognized table_tuple_lock status: %u", result);
+			break;
+	}
+
+	return false;				/* keep compiler quiet */
+}
+
+static bool
+ri_fastpath_is_applicable(const RI_ConstraintInfo *riinfo)
+{
+	/*
+	 * Partitioned referenced tables are skipped for simplicity, since they
+	 * require routing the probe through the correct partition using
+	 * PartitionDirectory.
+	 */
+	if (riinfo->pk_is_partitioned)
+		return false;
+
+	/*
+	 * Temporal foreign keys use range overlap and containment semantics (&&,
+	 * <@, range_agg()) that inherently involve aggregation and multiple-row
+	 * reasoning, so they stay on the SPI path.
+	 */
+	if (riinfo->hasperiod)
+		return false;
+
+	return true;
+}
+
+/*
+ * ri_CheckPermissions
+ *   Check that the new user has permissions to look into the schema of
+ *   and SELECT from 'query_rel'
+ *
+ * Provided for non-SQL implementors of an RI_Plan.
+ */
+static void
+ri_CheckPermissions(Relation query_rel)
+{
+	AclResult	aclresult;
+
+	/* USAGE on schema. */
+	aclresult = object_aclcheck(NamespaceRelationId,
+								RelationGetNamespace(query_rel),
+								GetUserId(), ACL_USAGE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_SCHEMA,
+					   get_namespace_name(RelationGetNamespace(query_rel)));
+
+	/* SELECT on relation. */
+	aclresult = pg_class_aclcheck(RelationGetRelid(query_rel), GetUserId(),
+								  ACL_SELECT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_TABLE,
+					   RelationGetRelationName(query_rel));
+}
+
+/*
+ * This checks that the index key of the tuple specified in 'new_slot' matches
+ * the key that has already been found in the PK index relation 'idxrel'.
+ *
+ * Returns true if the index key of the tuple matches the existing index
+ * key, false otherwise.
+ */
+static bool
+recheck_matched_pk_tuple(Relation idxrel, ScanKeyData *skeys,
+						 TupleTableSlot *new_slot)
+{
+	IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	bool		matched = true;
+
+	/* PK indexes never have these. */
+	Assert(indexInfo->ii_Expressions == NIL &&
+		   indexInfo->ii_ExclusionOps == NULL);
+
+	/* Form the index values and isnull flags given the table tuple. */
+	FormIndexDatum(indexInfo, new_slot, NULL, values, isnull);
+	for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+	{
+		ScanKeyData *skey = &skeys[i];
+
+		/* A PK column can never be set to NULL. */
+		Assert(!isnull[i]);
+		if (!DatumGetBool(FunctionCall2Coll(&skey->sk_func,
+											skey->sk_collation,
+											values[i],
+											skey->sk_argument)))
+		{
+			matched = false;
+			break;
+		}
+	}
+
+	return matched;
+}
+
+/*
+ * build_index_scankeys
+ *		Build ScanKeys for a direct index probe of the PK's unique index.
+ *
+ * Uses cached compare entries, operator procedures, and strategy numbers
+ * from ri_populate_fastpath_metadata() rather than looking them up on
+ * each invocation.  Casts FK values to the operator's expected input
+ * type if needed.
+ */
+static void
+build_index_scankeys(const RI_ConstraintInfo *riinfo,
+					 Relation idx_rel, Datum *pk_vals,
+					 char *pk_nulls, ScanKey skeys)
+{
+	/*
+	 * May need to cast each of the individual values of the foreign key to
+	 * the corresponding PK column's type if the equality operator demands it.
+	 */
+	for (int i = 0; i < riinfo->nkeys; i++)
+	{
+		if (pk_nulls[i] != 'n')
+		{
+			RI_CompareHashEntry *entry = riinfo->compare_entries[i];
+
+			if (OidIsValid(entry->cast_func_finfo.fn_oid))
+				pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
+										   pk_vals[i],
+										   Int32GetDatum(-1),	/* typmod */
+										   BoolGetDatum(false));	/* implicit coercion */
+		}
+		else
+		{
+			Assert(false);
+		}
+	}
+
+	/*
+	 * Set up ScanKeys for the index scan. This is essentially how
+	 * ExecIndexBuildScanKeys() sets them up.
+	 */
+	for (int i = 0; i < riinfo->nkeys; i++)
+	{
+		int			pkattrno = i + 1;
+
+		ScanKeyEntryInitialize(&skeys[i], 0, pkattrno,
+							   riinfo->strats[i], riinfo->subtypes[i],
+							   idx_rel->rd_indcollation[i], riinfo->regops[i],
+							   pk_vals[i]);
+	}
+}
+
+/*
+ * ri_populate_fastpath_metadata
+ *		Cache per-key metadata needed by build_index_scankeys().
+ *
+ * Looks up the compare hash entry, operator procedure OID, and index
+ * strategy/subtype for each key column.  Called lazily on first use
+ * and persists for the lifetime of the RI_ConstraintInfo entry.
+ */
+static void
+ri_populate_fastpath_metadata(RI_ConstraintInfo *riinfo,
+							  Relation fk_rel, Relation idx_rel)
+{
+	Assert(riinfo != NULL && riinfo->valid);
+
+	for (int i = 0; i < riinfo->nkeys; i++)
+	{
+		Oid			eq_opr = riinfo->pf_eq_oprs[i];
+		Oid			typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+		Oid			lefttype;
+		RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+
+		riinfo->compare_entries[i] = entry;
+		riinfo->regops[i] = get_opcode(eq_opr);
+
+		get_op_opfamily_properties(eq_opr,
+								   idx_rel->rd_opfamily[i],
+								   false,
+								   &riinfo->strats[i],
+								   &lefttype,
+								   &riinfo->subtypes[i]);
+	}
+
+	riinfo->fpmeta_valid = true;
+}
+
 /*
  * Extract fields from a tuple into Datum/nulls arrays
  */
@@ -3169,8 +3602,16 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
 		 * moment since that will never be generated for implicit coercions.
 		 */
 		op_input_types(eq_opr, &lefttype, &righttype);
-		Assert(lefttype == righttype);
-		if (typeid == lefttype)
+
+		/*
+		 * Don't need to cast if the FK column type already matches what the
+		 * operator expects.  For same-type operators, that's the common type.
+		 * For cross-type operators (e.g. int48eq for int4 PK / int8 FK), the
+		 * FK value is the right operand, so skip the cast if typeid matches
+		 * righttype.
+		 */
+		if ((lefttype == righttype && typeid == lefttype) ||
+			(lefttype != righttype && typeid == righttype))
 			castfunc = InvalidOid;	/* simplest case */
 		else
 		{
diff --git a/src/test/isolation/expected/fk-concurrent-pk-upd.out b/src/test/isolation/expected/fk-concurrent-pk-upd.out
new file mode 100644
index 00000000000..9bbec638ac9
--- /dev/null
+++ b/src/test/isolation/expected/fk-concurrent-pk-upd.out
@@ -0,0 +1,58 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s2ukey s1i s2c s1c s2s s1s
+step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1); <waiting ...>
+step s2c: COMMIT;
+step s1i: <... completed>
+ERROR:  insert or update on table "child" violates foreign key constraint "child_parent_key_fkey"
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         2|foo
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+(0 rows)
+
+
+starting permutation: s2uaux s1i s2c s1c s2s s1s
+step s2uaux: UPDATE parent SET aux = 'bar' WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1);
+step s2c: COMMIT;
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         1|bar
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+        1|         1
+(1 row)
+
+
+starting permutation: s2ukey s1i s2ukey2 s2c s1c s2s s1s
+step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1); <waiting ...>
+step s2ukey2: UPDATE parent SET parent_key = 1 WHERE parent_key = 2;
+step s2c: COMMIT;
+step s1i: <... completed>
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         1|foo
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+        1|         1
+(1 row)
+
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 4e466580cd4..c1a999bf1d2 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -37,6 +37,7 @@ test: fk-partitioned-2
 test: fk-snapshot
 test: fk-snapshot-2
 test: fk-snapshot-3
+test: fk-concurrent-pk-upd
 test: subxid-overflow
 test: eval-plan-qual
 test: eval-plan-qual-trigger
diff --git a/src/test/isolation/specs/fk-concurrent-pk-upd.spec b/src/test/isolation/specs/fk-concurrent-pk-upd.spec
new file mode 100644
index 00000000000..d59a14d4de1
--- /dev/null
+++ b/src/test/isolation/specs/fk-concurrent-pk-upd.spec
@@ -0,0 +1,42 @@
+# Tests that an INSERT on referencing table correctly fails when
+# the referenced value disappears due to a concurrent update
+setup
+{
+  CREATE TABLE parent (
+    parent_key int PRIMARY KEY,
+    aux   text NOT NULL
+  );
+
+  CREATE TABLE child (
+    child_key int PRIMARY KEY,
+    parent_key int8 NOT NULL REFERENCES parent
+  );
+
+  INSERT INTO parent VALUES (1, 'foo');
+}
+
+teardown
+{
+  DROP TABLE parent, child;
+}
+
+session s1
+setup  { BEGIN; }
+step s1i { INSERT INTO child VALUES (1, 1); }
+step s1c { COMMIT; }
+step s1s { SELECT * FROM child; }
+
+session s2
+setup  { BEGIN; }
+step s2ukey { UPDATE parent SET parent_key = 2 WHERE parent_key = 1; }
+step s2uaux { UPDATE parent SET aux = 'bar' WHERE parent_key = 1; }
+step s2ukey2 { UPDATE parent SET parent_key = 1 WHERE parent_key = 2; }
+step s2c { COMMIT; }
+step s2s { SELECT * FROM parent; }
+
+# fail
+permutation s2ukey s1i s2c s1c s2s s1s
+# ok
+permutation s2uaux s1i s2c s1c s2s s1s
+# ok
+permutation s2ukey s1i s2ukey2 s2c s1c s2s s1s
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 9ae4dbf1b0a..0826f518004 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -370,6 +370,53 @@ SELECT * FROM PKTABLE;
 DROP TABLE FKTABLE;
 DROP TABLE PKTABLE;
 --
+-- Check RLS
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+-- Grant privileges on PKTABLE/FKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+GRANT SELECT, INSERT ON FKTABLE TO regress_foreign_key_user;
+-- Enable RLS on PKTABLE and Create policies
+ALTER TABLE PKTABLE ENABLE ROW LEVEL SECURITY;
+CREATE POLICY pktable_view_odd_policy ON PKTABLE TO regress_foreign_key_user USING (ptest1 % 2 = 1);
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+SET ROLE regress_foreign_key_user;
+INSERT INTO FKTABLE VALUES (3, 5);
+INSERT INTO FKTABLE VALUES (2, 5); -- success, REFERENCES are not subject to row security
+RESET ROLE;
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+DROP USER regress_foreign_key_user;
+--
+-- Check ACL
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+-- Grant usage on PKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+-- Inserting into FKTABLE should work
+INSERT INTO FKTABLE VALUES (3, 5);
+-- Revoke usage on PKTABLE from user regress_foreign_key_user
+REVOKE SELECT ON PKTABLE FROM regress_foreign_key_user;
+-- Inserting into FKTABLE should fail
+INSERT INTO FKTABLE VALUES (2, 6);
+ERROR:  permission denied for table pktable
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+DROP USER regress_foreign_key_user;
+--
 -- Check initial check upon ALTER TABLE
 --
 CREATE TABLE PKTABLE ( ptest1 int, ptest2 int, PRIMARY KEY(ptest1, ptest2) );
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 3b8c95bf893..e9ee29331cb 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -242,6 +242,70 @@ SELECT * FROM PKTABLE;
 DROP TABLE FKTABLE;
 DROP TABLE PKTABLE;
 
+--
+-- Check RLS
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+
+-- Grant privileges on PKTABLE/FKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+GRANT SELECT, INSERT ON FKTABLE TO regress_foreign_key_user;
+
+-- Enable RLS on PKTABLE and Create policies
+ALTER TABLE PKTABLE ENABLE ROW LEVEL SECURITY;
+CREATE POLICY pktable_view_odd_policy ON PKTABLE TO regress_foreign_key_user USING (ptest1 % 2 = 1);
+
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+
+SET ROLE regress_foreign_key_user;
+
+INSERT INTO FKTABLE VALUES (3, 5);
+INSERT INTO FKTABLE VALUES (2, 5); -- success, REFERENCES are not subject to row security
+
+RESET ROLE;
+
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+DROP USER regress_foreign_key_user;
+
+--
+-- Check ACL
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+
+-- Grant usage on PKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+
+-- Inserting into FKTABLE should work
+INSERT INTO FKTABLE VALUES (3, 5);
+
+-- Revoke usage on PKTABLE from user regress_foreign_key_user
+REVOKE SELECT ON PKTABLE FROM regress_foreign_key_user;
+
+-- Inserting into FKTABLE should fail
+INSERT INTO FKTABLE VALUES (2, 6);
+
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+
+DROP USER regress_foreign_key_user;
+
 --
 -- Check initial check upon ALTER TABLE
 --
-- 
2.47.3

