From a42c95360d7aeacd652c33d93f3c4d5a2789acfb Mon Sep 17 00:00:00 2001
From: amitlan <amitlangote09@gmail.com>
Date: Tue, 12 Jan 2021 14:17:31 +0900
Subject: [PATCH v3 2/2] Avoid using an SQL query for some RI checks

For RI triggers that want to check if a given referenced value exists
in the referenced relation, it suffices to simply scan the foreign key
constraint's unique index, instead of issuing an SQL query to do the
same thing.

To do so, this commit builds on the RIPlan infrastructure added in the
previous commit.  It replaces ri_SqlStringPlanCreate() used in
RI_FKey_check() and ri_Check_Pk_Match() for creating the plan for their
respective checks by ri_LookupKeyInPkRelPlanCreate(), which installs
ri_LookupKeyInPkRel() as the plan to implement those checks.
ri_LookupKeyInPkRel() contains the logic to directly scan the unique
key associated with the foreign key constraint.

This rewrite allows to fix a PK row visibility bug caused by a
partition descriptor hack which requires ActiveSnapshot to be set to
the latest snapshot for find_inheritance_children_extended() to
interpret any detach-pending partitions correctly for RI queries
running under REPEATABLE READ isolation.  With the previous SQL
string implementation of those RI queries, the latest snapshot set
for that hack would also get used, inadvertently, by the scan of the
user table.  With ri_LookupKeyInPkRel(), the snapshot needed for the
hack is now set only for the duration of the code stanza to retrieve
the partition descriptor and thus doesn't affect the PK index scan's
result.  The buggy output in src/test/isolation/expected/fk-snapshot.out
of the relevant test case that was added by 00cb86e75d has been
changed to the correct output.
---
 src/backend/executor/execPartition.c        | 160 ++++++-
 src/backend/executor/nodeLockRows.c         | 160 ++++---
 src/backend/utils/adt/ri_triggers.c         | 464 +++++++++++++++-----
 src/include/executor/execPartition.h        |   6 +
 src/include/executor/executor.h             |   9 +
 src/test/isolation/expected/fk-snapshot.out |   4 +-
 src/test/isolation/specs/fk-snapshot.spec   |   5 +-
 7 files changed, 615 insertions(+), 193 deletions(-)

diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index eb49106102..a68d5d7eb3 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -176,8 +176,9 @@ static void FormPartitionKeyDatum(PartitionDispatch pd,
 								  EState *estate,
 								  Datum *values,
 								  bool *isnull);
-static int	get_partition_for_tuple(PartitionDispatch pd, Datum *values,
-									bool *isnull);
+static int get_partition_for_tuple(PartitionKey key,
+						PartitionDesc partdesc,
+						Datum *values, bool *isnull);
 static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
 												  Datum *values,
 												  bool *isnull,
@@ -318,7 +319,9 @@ ExecFindPartition(ModifyTableState *mtstate,
 		 * these values, error out.
 		 */
 		if (partdesc->nparts == 0 ||
-			(partidx = get_partition_for_tuple(dispatch, values, isnull)) < 0)
+			(partidx = get_partition_for_tuple(dispatch->key,
+											   dispatch->partdesc,
+											   values, isnull)) < 0)
 		{
 			char	   *val_desc;
 
@@ -1380,12 +1383,12 @@ FormPartitionKeyDatum(PartitionDispatch pd,
  * found or -1 if none found.
  */
 static int
-get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
+get_partition_for_tuple(PartitionKey key,
+						PartitionDesc partdesc,
+						Datum *values, bool *isnull)
 {
 	int			bound_offset = -1;
 	int			part_index = -1;
-	PartitionKey key = pd->key;
-	PartitionDesc partdesc = pd->partdesc;
 	PartitionBoundInfo boundinfo = partdesc->boundinfo;
 
 	/*
@@ -1592,6 +1595,151 @@ get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
 	return part_index;
 }
 
+/*
+ * ExecGetLeafPartitionForTuple
+ *		Finds the leaf partition of a partitioned table 'root_rel' that might
+ *		contain the specified key tuple containing a subset of the table's
+ *		columns (including all of the partition key columns)
+ *
+ * 'key_natts' specifies the number columns contained in the key,
+ * 'key_attnums' their attribute numbers as defined in 'root_rel', and
+ * 'key_vals' and 'key_nulls' specify the key tuple.
+ *
+ * Returns NULL if no leaf partition is found for the key.  Caller must close
+ * the relation.
+ *
+ * This works because the unique key defined on the root relation is required
+ * to contain the partition key columns of all of the ancestors that lead up to
+ * a given leaf partition.
+ */
+Relation
+ExecGetLeafPartitionForKey(Relation root_rel, int key_natts,
+						   const AttrNumber *key_attnums,
+						   Datum *key_vals, char *key_nulls,
+						   Oid root_idxoid, int lockmode,
+						   Oid *leaf_idxoid)
+{
+	Relation	rel = root_rel;
+	Oid			constr_idxoid = root_idxoid;
+
+	*leaf_idxoid = InvalidOid;
+
+	/*
+	 * Descend through partitioned parents to find the leaf partition that
+	 * would accept a row with the provided key values, starting with the root
+	 * parent.
+	 */
+	while (true)
+	{
+		PartitionKey partkey = RelationGetPartitionKey(rel);
+		PartitionDirectory partdir;
+		PartitionDesc partdesc;
+		Datum	partkey_vals[PARTITION_MAX_KEYS];
+		bool	partkey_isnull[PARTITION_MAX_KEYS];
+		AttrNumber *root_partattrs = partkey->partattrs;
+		int		i,
+				j;
+		int		partidx;
+		Oid		partoid;
+		bool	is_leaf;
+
+		/*
+		 * Collect partition key values from the unique key.
+		 *
+		 * Because we only have the root table's copy of pk_attnums, must map
+		 * any non-root table's partition key attribute numbers to the root
+		 * table's.
+		 */
+		if (rel != root_rel)
+		{
+			/*
+			 * map->attnums will contain root table attribute numbers for each
+			 * attribute of the current partitioned relation.
+			 */
+			AttrMap *map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
+														RelationGetDescr(rel));
+
+			if (map)
+			{
+				root_partattrs = palloc(partkey->partnatts *
+										sizeof(AttrNumber));
+				for (i = 0; i < partkey->partnatts; i++)
+				{
+					AttrNumber	partattno = partkey->partattrs[i];
+
+					root_partattrs[i] = map->attnums[partattno - 1];
+				}
+
+				free_attrmap(map);
+			}
+		}
+
+		/*
+		 * Referenced key specification does not allow expressions, so there
+		 * would not be expressions in the partition keys either.
+		 */
+		Assert(partkey->partexprs == NIL);
+		for (i = 0, j = 0; i < partkey->partnatts; i++)
+		{
+			int		k;
+
+			for (k = 0; k < key_natts; k++)
+			{
+				if (root_partattrs[i] == key_attnums[k])
+				{
+					partkey_vals[j] = key_vals[k];
+					partkey_isnull[j] = (key_nulls[k] == 'n');
+					j++;
+					break;
+				}
+			}
+		}
+		/* Had better have found values for all of the partition keys. */
+		Assert(j == partkey->partnatts);
+
+		if (root_partattrs != partkey->partattrs)
+			pfree(root_partattrs);
+
+		/* Get the PartitionDesc using the partition directory machinery.  */
+		partdir = CreatePartitionDirectory(CurrentMemoryContext, true);
+		partdesc = PartitionDirectoryLookup(partdir, rel);
+
+		/* Find the partition for the key. */
+		partidx = get_partition_for_tuple(partkey, partdesc,
+										  partkey_vals, partkey_isnull);
+		Assert(partidx < 0 || partidx < partdesc->nparts);
+
+		/* Done using the partition directory. */
+		DestroyPartitionDirectory(partdir);
+
+		/* Close any intermediate parents we opened, but keep the lock. */
+		if (rel != root_rel)
+			table_close(rel, NoLock);
+
+		/* No partition found. */
+		if (partidx < 0)
+			return NULL;
+
+		partoid = partdesc->oids[partidx];
+		rel = table_open(partoid, lockmode);
+		constr_idxoid = index_get_partition(rel, constr_idxoid);
+
+		/*
+		 * Return if the partition is a leaf, else find its partition in the
+		 * next iteration.
+		 */
+		is_leaf = partdesc->is_leaf[partidx];
+		if (is_leaf)
+		{
+			*leaf_idxoid = constr_idxoid;
+			return rel;
+		}
+	}
+
+	Assert(false);
+	return NULL;
+}
+
 /*
  * ExecBuildSlotPartitionKeyDescription
  *
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index a74813c7aa..352cacd70b 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -79,10 +79,7 @@ lnext:
 		Datum		datum;
 		bool		isNull;
 		ItemPointerData tid;
-		TM_FailureData tmfd;
 		LockTupleMode lockmode;
-		int			lockflags = 0;
-		TM_Result	test;
 		TupleTableSlot *markSlot;
 
 		/* clear any leftover test tuple for this rel */
@@ -179,74 +176,11 @@ lnext:
 				break;
 		}
 
-		lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
-		if (!IsolationUsesXactSnapshot())
-			lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
-
-		test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
-								markSlot, estate->es_output_cid,
-								lockmode, erm->waitPolicy,
-								lockflags,
-								&tmfd);
-
-		switch (test)
-		{
-			case TM_WouldBlock:
-				/* couldn't lock tuple in SKIP LOCKED mode */
-				goto lnext;
-
-			case TM_SelfModified:
-
-				/*
-				 * The target tuple was already updated or deleted by the
-				 * current command, or by a later command in the current
-				 * transaction.  We *must* ignore the tuple in the former
-				 * case, so as to avoid the "Halloween problem" of repeated
-				 * update attempts.  In the latter case it might be sensible
-				 * to fetch the updated tuple instead, but doing so would
-				 * require changing heap_update and heap_delete to not
-				 * complain about updating "invisible" tuples, which seems
-				 * pretty scary (table_tuple_lock will not complain, but few
-				 * callers expect TM_Invisible, and we're not one of them). So
-				 * for now, treat the tuple as deleted and do not process.
-				 */
-				goto lnext;
-
-			case TM_Ok:
-
-				/*
-				 * Got the lock successfully, the locked tuple saved in
-				 * markSlot for, if needed, EvalPlanQual testing below.
-				 */
-				if (tmfd.traversed)
-					epq_needed = true;
-				break;
-
-			case TM_Updated:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				elog(ERROR, "unexpected table_tuple_lock status: %u",
-					 test);
-				break;
-
-			case TM_Deleted:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				/* tuple was deleted so don't return it */
-				goto lnext;
-
-			case TM_Invisible:
-				elog(ERROR, "attempted to lock invisible tuple");
-				break;
-
-			default:
-				elog(ERROR, "unrecognized table_tuple_lock status: %u",
-					 test);
-		}
+		/* skip tuple if it couldn't be locked */
+		if (!ExecLockTableTuple(erm->relation, &tid, markSlot,
+								estate->es_snapshot, estate->es_output_cid,
+								lockmode, erm->waitPolicy, &epq_needed))
+			goto lnext;
 
 		/* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
 		erm->curCtid = tid;
@@ -281,6 +215,90 @@ lnext:
 	return slot;
 }
 
+/*
+ * ExecLockTableTuple
+ * 		Locks tuple with the specified TID in lockmode following given wait
+ * 		policy
+ *
+ * Returns true if the tuple was successfully locked.  Locked tuple is loaded
+ * into provided slot.
+ */
+bool
+ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+				   Snapshot snapshot, CommandId cid,
+				   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+				   bool *epq_needed)
+{
+	TM_FailureData tmfd;
+	int			lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+	TM_Result	test;
+
+	if (!IsolationUsesXactSnapshot())
+		lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+	test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
+							waitPolicy, lockflags, &tmfd);
+
+	switch (test)
+	{
+		case TM_WouldBlock:
+			/* couldn't lock tuple in SKIP LOCKED mode */
+			return false;
+
+		case TM_SelfModified:
+			/*
+			 * The target tuple was already updated or deleted by the
+			 * current command, or by a later command in the current
+			 * transaction.  We *must* ignore the tuple in the former
+			 * case, so as to avoid the "Halloween problem" of repeated
+			 * update attempts.  In the latter case it might be sensible
+			 * to fetch the updated tuple instead, but doing so would
+			 * require changing heap_update and heap_delete to not
+			 * complain about updating "invisible" tuples, which seems
+			 * pretty scary (table_tuple_lock will not complain, but few
+			 * callers expect TM_Invisible, and we're not one of them). So
+			 * for now, treat the tuple as deleted and do not process.
+			 */
+			return false;
+
+		case TM_Ok:
+			/*
+			 * Got the lock successfully, the locked tuple saved in
+			 * slot for EvalPlanQual, if asked by the caller.
+			 */
+			if (tmfd.traversed && epq_needed)
+				*epq_needed = true;
+			break;
+
+		case TM_Updated:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			elog(ERROR, "unexpected table_tuple_lock status: %u",
+				 test);
+			break;
+
+		case TM_Deleted:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			/* tuple was deleted so don't return it */
+			return false;
+
+		case TM_Invisible:
+			elog(ERROR, "attempted to lock invisible tuple");
+			return false;
+
+		default:
+			elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
+			return false;
+	}
+
+	return true;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitLockRows
  *
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 46e26dae52..401b17e283 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -23,22 +23,27 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/skey.h"
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/partition.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
 #include "commands/trigger.h"
+#include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_relation.h"
+#include "partitioning/partdesc.h"
 #include "storage/bufmgr.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
@@ -50,6 +55,7 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/ruleutils.h"
@@ -151,6 +157,12 @@ typedef void (*RI_PlanFreeFunc_type) (struct RI_Plan *plan);
  */
 typedef struct RI_Plan
 {
+	/* Constraint for this plan. */
+	const RI_ConstraintInfo *riinfo;
+
+	/* RI query type code. */
+	int				constr_queryno;
+
 	/*
 	 * Context under which this struct and its subsidiary data gets allocated.
 	 * It is made a child of CacheMemoryContext.
@@ -265,7 +277,8 @@ static const RI_ConstraintInfo *ri_FetchConstraintInfo(Trigger *trigger,
 													   Relation trig_rel, bool rel_is_pk);
 static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid);
 static Oid	get_ri_constraint_root(Oid constrOid);
-static RI_Plan *ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
+static RI_Plan *ri_PlanCheck(const RI_ConstraintInfo *riinfo,
+							 RI_PlanCreateFunc_type plan_create_func,
 							 const char *querystr, int nargs, Oid *argtypes,
 							 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel);
 static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
@@ -289,6 +302,15 @@ static int ri_SqlStringPlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_r
 						Snapshot crosscheck_snapshot,
 						int limit, CmdType *last_stmt_cmdtype);
 static void ri_SqlStringPlanFree(RI_Plan *plan);
+static void ri_LookupKeyInPkRelPlanCreate(RI_Plan *plan,
+							  const char *querystr, int nargs, Oid *paramtypes);
+static int ri_LookupKeyInPkRel(struct RI_Plan *plan,
+					Relation fk_rel, Relation pk_rel,
+					Datum *pk_vals, char *pk_nulls,
+					Snapshot test_snapshot, Snapshot crosscheck_snapshot,
+					int limit, CmdType *last_stmt_cmdtype);
+static bool ri_LookupKeyInPkRelPlanIsValid(RI_Plan *plan);
+static void ri_LookupKeyInPkRelPlanFree(RI_Plan *plan);
 
 
 /*
@@ -384,9 +406,9 @@ RI_FKey_check(TriggerData *trigdata)
 
 					/*
 					 * MATCH PARTIAL - all non-null columns must match. (not
-					 * implemented, can be done by modifying the query below
-					 * to only include non-null columns, or by writing a
-					 * special version here)
+					 * implemented, can be done by modifying
+					 * LookupKeyInPkRelPlanExecute() to only include non-null
+					 * columns.
 					 */
 					break;
 #endif
@@ -406,63 +428,17 @@ RI_FKey_check(TriggerData *trigdata)
 
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
-		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
-		char		attname[MAX_QUOTED_NAME_LEN];
-		char		paramname[16];
-		const char *querysep;
-		Oid			queryoids[RI_MAX_NUMKEYS];
-		const char *pk_only;
-
-		/* ----------
-		 * The query string built is
-		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
-		 *		   FOR KEY SHARE OF x
-		 * The type id's for the $ parameters are those of the
-		 * corresponding FK attributes.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
-						 pk_only, pkrelname);
-		querysep = "WHERE";
-		for (int i = 0; i < riinfo->nkeys; i++)
-		{
-			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
-
-			quoteOneName(attname,
-						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
-			sprintf(paramname, "$%d", i + 1);
-			ri_GenerateQual(&querybuf, querysep,
-							attname, pk_type,
-							riinfo->pf_eq_oprs[i],
-							paramname, fk_type);
-			querysep = "AND";
-			queryoids[i] = fk_type;
-		}
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-
-		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
-							 querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_LookupKeyInPkRelPlanCreate(). */
+		qplan = ri_PlanCheck(riinfo, ri_LookupKeyInPkRelPlanCreate,
+							 NULL, 0 /* nargs */, NULL /* argtypes */,
 							 &qkey, fk_rel, pk_rel);
 	}
 
-	/*
-	 * Now check that foreign key exists in PK table
-	 *
-	 * XXX detectNewRows must be true when a partitioned table is on the
-	 * referenced side.  The reason is that our snapshot must be fresh in
-	 * order for the hack in find_inheritance_children() to work.
-	 */
+	/* Now check that foreign key exists in PK table. */
 	ri_PerformCheck(riinfo, &qkey, qplan,
 					fk_rel, pk_rel,
 					NULL, newslot,
-					pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
+					false,
 					CMD_SELECT);
 
 	table_close(pk_rel, RowShareLock);
@@ -533,48 +509,9 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 
 	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
 	{
-		StringInfoData querybuf;
-		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
-		char		attname[MAX_QUOTED_NAME_LEN];
-		char		paramname[16];
-		const char *querysep;
-		const char *pk_only;
-		Oid			queryoids[RI_MAX_NUMKEYS];
-
-		/* ----------
-		 * The query string built is
-		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
-		 *		   FOR KEY SHARE OF x
-		 * The type id's for the $ parameters are those of the
-		 * PK attributes themselves.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
-						 pk_only, pkrelname);
-		querysep = "WHERE";
-		for (int i = 0; i < riinfo->nkeys; i++)
-		{
-			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-
-			quoteOneName(attname,
-						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
-			sprintf(paramname, "$%d", i + 1);
-			ri_GenerateQual(&querybuf, querysep,
-							attname, pk_type,
-							riinfo->pp_eq_oprs[i],
-							paramname, pk_type);
-			querysep = "AND";
-			queryoids[i] = pk_type;
-		}
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-
-		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
-							 querybuf.data, riinfo->nkeys, queryoids,
+		/* Prepare and save the plan using ri_LookupKeyInPkRelPlanCreate(). */
+		qplan = ri_PlanCheck(riinfo, ri_LookupKeyInPkRelPlanCreate,
+							 NULL, 0 /* nargs */, NULL /* argtypes */,
 							 &qkey, fk_rel, pk_rel);
 	}
 
@@ -760,7 +697,7 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
 		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -860,7 +797,7 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
 		}
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -977,7 +914,7 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
 		appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len);
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys * 2, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -1204,7 +1141,7 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind)
 		}
 
 		/* Prepare and save the plan using ri_SqlStringPlanCreate(). */
-		qplan = ri_PlanCheck(ri_SqlStringPlanCreate,
+		qplan = ri_PlanCheck(riinfo, ri_SqlStringPlanCreate,
 							 querybuf.data, riinfo->nkeys, queryoids,
 							 &qkey, fk_rel, pk_rel);
 	}
@@ -2013,6 +1950,11 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 	 * saving lots of work and memory when there are many partitions with
 	 * similar FK constraints.
 	 *
+	 * We must not share the plan for RI_PLAN_CHECK_LOOKUPPK queries either,
+	 * because its execution function (ri_LookupKeyInPkRel()) expects to see
+	 * the RI_ConstraintInfo of the individual leaf partitions that the
+	 * query fired on.
+	 *
 	 * (Note that we must still have a separate RI_ConstraintInfo for each
 	 * constraint, because partitions can have different column orders,
 	 * resulting in different pk_attnums[] or fk_attnums[] array contents.)
@@ -2020,7 +1962,8 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 	 * We assume struct RI_QueryKey contains no padding bytes, else we'd need
 	 * to use memset to clear them.
 	 */
-	if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
+	if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
+		constr_queryno != RI_PLAN_CHECK_LOOKUPPK)
 		key->constr_id = riinfo->constraint_root_id;
 	else
 		key->constr_id = riinfo->constraint_id;
@@ -2285,10 +2228,17 @@ InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue)
 	}
 }
 
+typedef enum RI_Plantype
+{
+	RI_PLAN_SQL = 0,
+	RI_PLAN_CHECK_FUNCTION
+} RI_Plantype;
+
 /* Query string or an equivalent name to show in the error CONTEXT. */
 typedef struct RIErrorCallbackArg
 {
 	const char *query;
+	RI_Plantype plantype;
 } RIErrorCallbackArg;
 
 /*
@@ -2318,7 +2268,17 @@ _RI_error_callback(void *arg)
 		internalerrquery(query);
 	}
 	else
-		errcontext("SQL statement \"%s\"", query);
+	{
+		switch (carg->plantype)
+		{
+			case RI_PLAN_SQL:
+				errcontext("SQL statement \"%s\"", query);
+				break;
+			case RI_PLAN_CHECK_FUNCTION:
+				errcontext("RI check function \"%s\"", query);
+				break;
+		}
+	}
 }
 
 /*
@@ -2555,14 +2515,283 @@ ri_SqlStringPlanFree(RI_Plan *plan)
 	}
 }
 
+/*
+ * Creates an RI_Plan to look a key up in the PK table.
+ *
+ * Not much to do beside initializing the expected callback members, because
+ * there is no query string to parse and plan.
+ */
+static void
+ri_LookupKeyInPkRelPlanCreate(RI_Plan *plan,
+							  const char *querystr, int nargs, Oid *paramtypes)
+{
+	Assert(querystr == NULL);
+	plan->plan_exec_func = ri_LookupKeyInPkRel;
+	plan->plan_exec_arg = NULL;
+	plan->plan_is_valid_func = ri_LookupKeyInPkRelPlanIsValid;
+	plan->plan_free_func = ri_LookupKeyInPkRelPlanFree;
+}
+
+/*
+ * get_fkey_unique_index
+ * 		Returns the unique index used by a supposedly foreign key constraint
+ */
+static Oid
+get_fkey_unique_index(Oid conoid)
+{
+	Oid			result = InvalidOid;
+	HeapTuple	tp;
+
+	tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
+	if (HeapTupleIsValid(tp))
+	{
+		Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
+
+		if (contup->contype == CONSTRAINT_FOREIGN)
+			result = contup->conindid;
+		ReleaseSysCache(tp);
+	}
+
+	if (!OidIsValid(result))
+		elog(ERROR, "unique index not found for foreign key constraint %u",
+			 conoid);
+
+	return result;
+}
+
+/*
+ * Checks whether a tuple containing the given unique key given by pk_vals,
+ * pk_nulls exists in 'pk_rel'.  The key is looked up using the the
+ * constraint's index given in plan->riinfo.
+ *
+ * If 'pk_rel' is a partitioned table, the check is performed on its leaf
+ * partition that would contain the key.
+ *
+ * The provided tuple is either the one being inserted into the referencing
+ * relation (fk_rel) or the one being deleted from the referenced relation
+ * (pk_rel).
+ */
+static int
+ri_LookupKeyInPkRel(struct RI_Plan *plan,
+					Relation fk_rel, Relation pk_rel,
+					Datum *pk_vals, char *pk_nulls,
+					Snapshot test_snapshot, Snapshot crosscheck_snapshot,
+					int limit, CmdType *last_stmt_cmdtype)
+{
+	const RI_ConstraintInfo *riinfo = plan->riinfo;
+	Oid			constr_id = riinfo->constraint_id;
+	Oid			idxoid;
+	Relation	idxrel;
+	Relation	leaf_pk_rel = NULL;
+	int			num_pk;
+	int			i;
+	int			tuples_processed = 0;
+	const Oid  *eq_oprs;
+	ScanKeyData skey[INDEX_MAX_KEYS];
+	IndexScanDesc	scan;
+	TupleTableSlot *outslot;
+	AclResult	aclresult;
+	RIErrorCallbackArg ricallbackarg;
+	ErrorContextCallback rierrcontext;
+
+	/* We're effectively doing a CMD_SELECT below. */
+	*last_stmt_cmdtype = CMD_SELECT;
+
+	/*
+	 * Setup error traceback support for ereport()
+	 */
+	ricallbackarg.query = "ri_LookupKeyInPkRel";
+	ricallbackarg.plantype = RI_PLAN_CHECK_FUNCTION;
+	rierrcontext.callback = _RI_error_callback;
+	rierrcontext.arg = &ricallbackarg;
+	rierrcontext.previous = error_context_stack;
+	error_context_stack = &rierrcontext;
+
+	/* XXX Maybe afterTriggerInvokeEvents() / AfterTriggerExecute() should? */
+	CHECK_FOR_INTERRUPTS();
+
+	/*
+	 * Choose the equality operators to use when scanning the PK index below.
+	 */
+	if (plan->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
+	{
+		/* Use PK = FK equality operator. */
+		eq_oprs = riinfo->pf_eq_oprs;
+
+		/*
+		 * May need to cast each of the individual values of the foreign key
+		 * to the corresponding PK column's type if the equality operator
+		 * demands it.
+		 */
+		for (i = 0; i < riinfo->nkeys; i++)
+		{
+			if (pk_nulls[i] != 'n')
+			{
+				Oid		eq_opr = eq_oprs[i];
+				Oid		typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+				RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+
+				if (OidIsValid(entry->cast_func_finfo.fn_oid))
+					pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
+											   pk_vals[i],
+											   Int32GetDatum(-1), /* typmod */
+											   BoolGetDatum(false)); /* implicit coercion */
+			}
+		}
+	}
+	else
+	{
+		Assert(plan->constr_queryno == RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
+		/* Use PK = PK equality operator. */
+		eq_oprs = riinfo->pp_eq_oprs;
+	}
+
+	/*
+	 * Must explicitly check that the new user has permissions to look into the
+	 * schema of and SELECT from the referenced table.
+	 */
+	aclresult = pg_namespace_aclcheck(RelationGetNamespace(pk_rel),
+									  GetUserId(), ACL_USAGE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_SCHEMA,
+					   get_namespace_name(RelationGetNamespace(pk_rel)));
+	aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(),
+								  ACL_SELECT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_TABLE,
+					   RelationGetRelationName(pk_rel));
+
+	/*
+	 * Open the constraint index to be scanned.
+	 *
+	 * If the target table is partitioned, we must look up the leaf partition
+	 * and its corresponding unique index to search the keys in.
+	 */
+	idxoid = get_fkey_unique_index(constr_id);
+	if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		Oid		leaf_idxoid;
+		Snapshot mysnap;
+
+		/*
+		 * HACK: find_inheritance_children_extended() that might get called
+		 * as part of the following function has a hack that assumes that the
+		 * queries originating in this module push the latest snapshot in
+		 * transaction-snapshot mode.
+		 */
+		mysnap = GetLatestSnapshot();
+		PushActiveSnapshot(mysnap);
+
+		leaf_pk_rel = ExecGetLeafPartitionForKey(pk_rel, riinfo->nkeys,
+												 riinfo->pk_attnums,
+												 pk_vals, pk_nulls,
+												 idxoid, RowShareLock,
+												 &leaf_idxoid);
+		/*
+		 * HACK: done fiddling with the partition descriptor machinery so
+		 * unset the active snapshot.
+		 */
+		PopActiveSnapshot();
+
+		/*
+		 * If no suitable leaf partition exists, neither can the key we're
+		 * looking for.
+		 */
+		if (leaf_pk_rel == NULL)
+			return false;
+
+		pk_rel = leaf_pk_rel;
+		idxoid = leaf_idxoid;
+	}
+	idxrel = index_open(idxoid, RowShareLock);
+
+	/* Set up ScanKeys for the index scan. */
+	num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
+	for (i = 0; i < num_pk; i++)
+	{
+		int			pkattno = i + 1;
+		Oid			operator = eq_oprs[i];
+		Oid			opfamily = idxrel->rd_opfamily[i];
+		StrategyNumber strat = get_op_opfamily_strategy(operator, opfamily);
+		RegProcedure regop = get_opcode(operator);
+
+		/* Initialize the scankey. */
+		ScanKeyInit(&skey[i],
+					pkattno,
+					strat,
+					regop,
+					pk_vals[i]);
+
+		skey[i].sk_collation = idxrel->rd_indcollation[i];
+
+		/*
+		 * Check for null value.  Should not occur, because callers currently
+		 * take care of the cases in which they do occur.
+		 */
+		if (pk_nulls[i] == 'n')
+			skey[i].sk_flags |= SK_ISNULL;
+	}
+
+	scan = index_beginscan(pk_rel, idxrel, test_snapshot, num_pk, 0);
+	index_rescan(scan, skey, num_pk, NULL, 0);
+
+	/* Look for the tuple, and if found, try to lock it in key share mode. */
+	outslot = table_slot_create(pk_rel, NULL);
+	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+	{
+		/*
+		 * If we fail to lock the tuple for whatever reason, assume it doesn't
+		 * exist.
+		 */
+		if (ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
+							   test_snapshot,
+							   GetCurrentCommandId(false),
+							   LockTupleKeyShare,
+							   LockWaitBlock, NULL))
+			tuples_processed = 1;
+	}
+
+	index_endscan(scan);
+	ExecDropSingleTupleTableSlot(outslot);
+
+	/* Don't release lock until commit. */
+	index_close(idxrel, NoLock);
+
+	/* Close leaf partition relation if any. */
+	if (leaf_pk_rel)
+		table_close(leaf_pk_rel, NoLock);
+
+	/*
+	 * Pop the error context stack
+	 */
+	error_context_stack = rierrcontext.previous;
+
+	return tuples_processed;
+}
+
+static bool
+ri_LookupKeyInPkRelPlanIsValid(RI_Plan *plan)
+{
+	/* Never store anything that can be invalidated. */
+	return true;
+}
+
+static void
+ri_LookupKeyInPkRelPlanFree(RI_Plan *plan)
+{
+	/* Nothing to free. */
+}
+
 /*
  * Create an RI_Plan for a given RI check query and initialize the
  * plan callbacks and execution argument using the caller specified
  * function.
  */
 static RI_Plan *
-ri_PlanCreate(RI_PlanCreateFunc_type plan_create_func,
-			  const char *querystr, int nargs, Oid *paramtypes)
+ri_PlanCreate(const RI_ConstraintInfo *riinfo,
+			  RI_PlanCreateFunc_type plan_create_func,
+			  const char *querystr, int nargs, Oid *paramtypes,
+			  int constr_queryno)
 {
 	RI_Plan	   *plan;
 	MemoryContext plancxt,
@@ -2577,6 +2806,8 @@ ri_PlanCreate(RI_PlanCreateFunc_type plan_create_func,
 									ALLOCSET_SMALL_SIZES);
 	oldcxt = MemoryContextSwitchTo(plancxt);
 	plan = (RI_Plan *) palloc0(sizeof(*plan));
+	plan->riinfo = riinfo;
+	plan->constr_queryno = constr_queryno;
 	plan->plancxt = plancxt;
 	plan->nargs = nargs;
 	if (plan->nargs > 0)
@@ -2642,7 +2873,8 @@ ri_FreePlan(RI_Plan *plan)
  * Prepare execution plan for a query to enforce an RI restriction
  */
 static RI_Plan *
-ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
+ri_PlanCheck(const RI_ConstraintInfo *riinfo,
+			 RI_PlanCreateFunc_type plan_create_func,
 			 const char *querystr, int nargs, Oid *argtypes,
 			 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
 {
@@ -2666,7 +2898,8 @@ ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func,
 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
 						   SECURITY_NOFORCE_RLS);
 	/* Create the plan */
-	qplan = ri_PlanCreate(plan_create_func, querystr, nargs, argtypes);
+	qplan = ri_PlanCreate(riinfo, plan_create_func, querystr, nargs,
+						  argtypes, qkey->constr_queryno);
 
 	/* Restore UID and security context */
 	SetUserIdAndSecContext(save_userid, save_sec_context);
@@ -3277,7 +3510,10 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid,
  * ri_HashCompareOp -
  *
  * See if we know how to compare two values, and create a new hash entry
- * if not.
+ * if not.  The entry contains the FmgrInfo of the equality operator function
+ * and that of the cast function, if one is needed to convert the right
+ * operand (whose type OID has been passed) before passing it to the equality
+ * function.
  */
 static RI_CompareHashEntry *
 ri_HashCompareOp(Oid eq_opr, Oid typeid)
@@ -3333,8 +3569,16 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
 		 * moment since that will never be generated for implicit coercions.
 		 */
 		op_input_types(eq_opr, &lefttype, &righttype);
-		Assert(lefttype == righttype);
-		if (typeid == lefttype)
+
+		/*
+		 * Don't need to cast if the values that will be passed to the
+		 * operator will be of expected operand type(s).  The operator can be
+		 * cross-type (such as when called by ri_LookupKeyInPkRel()), in which
+		 * case, we only need the cast if the right operand value doesn't match
+		 * the type expected by the operator.
+		 */
+		if ((lefttype == righttype && typeid == lefttype) ||
+			(lefttype != righttype && typeid == righttype))
 			castfunc = InvalidOid;	/* simplest case */
 		else
 		{
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index 708435e952..cbe1d996e6 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -31,6 +31,12 @@ extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
 										EState *estate);
 extern void ExecCleanupTupleRouting(ModifyTableState *mtstate,
 									PartitionTupleRouting *proute);
+extern Relation ExecGetLeafPartitionForKey(Relation root_rel,
+										   int key_natts,
+										   const AttrNumber *key_attnums,
+										   Datum *key_vals, char *key_nulls,
+										   Oid root_idxoid, int lockmode,
+										   Oid *leaf_idxoid);
 
 
 /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index d68a6b9d28..315015f1d1 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -243,6 +243,15 @@ extern bool ExecShutdownNode(PlanState *node);
 extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 
+/*
+ * functions in execLockRows.c
+ */
+
+extern bool ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+				   Snapshot snapshot, CommandId cid,
+				   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+				   bool *epq_needed);
+
 /* ----------------------------------------------------------------
  *		ExecProcNode
  *
diff --git a/src/test/isolation/expected/fk-snapshot.out b/src/test/isolation/expected/fk-snapshot.out
index 5faf80d6ce..22752cc742 100644
--- a/src/test/isolation/expected/fk-snapshot.out
+++ b/src/test/isolation/expected/fk-snapshot.out
@@ -47,12 +47,12 @@ a
 
 step s2ifn2: INSERT INTO fk_noparted VALUES (2);
 step s2c: COMMIT;
+ERROR:  insert or update on table "fk_noparted" violates foreign key constraint "fk_noparted_a_fkey"
 step s2sfn: SELECT * FROM fk_noparted;
 a
 -
 1
-2
-(2 rows)
+(1 row)
 
 
 starting permutation: s1brc s2brc s2ip2 s1sp s2c s1sp s1ifp2 s2brc s2sfp s1c s1sfp s2ifn2 s2c s2sfn
diff --git a/src/test/isolation/specs/fk-snapshot.spec b/src/test/isolation/specs/fk-snapshot.spec
index 378507fbc3..64d27f29c3 100644
--- a/src/test/isolation/specs/fk-snapshot.spec
+++ b/src/test/isolation/specs/fk-snapshot.spec
@@ -46,10 +46,7 @@ step s2sfn	{ SELECT * FROM fk_noparted; }
 # inserting into referencing tables in transaction-snapshot mode
 # PK table is non-partitioned
 permutation s1brr s2brc s2ip2 s1sp s2c s1sp s1ifp2 s1c s1sfp
-# PK table is partitioned: buggy, because s2's serialization transaction can
-# see the uncommitted row thanks to the latest snapshot taken for
-# partition lookup to work correctly also ends up getting used by the PK index
-# scan
+# PK table is partitioned
 permutation s2ip2 s2brr s1brc s1ifp2 s2sfp s1c s2sfp s2ifn2 s2c s2sfn
 
 # inserting into referencing tables in up-to-date snapshot mode
-- 
2.35.3

