On 2019-Feb-28, Amit Langote wrote:

> I'd like to hear your thoughts on some suggestions to alter the structure
> of the reorganized code around foreign key addition/cloning.  With this
> patch adding support for foreign keys to reference partitioned tables, the
> code now has to consider various cases due to the possibility of having
> partitioned tables on both sides of a foreign key, which is reflected in
> the complexity of the new code.

I spent a few hours studying this and my conclusion is the opposite of
yours: we should make addFkRecurseReferencing the recursive one, and
CloneFkReferencing a non-recursive caller of that.  So we end up with
both addFkRecurseReferenced and addFkRecurseReferencing as recursive
routines, and CloneFkReferenced and CloneFkReferencing being
non-recursive callers of those.  With this structure change there is one
more call to CreateConstraintEntry than before, and now there are two
calls of tryAttachPartitionForeignKey instead of one; I think with this
new structure things are much simpler.  I also changed
CloneForeignKeyConstraints's API: instead of returning a list of cloned
constraints and giving its caller the responsibility of adding FK checks to
phase 3, we now give CloneForeignKeyConstraints the 'wqueue' list, so
that it can add the FK checks itself.  It seems much cleaner this way.
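
A minimal sketch of that call structure, written as a standalone toy model
rather than PostgreSQL code.  ToyRel, ToyCounts and the toy_* functions are
hypothetical names made up for illustration.  The model assumes that the
referenced-side routine makes one pg_constraint row per referenced relation
(with action triggers on leaves only), and that the referencing-side routine
adds one row per partition below the referencing root (with check triggers on
leaves only); the real bookkeeping in the patch may differ in detail.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-in for a relation: nparts == 0 means a plain (leaf) table. */
typedef struct ToyRel
{
	const char *name;
	int			nparts;
	struct ToyRel **parts;
} ToyRel;

/* What the toy routines would have created. */
typedef struct ToyCounts
{
	int			constraint_rows;
	int			action_triggers;
	int			check_triggers;
} ToyCounts;

/* Recursive walk of the referenced side (models addFkRecurseReferenced). */
void
toy_recurse_referenced(const ToyRel *pkrel, ToyCounts *c)
{
	assert(pkrel != NULL && c != NULL);

	c->constraint_rows++;		/* one row per referenced relation */
	if (pkrel->nparts == 0)
		c->action_triggers++;	/* action triggers only on leaves */
	for (int i = 0; i < pkrel->nparts; i++)
		toy_recurse_referenced(pkrel->parts[i], c);
}

/* Recursive walk of the referencing side (models addFkRecurseReferencing). */
void
toy_recurse_referencing(const ToyRel *rel, ToyCounts *c)
{
	assert(rel != NULL && c != NULL);

	if (rel->nparts == 0)
	{
		c->check_triggers++;	/* check triggers only on leaves */
		return;
	}
	for (int i = 0; i < rel->nparts; i++)
	{
		c->constraint_rows++;	/* assumed: one row per partition */
		toy_recurse_referencing(rel->parts[i], c);
	}
}

/* Non-recursive caller: referenced side first, then referencing side. */
ToyCounts
toy_add_foreign_key(const ToyRel *fkrel, const ToyRel *pkrel)
{
	ToyCounts	c = {0, 0, 0};

	toy_recurse_referenced(pkrel, &c);
	toy_recurse_referencing(fkrel, &c);
	return c;
}
```

In this model the row count grows with the sum of the two partition trees,
not with their product, which is the point of keeping the two recursions
separate.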

> Also, it seems a bit confusing that there is a CreateConstraintEntry call
> in addFkRecurseReferenced() which is creating a constraint on the
> *referencing* relation, which I think should be in
> ATAddForeignKeyConstraint, the caller.  I know we need to create a copy of
> the constraint to reference the partitions of the referenced table, but we
> might as well put it in CloneFkReferenced and reverse who calls who --
> make addFkRecurseReferenced() call CloneFkReferenced and have the code to
> create the cloned constraint and action triggers in the latter.  That will
> make the code to handle different sides of foreign key look similar, and
> imho, easier to follow.

Well, if you think about it, *all* the constraints created by all these
routines are in the referencing relations.  The only question here is
*why* we create those tuples; in the case of the ...Referenced routines,
it's because of the partitions in the referenced side; in the case of
the ...Referencing routines, it's because of partitions in the
referencing side.  I think changing it the way you suggest would be even
more confusing.


As discussed in the other subthread, I'm not making any effort to reuse
an existing constraint defined in a partition of the referenced side; as
far as I can tell that's a nonsensical transformation.


A pretty silly bug remains here.  Watch:

create table pk (a int primary key) partition by list (a);
create table pk1 partition of pk for values in (1);
create table fk (a int references pk);
insert into pk values (1);
insert into fk values (1);
drop table pk cascade;

Note that the DROP of the main PK table is pretty aggressive: since it
cascades, you want it to drop the constraint on the FK side.  This would
work without a problem if 'pk' was a non-partitioned table, but in this
case it fails:

alvherre=# drop table pk cascade;
NOTICE:  drop cascades to constraint fk_a_fkey on table fk
ERROR:  removing partition "pk1" violates foreign key constraint "fk_a_fkey1"
DETAIL:  Key (a)=(1) still referenced from table "fk".

The reason is that the "pre drop check" that's done before allowing a drop
of the partition doesn't realize that the constraint is also being
dropped (and so the check ought to be skipped).  If you were to do just
"DROP TABLE pk1" then this complaint would be correct, since it would
leave the constraint in an invalid state.  But here, it's bogus and
annoying.  You can DELETE the matching values from table FK and then the
DROP works.  Here's another problem caused by the same misbehavior:

alvherre=# drop table pk, fk;
ERROR:  removing partition "pk1" violates foreign key constraint "fk_a_fkey1"
DETAIL:  Key (a)=(1) still referenced from table "fk".

Note here we want to get rid of table 'fk' completely.  If you split it
up in a DROP of fk, followed by a DROP of pk, it works.

I'm not yet sure what's the best way to attack this.  Maybe the
"pre-drop check" needs a completely different approach.  The simplest
approach is to prohibit a table drop or detach for any partitioned table
that's referenced by a foreign key, but that seems obnoxious and
inconvenient.
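
One possible direction, sketched as a standalone toy rather than PostgreSQL
code: let the check see the whole set of objects being deleted, and skip it
when the foreign key constraint is itself in that set.  ToyOid and the toy_*
functions are hypothetical, and this is only an illustration of the idea, not
a decision about how the patch should do it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int ToyOid;

/* Is 'oid' among the objects scheduled for deletion? */
bool
toy_in_drop_set(const ToyOid *drop_set, size_t n, ToyOid oid)
{
	for (size_t i = 0; i < n; i++)
	{
		if (drop_set[i] == oid)
			return true;
	}
	return false;
}

/*
 * Should dropping a referenced partition be refused?
 *
 * If the constraint is going away in the same operation, there is nothing
 * left to violate, so the check is skipped.  Otherwise the drop is refused
 * whenever referencing rows still exist.
 */
bool
toy_must_block_drop(const ToyOid *drop_set, size_t n,
					ToyOid fk_constraint, bool has_referencing_rows)
{
	assert(drop_set != NULL || n == 0);

	if (toy_in_drop_set(drop_set, n, fk_constraint))
		return false;
	return has_referencing_rows;
}
```

With this shape, "DROP TABLE pk CASCADE" and "DROP TABLE pk, fk" would both
pass, because the constraint is part of the deletion set, while a bare
"DROP TABLE pk1" with matching rows in fk would still be refused.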


I still haven't put back the code in "#if 0".

FWIW I think we should add an index on pg_constraint.confrelid now.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 26ee4ece2435ef65b36c866dcc38a527042136e2 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 28 Nov 2018 11:52:00 -0300
Subject: [PATCH v6 1/3] Rework deleteObjectsInList to allow objtype-specific
 checks

This doesn't change any functionality yet.
---
 src/backend/catalog/dependency.c | 41 +++++++++++++++++++-------------
 1 file changed, 25 insertions(+), 16 deletions(-)

diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 2048d71535b..0b4c47b808c 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -230,29 +230,38 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel,
 	int			i;
 
 	/*
-	 * Keep track of objects for event triggers, if necessary.
+	 * Invoke pre-deletion callbacks and keep track of objects for event
+	 * triggers, if necessary.
 	 */
-	if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL))
+	for (i = 0; i < targetObjects->numrefs; i++)
 	{
-		for (i = 0; i < targetObjects->numrefs; i++)
+		const ObjectAddress *thisobj = &targetObjects->refs[i];
+		Oid			objectClass = getObjectClass(thisobj);
+
+		if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL))
 		{
-			const ObjectAddress *thisobj = &targetObjects->refs[i];
-			const ObjectAddressExtra *extra = &targetObjects->extras[i];
-			bool		original = false;
-			bool		normal = false;
-
-			if (extra->flags & DEPFLAG_ORIGINAL)
-				original = true;
-			if (extra->flags & DEPFLAG_NORMAL)
-				normal = true;
-			if (extra->flags & DEPFLAG_REVERSE)
-				normal = true;
-
-			if (EventTriggerSupportsObjectClass(getObjectClass(thisobj)))
+			if (EventTriggerSupportsObjectClass(objectClass))
 			{
+				bool		original = false;
+				bool		normal = false;
+				const ObjectAddressExtra *extra = &targetObjects->extras[i];
+
+				if (extra->flags & DEPFLAG_ORIGINAL)
+					original = true;
+				if (extra->flags & DEPFLAG_NORMAL ||
+					extra->flags & DEPFLAG_REVERSE)
+					normal = true;
+
 				EventTriggerSQLDropAddObject(thisobj, original, normal);
 			}
 		}
+
+		/* Invoke class-specific pre-deletion checks */
+		switch (objectClass)
+		{
+			default:
+				break;
+		}
 	}
 
 	/*
-- 
2.17.1

From 70e04b863028e992727b53f60d21d9da543eb2be Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 28 Feb 2019 17:44:06 -0300
Subject: [PATCH v6 2/3] Add index_get_partition convenience function

This new function simplifies some existing coding, as well as supports
future patches.

This may end up backpatched to 11, per
https://postgr.es/m/201902041720.gv2f3hw5rfhe@alvherre.pgsql

Discussion: https://postgr.es/m/201901222145.t6wws6t6vrcu@alvherre.pgsql
Reviewed-by: Amit Langote
---
 src/backend/catalog/partition.c  | 36 ++++++++++++++++++++++++++++
 src/backend/commands/tablecmds.c | 40 +++++++++-----------------------
 src/include/catalog/partition.h  |  1 +
 3 files changed, 48 insertions(+), 29 deletions(-)

diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index 3ccdaff8c45..8ea7a62418f 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -145,6 +145,42 @@ get_partition_ancestors_worker(Relation inhRel, Oid relid, List **ancestors)
 	get_partition_ancestors_worker(inhRel, parentOid, ancestors);
 }
 
+/*
+ * index_get_partition
+ *		Return the OID of index of the given partition that is a child
+ *		of the given index, or InvalidOid if there isn't one.
+ */
+Oid
+index_get_partition(Relation partition, Oid indexId)
+{
+	List	   *idxlist = RelationGetIndexList(partition);
+	ListCell   *l;
+
+	foreach(l, idxlist)
+	{
+		Oid			partIdx = lfirst_oid(l);
+		HeapTuple	tup;
+		Form_pg_class classForm;
+		bool		ispartition;
+
+		tup = SearchSysCache1(RELOID, ObjectIdGetDatum(partIdx));
+		if (!tup)
+			elog(ERROR, "cache lookup failed for relation %u", partIdx);
+		classForm = (Form_pg_class) GETSTRUCT(tup);
+		ispartition = classForm->relispartition;
+		ReleaseSysCache(tup);
+		if (!ispartition)
+			continue;
+		if (get_partition_parent(lfirst_oid(l)) == indexId)
+		{
+			list_free(idxlist);
+			return partIdx;
+		}
+	}
+
+	return InvalidOid;
+}
+
 /*
  * map_partition_varattnos - maps varattno of any Vars in expr from the
  * attno's of 'from_rel' to the attno's of 'to_rel' partition, each of which
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 515c29072c8..3183b2aaa12 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -15649,36 +15649,18 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
 static void
 refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl)
 {
-	Relation	pg_inherits;
-	ScanKeyData key;
-	HeapTuple	tuple;
-	SysScanDesc scan;
+	Oid			existingIdx;
 
-	pg_inherits = table_open(InheritsRelationId, AccessShareLock);
-	ScanKeyInit(&key, Anum_pg_inherits_inhparent,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(RelationGetRelid(parentIdx)));
-	scan = systable_beginscan(pg_inherits, InheritsParentIndexId, true,
-							  NULL, 1, &key);
-	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
-	{
-		Form_pg_inherits inhForm;
-		Oid			tab;
-
-		inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
-		tab = IndexGetRelation(inhForm->inhrelid, false);
-		if (tab == RelationGetRelid(partitionTbl))
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
-							RelationGetRelationName(partIdx),
-							RelationGetRelationName(parentIdx)),
-					 errdetail("Another index is already attached for partition \"%s\".",
-							   RelationGetRelationName(partitionTbl))));
-	}
-
-	systable_endscan(scan);
-	table_close(pg_inherits, AccessShareLock);
+	existingIdx = index_get_partition(partitionTbl,
+									  RelationGetRelid(parentIdx));
+	if (OidIsValid(existingIdx))
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
+						RelationGetRelationName(partIdx),
+						RelationGetRelationName(parentIdx)),
+				 errdetail("Another index is already attached for partition \"%s\".",
+						   RelationGetRelationName(partitionTbl))));
 }
 
 /*
diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h
index d84e3259835..616e18af308 100644
--- a/src/include/catalog/partition.h
+++ b/src/include/catalog/partition.h
@@ -21,6 +21,7 @@
 
 extern Oid	get_partition_parent(Oid relid);
 extern List *get_partition_ancestors(Oid relid);
+extern Oid	index_get_partition(Relation partition, Oid indexId);
 extern List *map_partition_varattnos(List *expr, int fromrel_varno,
 						Relation to_rel, Relation from_rel,
 						bool *found_whole_row);
-- 
2.17.1

From fda488ed2ffc95c98b2f46fd76afcbce7f87c13d Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 20 Feb 2019 15:08:20 -0300
Subject: [PATCH v6 3/3] support FKs referencing partitioned tables

---
 doc/src/sgml/ref/create_table.sgml        |    7 +-
 src/backend/catalog/dependency.c          |    3 +
 src/backend/catalog/heap.c                |   24 +
 src/backend/commands/tablecmds.c          | 1376 +++++++++++++++------
 src/backend/utils/adt/ri_triggers.c       |  246 +++-
 src/backend/utils/adt/ruleutils.c         |   18 +
 src/include/catalog/heap.h                |    2 +
 src/include/commands/tablecmds.h          |    8 +-
 src/include/commands/trigger.h            |    1 +
 src/include/utils/ruleutils.h             |    1 +
 src/test/regress/expected/foreign_key.out |  166 ++-
 src/test/regress/sql/foreign_key.sql      |  114 +-
 12 files changed, 1531 insertions(+), 435 deletions(-)

diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index e94fe2c3b67..37ae0f00fda 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -378,9 +378,6 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
      <para>
       Partitioned tables do not support <literal>EXCLUDE</literal> constraints;
       however, you can define these constraints on individual partitions.
-      Also, while it's possible to define <literal>PRIMARY KEY</literal>
-      constraints on partitioned tables, creating foreign keys that
-      reference a partitioned table is not yet supported.
      </para>
 
      <para>
@@ -995,9 +992,7 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       addition of a foreign key constraint requires a
       <literal>SHARE ROW EXCLUSIVE</literal> lock on the referenced table.
       Note that foreign key constraints cannot be defined between temporary
-      tables and permanent tables.  Also note that while it is possible to
-      define a foreign key on a partitioned table, it is not possible to
-      declare a foreign key that references a partitioned table.
+      tables and permanent tables.
      </para>
 
      <para>
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 0b4c47b808c..11ec9d2f853 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -259,6 +259,9 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel,
 		/* Invoke class-specific pre-deletion checks */
 		switch (objectClass)
 		{
+			case OCLASS_CLASS:
+				pre_drop_class_check(thisobj->objectId, thisobj->objectSubId);
+				break;
 			default:
 				break;
 		}
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index c7b5ff62f9f..6680f755dd5 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -1825,6 +1825,30 @@ RemoveAttrDefaultById(Oid attrdefId)
 	relation_close(myrel, NoLock);
 }
 
+/*
+ * Checks to be run just before dropping a relation.
+ */
+void
+pre_drop_class_check(Oid relationId, Oid objectSubId)
+{
+	Relation	relation;
+
+	/* caller must hold strong lock already, if they're dropping */
+	relation = relation_open(relationId, NoLock);
+
+	/*
+	 * For leaf partitions, this is our last chance to verify any foreign keys
+	 * that may point to the partition as referenced table.
+	 */
+	if (relation->rd_rel->relkind == RELKIND_RELATION &&
+		relation->rd_rel->relispartition)
+		CheckNoForeignKeyRefs(relation,
+							  GetParentedForeignKeyRefs(relation),
+							  true);
+
+	relation_close(relation, NoLock);
+}
+
 /*
  * heap_drop_with_catalog	- removes specified relation from catalogs
  *
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3183b2aaa12..8e26053a5c3 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -414,10 +414,33 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
 						  Relation rel, Constraint *fkconstraint, Oid parentConstr,
 						  bool recurse, bool recursing,
 						  LOCKMODE lockmode);
-static void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
-						   List **cloned);
-static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
-				   Relation partRel, List *clone, List **cloned);
+static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint,
+					   Relation rel, Relation pkrel, Oid indexOid, Oid parentConstraint,
+					   int numfks, int16 *pkattnum, int16 *fkattnum,
+					   Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+					   bool old_check_ok);
+static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint,
+						Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr, int numfks,
+						int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
+						Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok,
+						LOCKMODE lockmode);
+static void CloneForeignKeyConstraints(List **wqueue, Oid parentId,
+						   Oid relationId);
+static void CloneFkReferenced(Relation parentRel, Relation partitionRel,
+				  Relation pg_constraint);
+static void CloneFkReferencing(List **wqueue, Relation parentRel,
+				   Relation partRel, List *clone);
+static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
+							  Constraint *fkconstraint, Oid constraintOid,
+							  Oid indexOid);
+static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid,
+							   Constraint *fkconstraint, Oid constraintOid,
+							   Oid indexOid);
+static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
+							 Oid partRelid,
+							 Oid parentConstrOid, int numfks,
+							 AttrNumber *mapped_conkey, AttrNumber *confkey,
+							 Oid *conpfeqop);
 static void ATExecDropConstraint(Relation rel, const char *constrName,
 					 DropBehavior behavior,
 					 bool recurse, bool recursing,
@@ -1058,7 +1081,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 		 * And foreign keys too.  Note that because we're freshly creating the
 		 * table, there is no need to verify these new constraints.
 		 */
-		CloneForeignKeyConstraints(parentId, relationId, NULL);
+		CloneForeignKeyConstraints(NULL, parentId, relationId);
 
 		table_close(parent, NoLock);
 	}
@@ -3514,7 +3537,8 @@ AlterTableGetLockLevel(List *cmds)
 
 				/*
 				 * Removing constraints can affect SELECTs that have been
-				 * optimised assuming the constraint holds true.
+				 * optimised assuming the constraint holds true. See also
+				 * CloneFkReferenced.
 				 */
 			case AT_DropConstraint: /* as DROP INDEX */
 			case AT_DropNotNull:	/* may change some SQL plans */
@@ -7173,9 +7197,6 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 		case CONSTR_FOREIGN:
 
 			/*
-			 * Note that we currently never recurse for FK constraints, so the
-			 * "recurse" flag is silently ignored.
-			 *
 			 * Assign or validate constraint name
 			 */
 			if (newConstraint->conname)
@@ -7393,6 +7414,13 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  * Subroutine for ATExecAddConstraint.  Must already hold exclusive
  * lock on the rel, and have done appropriate validity checks for it.
  * We do permissions checks here, however.
+ *
+ * When the referenced or referencing tables (or both) are partitioned,
+ * multiple pg_constraint rows are required -- one for each partitioned table
+ * and each partition on each side (fortunately, not one for every combination
+ * thereof).  We also need action triggers on each leaf partition on the
+ * referenced side, and check triggers on each leaf partition on the
+ * referencing side.
  */
 static ObjectAddress
 ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
@@ -7408,12 +7436,10 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	Oid			pfeqoperators[INDEX_MAX_KEYS];
 	Oid			ppeqoperators[INDEX_MAX_KEYS];
 	Oid			ffeqoperators[INDEX_MAX_KEYS];
-	bool		connoinherit;
 	int			i;
 	int			numfks,
 				numpks;
 	Oid			indexOid;
-	Oid			constrOid;
 	bool		old_check_ok;
 	ObjectAddress address;
 	ListCell   *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop);
@@ -7431,12 +7457,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	 * Validity checks (permission checks wait till we have the column
 	 * numbers)
 	 */
-	if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-		ereport(ERROR,
-				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-				 errmsg("cannot reference partitioned table \"%s\"",
-						RelationGetRelationName(pkrel))));
-
 	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
 		if (!recurse)
@@ -7454,7 +7474,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 					 errdetail("This feature is not yet supported on partitioned tables.")));
 	}
 
-	if (pkrel->rd_rel->relkind != RELKIND_RELATION)
+	if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+		pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("referenced relation \"%s\" is not a table",
@@ -7664,8 +7685,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 		if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
 			ereport(ERROR,
 					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("foreign key constraint \"%s\" "
-							"cannot be implemented",
+					 errmsg("foreign key constraint \"%s\" cannot be implemented",
 							fkconstraint->conname),
 					 errdetail("Key columns \"%s\" and \"%s\" "
 							   "are of incompatible types: %s and %s.",
@@ -7753,21 +7773,132 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	}
 
 	/*
-	 * FKs always inherit for partitioned tables, and never for legacy
-	 * inheritance.
+	 * Create all the constraint and trigger objects, recursing to partitions
+	 * as necessary.  First handle the referenced side.
 	 */
-	connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE;
+	address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel,
+									 indexOid,
+									 InvalidOid,	/* no parent constraint */
+									 numfks,
+									 pkattnum,
+									 fkattnum,
+									 pfeqoperators,
+									 ppeqoperators,
+									 ffeqoperators,
+									 old_check_ok);
+
+	/* Now handle the referencing side. */
+	addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel,
+							indexOid,
+							address.objectId,
+							numfks,
+							pkattnum,
+							fkattnum,
+							pfeqoperators,
+							ppeqoperators,
+							ffeqoperators,
+							old_check_ok,
+							lockmode);
+
+	/*
+	 * Done.  Close pk table, but keep lock until we've committed.
+	 */
+	table_close(pkrel, NoLock);
+
+	return address;
+}
+
+/*
+ * addFkRecurseReferenced
+ *		subroutine for ATAddForeignKeyConstraint; recurses on the referenced
+ *		side of the constraint
+ *
+ * Create pg_constraint rows for the referenced side of the constraint,
+ * referencing the parent of the referencing side; also create action triggers
+ * on leaf partitions.  If the table is partitioned, recurse to handle each
+ * partition.
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the root referencing relation.
+ * pkrel is the referenced relation; might be a partition, if recursing.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstraint is the OID of a parent constraint; InvalidOid if this is a
+ * top-level constraint.
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID array of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ * was already validated (thus this one doesn't need validation).
+ */
+static ObjectAddress
+addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
+					   Relation pkrel, Oid indexOid, Oid parentConstraint,
+					   int numfks,
+					   int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
+					   Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok)
+{
+	ObjectAddress address;
+	Oid			constrOid;
+	char	   *conname;
+	bool		conislocal;
+	int			coninhcount;
+	bool		connoinherit;
+
+	/*
+	 * Verify relkind for each referenced partition.  At the top level, this
+	 * is redundant with a previous check, but we need it when recursing.
+	 */
+	if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+		pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("referenced relation \"%s\" is not a table",
+						RelationGetRelationName(pkrel))));
+
+	/*
+	 * Caller supplies us with a constraint name; however, it may be used in
+	 * this partition, so come up with a different one in that case.
+	 */
+	if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+							 RelationGetRelid(rel),
+							 fkconstraint->conname))
+		conname = ChooseConstraintName(RelationGetRelationName(rel),
+									   ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+									   "fkey",
+									   RelationGetNamespace(rel), NIL);
+	else
+		conname = fkconstraint->conname;
+
+	if (OidIsValid(parentConstraint))
+	{
+		conislocal = false;
+		coninhcount = 1;
+		connoinherit = false;
+	}
+	else
+	{
+		conislocal = true;
+		coninhcount = 0;
+
+		/*
+		 * always inherit for partitioned tables, never for legacy inheritance
+		 */
+		connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE;
+	}
 
 	/*
 	 * Record the FK constraint in pg_constraint.
 	 */
-	constrOid = CreateConstraintEntry(fkconstraint->conname,
+	constrOid = CreateConstraintEntry(conname,
 									  RelationGetNamespace(rel),
 									  CONSTRAINT_FOREIGN,
 									  fkconstraint->deferrable,
 									  fkconstraint->initdeferred,
 									  fkconstraint->initially_valid,
-									  parentConstr,
+									  parentConstraint,
 									  RelationGetRelid(rel),
 									  fkattnum,
 									  numfks,
@@ -7779,108 +7910,314 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 									  pfeqoperators,
 									  ppeqoperators,
 									  ffeqoperators,
-									  numpks,
+									  numfks,
 									  fkconstraint->fk_upd_action,
 									  fkconstraint->fk_del_action,
 									  fkconstraint->fk_matchtype,
 									  NULL, /* no exclusion constraint */
 									  NULL, /* no check constraint */
 									  NULL,
-									  true, /* islocal */
-									  0,	/* inhcount */
-									  connoinherit,	/* conNoInherit */
+									  conislocal,	/* islocal */
+									  coninhcount,	/* inhcount */
+									  connoinherit, /* conNoInherit */
 									  false);	/* is_internal */
+
 	ObjectAddressSet(address, ConstraintRelationId, constrOid);
 
 	/*
-	 * Create the triggers that will enforce the constraint.  We only want the
-	 * action triggers to appear for the parent partitioned relation, even
-	 * though the constraints also exist below.
+	 * Also, if this is a constraint on a partition, give it partition-type
+	 * dependencies on the parent constraint as well as the table.
 	 */
-	createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
-							 constrOid, indexOid, !recursing);
+	if (OidIsValid(parentConstraint))
+	{
+		ObjectAddress referenced;
+
+		ObjectAddressSet(referenced, ConstraintRelationId, parentConstraint);
+		recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
+		ObjectAddressSet(referenced, RelationRelationId, RelationGetRelid(pkrel));
+		recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
+	}
+
+	/* make new constraint visible, in case we add more */
+	CommandCounterIncrement();
 
 	/*
-	 * Tell Phase 3 to check that the constraint is satisfied by existing
-	 * rows. We can skip this during table creation, when requested explicitly
-	 * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're
-	 * recreating a constraint following a SET DATA TYPE operation that did
-	 * not impugn its validity.
+	 * If the referenced table is a plain relation, create the action triggers
+	 * that enforce the constraint.
 	 */
-	if (!old_check_ok && !fkconstraint->skip_validation)
+	if (pkrel->rd_rel->relkind == RELKIND_RELATION)
 	{
-		NewConstraint *newcon;
-
-		newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-		newcon->name = fkconstraint->conname;
-		newcon->contype = CONSTR_FOREIGN;
-		newcon->refrelid = RelationGetRelid(pkrel);
-		newcon->refindid = indexOid;
-		newcon->conid = constrOid;
-		newcon->qual = (Node *) fkconstraint;
-
-		tab->constraints = lappend(tab->constraints, newcon);
+		createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel),
+									   fkconstraint,
+									   constrOid, indexOid);
 	}
 
 	/*
-	 * When called on a partitioned table, recurse to create the constraint on
-	 * the partitions also.
+	 * If the referenced table is partitioned, recurse on ourselves to handle
+	 * each partition.  We need one pg_constraint row created for each
+	 * partition in addition to the pg_constraint row for the parent table.
 	 */
-	if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc partdesc;
-		Relation	pg_constraint;
-		List	   *cloned = NIL;
-		ListCell   *cell;
+		PartitionDesc pd = RelationGetPartitionDesc(pkrel);
 
-		pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock);
-		partdesc = RelationGetPartitionDesc(rel);
-
-		for (i = 0; i < partdesc->nparts; i++)
+		for (int i = 0; i < pd->nparts; i++)
 		{
-			Oid			partitionId = partdesc->oids[i];
+			Relation	partRel;
+			AttrNumber *map;
+			AttrNumber *mapped_pkattnum;
+			Oid			partIndexId;
+
+			partRel = table_open(pd->oids[i], ShareRowExclusiveLock);
+
+			/*
+			 * Map the attribute numbers in the referenced side of the FK
+			 * definition to match the partition's column layout.
+			 */
+			map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel),
+													RelationGetDescr(pkrel),
+													gettext_noop("could not convert row type"));
+			if (map)
+			{
+				mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks);
+				for (int j = 0; j < numfks; j++)
+					mapped_pkattnum[j] = map[pkattnum[j] - 1];
+			}
+			else
+				mapped_pkattnum = pkattnum;
+
+			/* do the deed */
+			partIndexId = index_get_partition(partRel, indexOid);
+			if (!OidIsValid(partIndexId))
+				elog(ERROR, "index for %u not found in partition %s",
+					 indexOid, RelationGetRelationName(partRel));
+			addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel,
+								   partIndexId, constrOid, numfks,
+								   mapped_pkattnum, fkattnum,
+								   pfeqoperators, ppeqoperators, ffeqoperators,
+								   old_check_ok);
+
+			/* Done -- clean up (but keep the lock) */
+			table_close(partRel, NoLock);
+			if (map)
+			{
+				pfree(mapped_pkattnum);
+				pfree(map);
+			}
+		}
+	}
+
+	return address;
+}
+
+/*
+ * addFkRecurseReferencing
+ *		subroutine for ATAddForeignKeyConstraint and CloneFkReferencing
+ *
+ * If the referencing relation is a plain relation, create the necessary check
+ * triggers that implement the constraint, and set up for Phase 3 constraint
+ * verification.  If the referencing relation is a partitioned table, then
+ * we create a pg_constraint row for it and recurse on this routine for each
+ * partition.
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the referencing relation; might be a partition, if recursing.
+ * pkrel is the root referenced relation.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstr is the OID of the parent constraint (there is always one).
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID array of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ *		was already validated (thus this one doesn't need validation).
+ * lockmode is the lockmode to acquire on partitions when recursing.
+ */
+static void
+addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
+						Relation pkrel, Oid indexOid, Oid parentConstr,
+						int numfks, int16 *pkattnum, int16 *fkattnum,
+						Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+						bool old_check_ok, LOCKMODE lockmode)
+{
+	AssertArg(OidIsValid(parentConstr));
+
+	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("foreign key constraints are not supported on foreign tables")));
+
+	/*
+	 * If the referencing relation is a plain table, add the check triggers to
+	 * it and, if necessary, schedule it to be checked in Phase 3.
+	 *
+	 * If the relation is partitioned, drill down to do it to its partitions.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_RELATION)
+	{
+		createForeignKeyCheckTriggers(RelationGetRelid(rel),
+									  RelationGetRelid(pkrel),
+									  fkconstraint,
+									  parentConstr,
+									  indexOid);
+
+		/*
+		 * Tell Phase 3 to check that the constraint is satisfied by existing
+		 * rows. We can skip this during table creation, when requested
+		 * explicitly by specifying NOT VALID in an ADD FOREIGN KEY command,
+		 * and when we're recreating a constraint following a SET DATA TYPE
+		 * operation that did not impugn its validity.
+		 */
+		if (wqueue && !old_check_ok && !fkconstraint->skip_validation)
+		{
+			NewConstraint *newcon;
+			AlteredTableInfo *tab;
+
+			tab = ATGetQueueEntry(wqueue, rel);
+
+			newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+			newcon->name = get_constraint_name(parentConstr);
+			newcon->contype = CONSTR_FOREIGN;
+			newcon->refrelid = RelationGetRelid(pkrel);
+			newcon->refindid = indexOid;
+			newcon->conid = parentConstr;
+			newcon->qual = (Node *) fkconstraint;
+
+			tab->constraints = lappend(tab->constraints, newcon);
+		}
+	}
+	else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		PartitionDesc pd = RelationGetPartitionDesc(rel);
+
+		/*
+		 * Recurse to take appropriate action on each partition; either we
+		 * find an existing constraint to reparent to ours, or we create a new
+		 * one.
+		 */
+		for (int i = 0; i < pd->nparts; i++)
+		{
+			Oid			partitionId = pd->oids[i];
 			Relation	partition = table_open(partitionId, lockmode);
+			List	   *partFKs;
+			AttrNumber *attmap;
+			AttrNumber	mapped_fkattnum[INDEX_MAX_KEYS];
+			bool		attached;
+			Oid			partIndexOid;
+			char	   *conname;
+			Oid			constrOid;
+			ObjectAddress address,
+						referenced;
+			ListCell   *cell;
 
 			CheckTableNotInUse(partition, "ALTER TABLE");
 
-			CloneFkReferencing(pg_constraint, rel, partition,
-							   list_make1_oid(constrOid),
-							   &cloned);
+			attmap = convert_tuples_by_name_map(RelationGetDescr(partition),
+												RelationGetDescr(rel),
+												gettext_noop("could not convert row type"));
+			for (int j = 0; j < numfks; j++)
+				mapped_fkattnum[j] = attmap[fkattnum[j] - 1];
+
+			partFKs = copyObject(RelationGetFKeyList(partition));
+			attached = false;
+			foreach(cell, partFKs)
+			{
+				ForeignKeyCacheInfo *fk;
+
+				fk = lfirst_node(ForeignKeyCacheInfo, cell);
+				if (tryAttachPartitionForeignKey(fk,
+												 partitionId,
+												 parentConstr,
+												 numfks,
+												 mapped_fkattnum,
+												 pkattnum,
+												 pfeqoperators))
+				{
+					attached = true;
+					break;
+				}
+			}
+			if (attached)
+			{
+				table_close(partition, NoLock);
+				continue;
+			}
+
+			/*
+			 * No luck finding a good constraint to reuse; create our own.
+			 */
+			partIndexOid = index_get_partition(partition, indexOid);
+			if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+									 RelationGetRelid(partition),
+									 fkconstraint->conname))
+				conname = ChooseConstraintName(RelationGetRelationName(partition),
+											   ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+											   "fkey",
+											   RelationGetNamespace(partition), NIL);
+			else
+				conname = fkconstraint->conname;
+			constrOid =
+				CreateConstraintEntry(conname,
+									  RelationGetNamespace(partition),
+									  CONSTRAINT_FOREIGN,
+									  fkconstraint->deferrable,
+									  fkconstraint->initdeferred,
+									  fkconstraint->initially_valid,
+									  parentConstr,
+									  partitionId,
+									  mapped_fkattnum,
+									  numfks,
+									  numfks,
+									  InvalidOid,
+									  partIndexOid,
+									  RelationGetRelid(pkrel),
+									  pkattnum,
+									  pfeqoperators,
+									  ppeqoperators,
+									  ffeqoperators,
+									  numfks,
+									  fkconstraint->fk_upd_action,
+									  fkconstraint->fk_del_action,
+									  fkconstraint->fk_matchtype,
+									  NULL,
+									  NULL,
+									  NULL,
+									  false,
+									  1,
+									  false,
+									  false);
+
+			/*
+			 * Give this constraint partition-type dependencies on the parent
+			 * constraint as well as the table.
+			 */
+			ObjectAddressSet(address, ConstraintRelationId, constrOid);
+			ObjectAddressSet(referenced, ConstraintRelationId, parentConstr);
+			recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
+			ObjectAddressSet(referenced, RelationRelationId, partitionId);
+			recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
+
+			/* Make all this visible before recursing */
+			CommandCounterIncrement();
+
+			/* call ourselves to finalize the creation and we're done */
+			addFkRecurseReferencing(wqueue, fkconstraint, partition, pkrel,
+									indexOid,
+									constrOid,
+									numfks,
+									pkattnum,
+									mapped_fkattnum,
+									pfeqoperators,
+									ppeqoperators,
+									ffeqoperators,
+									old_check_ok,
+									lockmode);
 
 			table_close(partition, NoLock);
 		}
-		table_close(pg_constraint, RowExclusiveLock);
-
-		foreach(cell, cloned)
-		{
-			ClonedConstraint *cc = (ClonedConstraint *) lfirst(cell);
-			Relation    partition = table_open(cc->relid, lockmode);
-			AlteredTableInfo *childtab;
-			NewConstraint *newcon;
-
-			/* Find or create work queue entry for this partition */
-			childtab = ATGetQueueEntry(wqueue, partition);
-
-			newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-			newcon->name = cc->constraint->conname;
-			newcon->contype = CONSTR_FOREIGN;
-			newcon->refrelid = cc->refrelid;
-			newcon->refindid = cc->conindid;
-			newcon->conid = cc->conid;
-			newcon->qual = (Node *) fkconstraint;
-
-			childtab->constraints = lappend(childtab->constraints, newcon);
-
-			table_close(partition, lockmode);
-		}
 	}
-
-	/*
-	 * Close pk table, but keep lock until we've committed.
-	 */
-	table_close(pkrel, NoLock);
-
-	return address;
 }
 
 /*
@@ -7891,74 +8228,230 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  * relationId is a partition of parentId, so we can be certain that it has the
  * same columns with the same datatypes.  The columns may be in different
  * order, though.
- *
- * The *cloned list is appended ClonedConstraint elements describing what was
- * created, for the purposes of validating the constraint in ALTER TABLE's
- * Phase 3.
  */
 static void
-CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
+CloneForeignKeyConstraints(List **wqueue, Oid parentId, Oid relationId)
 {
 	Relation	pg_constraint;
 	Relation	parentRel;
 	Relation	rel;
-	ScanKeyData key;
-	SysScanDesc scan;
-	HeapTuple	tuple;
 	List	   *clone = NIL;
+	ListCell   *cell;
 
 	parentRel = table_open(parentId, NoLock);	/* already got lock */
+	/* This only works for declarative partitioning */
+	Assert(parentRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+
 	/* see ATAddForeignKeyConstraint about lock level */
 	rel = table_open(relationId, AccessExclusiveLock);
 	pg_constraint = table_open(ConstraintRelationId, RowShareLock);
 
-	/* Obtain the list of constraints to clone or attach */
-	ScanKeyInit(&key,
-				Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
-				F_OIDEQ, ObjectIdGetDatum(parentId));
-	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
-							  NULL, 1, &key);
+	/*
+	 * Clone constraints for which the parent is on the referenced side.
+	 */
+	CloneFkReferenced(parentRel, rel, pg_constraint);
+
+	/* done with pg_constraint */
+	table_close(pg_constraint, RowShareLock);
+
+	/*
+	 * Now clone constraints where the parent is on the referencing side. We
+	 * first obtain a list of such constraints.
+	 */
+	foreach(cell, RelationGetFKeyList(parentRel))
+	{
+		ForeignKeyCacheInfo *fk = lfirst(cell);
+
+		clone = lappend_oid(clone, fk->conoid);
+	}
+	CloneFkReferencing(wqueue, parentRel, rel, clone);
+	list_free(clone);
+
+	/* We're done.  Clean up, keeping locks till commit */
+	table_close(parentRel, NoLock);
+	table_close(rel, NoLock);
+}
+
+/*
+ * CloneFkReferenced
+ *		Subroutine for CloneForeignKeyConstraints
+ *
+ * Find all the FKs that have the parent relation on the referenced side;
+ * clone those constraints to the given partition.  This is to be called
+ * when the partition is being created or attached.
+ *
+ * This recurses to partitions, if the relation being attached is partitioned.
+ * Recursion is done by calling addFkRecurseReferenced.
+ */
+static void
+CloneFkReferenced(Relation parentRel, Relation partitionRel,
+				  Relation pg_constraint)
+{
+	AttrNumber *attmap;
+	ListCell   *cell;
+	SysScanDesc scan;
+	ScanKeyData key[2];
+	HeapTuple	tuple;
+	List	   *clone = NIL;
+
+	/*
+	 * Search for any constraints where this partition's parent is in the
+	 * referenced side.  However, we must ignore any constraint whose parent
+	 * constraint is also going to be cloned, to avoid duplicates.  So do it
+	 * in two steps: first construct the list of constraints to clone, then go
+	 * over that list cloning those whose parents are not in the list.  (We
+	 * must not rely on the parent being seen first, since the catalog scan
+	 * could return children first.)
+	 */
+	attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel),
+										RelationGetDescr(parentRel),
+										gettext_noop("could not convert row type"));
+	ScanKeyInit(&key[0],
+				Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel)));
+	ScanKeyInit(&key[1],
+				Anum_pg_constraint_contype, BTEqualStrategyNumber,
+				F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+	/* This is a seqscan, as we don't have a usable index ... */
+	scan = systable_beginscan(pg_constraint, InvalidOid, true,
+							  NULL, 2, key);
 	while ((tuple = systable_getnext(scan)) != NULL)
 	{
-		Oid		oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+		Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
 
-		clone = lappend_oid(clone, oid);
+		/* ignore this constraint if the parent is already on the list */
+		if (list_member_oid(clone, constrForm->conparentid))
+			continue;
+
+		clone = lappend_oid(clone, constrForm->oid);
 	}
 	systable_endscan(scan);
 
-	/* Do the actual work, recursing to partitions as needed */
-	CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
+	foreach(cell, clone)
+	{
+		Oid			constrOid = lfirst_oid(cell);
+		Form_pg_constraint constrForm;
+		Relation	fkRel;
+		Oid			indexOid;
+		Oid			partIndexId;
+		int			numfks;
+		AttrNumber	conkey[INDEX_MAX_KEYS];
+		AttrNumber	mapped_confkey[INDEX_MAX_KEYS];
+		AttrNumber	confkey[INDEX_MAX_KEYS];
+		Oid			conpfeqop[INDEX_MAX_KEYS];
+		Oid			conppeqop[INDEX_MAX_KEYS];
+		Oid			conffeqop[INDEX_MAX_KEYS];
+		Constraint *fkconstraint;
 
-	/* We're done.  Clean up */
-	table_close(parentRel, NoLock);
-	table_close(rel, NoLock);	/* keep lock till commit */
-	table_close(pg_constraint, RowShareLock);
+		tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+		/* skip children whose parents are going to be cloned, as above */
+		if (list_member_oid(clone, constrForm->conparentid))
+		{
+			ReleaseSysCache(tuple);
+			continue;
+		}
+
+		/*
+		 * Because we're only expanding the key space at the referenced side,
+		 * we don't need to prevent any operation in the referencing table, so
+		 * AccessShareLock suffices (assumes that dropping the constraint
+		 * acquires AEL).
+		 */
+		fkRel = table_open(constrForm->conrelid, AccessShareLock);
+
+		indexOid = constrForm->conindid;
+		DeconstructFkConstraintRow(tuple,
+								   &numfks,
+								   conkey,
+								   confkey,
+								   conpfeqop,
+								   conppeqop,
+								   conffeqop);
+		for (int i = 0; i < numfks; i++)
+			mapped_confkey[i] = attmap[confkey[i] - 1];
+
+		fkconstraint = makeNode(Constraint);
+		/* for now this is all we need */
+		fkconstraint->conname = NameStr(constrForm->conname);
+		fkconstraint->fk_upd_action = constrForm->confupdtype;
+		fkconstraint->fk_del_action = constrForm->confdeltype;
+		fkconstraint->deferrable = constrForm->condeferrable;
+		fkconstraint->initdeferred = constrForm->condeferred;
+		fkconstraint->initially_valid = true;
+		fkconstraint->fk_matchtype = constrForm->confmatchtype;
+
+		/* set up colnames that are used to generate the constraint name */
+		for (int i = 0; i < numfks; i++)
+		{
+			Form_pg_attribute att;
+
+			att = TupleDescAttr(RelationGetDescr(fkRel),
+								conkey[i] - 1);
+			fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs,
+											 makeString(NameStr(att->attname)));
+		}
+
+		/*
+		 * Add the new foreign key constraint pointing to the new partition.
+		 * Because this new partition appears in the referenced side of the
+		 * constraint, we don't need to set up for Phase 3 check.
+		 */
+		partIndexId = index_get_partition(partitionRel, indexOid);
+		if (!OidIsValid(partIndexId))
+			elog(ERROR, "index for %u not found in partition %s",
+				 indexOid, RelationGetRelationName(partitionRel));
+		addFkRecurseReferenced(NULL,
+							   fkconstraint,
+							   fkRel,
+							   partitionRel,
+							   partIndexId,
+							   constrOid,
+							   numfks,
+							   mapped_confkey,
+							   conkey,
+							   conpfeqop,
+							   conppeqop,
+							   conffeqop,
+							   true);
+
+		table_close(fkRel, NoLock);
+		ReleaseSysCache(tuple);
+	}
 }
 
 /*
  * CloneFkReferencing
- *		Recursive subroutine for CloneForeignKeyConstraints, referencing side
+ *		Subroutine for CloneForeignKeyConstraints
  *
- * Clone the given list of FK constraints when a partition is attached on the
- * referencing side of those constraints.
+ * For each FK constraint of the parent relation in the given list, find an
+ * equivalent constraint in its partition relation that can be reparented;
+ * if one cannot be found, create a new constraint in the partition as its
+ * child.
  *
- * When cloning foreign keys to a partition, it may happen that equivalent
- * constraints already exist in the partition for some of them.  We can skip
- * creating a clone in that case, and instead just attach the existing
- * constraint to the one in the parent.
- *
- * This function recurses to partitions, if the new partition is partitioned;
- * of course, only do this for FKs that were actually cloned.
+ * If wqueue is given, it is used to set up Phase 3 verification for each
+ * cloned constraint; if omitted, we assume that such verification is not
+ * needed (example: the partition is being created anew).
  */
 static void
-CloneFkReferencing(Relation pg_constraint, Relation parentRel,
-				   Relation partRel, List *clone, List **cloned)
+CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel,
+				   List *clone)
 {
 	AttrNumber *attmap;
 	List	   *partFKs;
-	List	   *subclone = NIL;
 	ListCell   *cell;
 
+	if (clone == NIL)
+		return;
+
+	if (partRel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("foreign key constraints are not supported on foreign tables")));
+
 	/*
 	 * The constraint key may differ, if the columns in the partition are
 	 * different.  This map is used to convert them.
@@ -7973,6 +8466,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 	{
 		Oid			parentConstrOid = lfirst_oid(cell);
 		Form_pg_constraint constrForm;
+		Relation	pkrel;
 		HeapTuple	tuple;
 		int			numfks;
 		AttrNumber	conkey[INDEX_MAX_KEYS];
@@ -7982,13 +8476,12 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 		Oid			conppeqop[INDEX_MAX_KEYS];
 		Oid			conffeqop[INDEX_MAX_KEYS];
 		Constraint *fkconstraint;
-		bool		attach_it;
+		bool		attached;
+		Oid			indexOid;
 		Oid			constrOid;
-		ObjectAddress parentAddr,
-					childAddr,
-					childTableAddr;
+		ObjectAddress address,
+					referenced;
 		ListCell   *cell;
-		int			i;
 
 		tuple = SearchSysCache1(CONSTROID, parentConstrOid);
 		if (!tuple)
@@ -7996,142 +8489,46 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 				 parentConstrOid);
 		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
 
-		/* only foreign keys */
-		if (constrForm->contype != CONSTRAINT_FOREIGN)
+		/* Don't clone constraints whose parents are being cloned */
+		if (list_member_oid(clone, constrForm->conparentid))
 		{
 			ReleaseSysCache(tuple);
 			continue;
 		}
 
-		ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
+		/* XXX lock level needs to prevent concurrent deletes */
+		pkrel = table_open(constrForm->confrelid, AccessShareLock);
 
 		DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
 								   conpfeqop, conppeqop, conffeqop);
-		for (i = 0; i < numfks; i++)
+		for (int i = 0; i < numfks; i++)
 			mapped_conkey[i] = attmap[conkey[i] - 1];
 
 		/*
 		 * Before creating a new constraint, see whether any existing FKs are
-		 * fit for the purpose.  If one is, attach the parent constraint to it,
-		 * and don't clone anything.  This way we avoid the expensive
+		 * fit for the purpose.  If one is, attach the parent constraint to
+		 * it, and don't clone anything.  This way we avoid the expensive
 		 * verification step and don't end up with a duplicate FK.  This also
 		 * means we don't consider this constraint when recursing to
 		 * partitions.
 		 */
-		attach_it = false;
+		attached = false;
 		foreach(cell, partFKs)
 		{
 			ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
-			Form_pg_constraint partConstr;
-			HeapTuple	partcontup;
-			Relation	trigrel;
-			HeapTuple	trigtup;
-			SysScanDesc scan;
-			ScanKeyData key;
 
-			attach_it = true;
-
-			/*
-			 * Do some quick & easy initial checks.  If any of these fail, we
-			 * cannot use this constraint, but keep looking.
-			 */
-			if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks)
+			if (tryAttachPartitionForeignKey(fk,
+											 RelationGetRelid(partRel),
+											 parentConstrOid,
+											 numfks,
+											 mapped_conkey,
+											 confkey,
+											 conpfeqop))
 			{
-				attach_it = false;
-				continue;
+				attached = true;
+				table_close(pkrel, NoLock);
+				break;
 			}
-			for (i = 0; i < numfks; i++)
-			{
-				if (fk->conkey[i] != mapped_conkey[i] ||
-					fk->confkey[i] != confkey[i] ||
-					fk->conpfeqop[i] != conpfeqop[i])
-				{
-					attach_it = false;
-					break;
-				}
-			}
-			if (!attach_it)
-				continue;
-
-			/*
-			 * Looks good so far; do some more extensive checks.  Presumably
-			 * the check for 'convalidated' could be dropped, since we don't
-			 * really care about that, but let's be careful for now.
-			 */
-			partcontup = SearchSysCache1(CONSTROID,
-										 ObjectIdGetDatum(fk->conoid));
-			if (!partcontup)
-				elog(ERROR, "cache lookup failed for constraint %u",
-					 fk->conoid);
-			partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
-			if (OidIsValid(partConstr->conparentid) ||
-				!partConstr->convalidated ||
-				partConstr->condeferrable != constrForm->condeferrable ||
-				partConstr->condeferred != constrForm->condeferred ||
-				partConstr->confupdtype != constrForm->confupdtype ||
-				partConstr->confdeltype != constrForm->confdeltype ||
-				partConstr->confmatchtype != constrForm->confmatchtype)
-			{
-				ReleaseSysCache(partcontup);
-				attach_it = false;
-				continue;
-			}
-
-			ReleaseSysCache(partcontup);
-
-			/*
-			 * Looks good!  Attach this constraint.  The action triggers in
-			 * the new partition become redundant -- the parent table already
-			 * has equivalent ones, and those will be able to reach the
-			 * partition.  Remove the ones in the partition.  We identify them
-			 * because they have our constraint OID, as well as being on the
-			 * referenced rel.
-			 */
-			trigrel = heap_open(TriggerRelationId, RowExclusiveLock);
-			ScanKeyInit(&key,
-						Anum_pg_trigger_tgconstraint,
-						BTEqualStrategyNumber, F_OIDEQ,
-						ObjectIdGetDatum(fk->conoid));
-
-			scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true,
-									  NULL, 1, &key);
-			while ((trigtup = systable_getnext(scan)) != NULL)
-			{
-				Form_pg_trigger	trgform = (Form_pg_trigger) GETSTRUCT(trigtup);
-				ObjectAddress	trigger;
-
-				if (trgform->tgconstrrelid != fk->conrelid)
-					continue;
-				if (trgform->tgrelid != fk->confrelid)
-					continue;
-
-				/*
-				 * The constraint is originally set up to contain this trigger
-				 * as an implementation object, so there's a dependency record
-				 * that links the two; however, since the trigger is no longer
-				 * needed, we remove the dependency link in order to be able
-				 * to drop the trigger while keeping the constraint intact.
-				 */
-				deleteDependencyRecordsFor(TriggerRelationId,
-										   trgform->oid,
-										   false);
-				/* make dependency deletion visible to performDeletion */
-				CommandCounterIncrement();
-				ObjectAddressSet(trigger, TriggerRelationId,
-								 trgform->oid);
-				performDeletion(&trigger, DROP_RESTRICT, 0);
-				/* make trigger drop visible, in case the loop iterates */
-				CommandCounterIncrement();
-			}
-
-			systable_endscan(scan);
-			table_close(trigrel, RowExclusiveLock);
-
-			ConstraintSetParentConstraint(fk->conoid, parentConstrOid,
-										  RelationGetRelid(partRel));
-			CommandCounterIncrement();
-			attach_it = true;
-			break;
 		}
 
 		/*
@@ -8139,18 +8536,46 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 		 * create a new one.  In fact, there's no need to recurse for this
 		 * constraint to partitions, either.
 		 */
-		if (attach_it)
+		if (attached)
 		{
 			ReleaseSysCache(tuple);
 			continue;
 		}
 
+		indexOid = constrForm->conindid;
+
+		fkconstraint = makeNode(Constraint);
+		if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+								 RelationGetRelid(partRel),
+								 NameStr(constrForm->conname)))
+			fkconstraint->conname =
+				ChooseConstraintName(RelationGetRelationName(partRel),
+									 ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
+									 "fkey",
+									 RelationGetNamespace(partRel), NIL);
+		else
+			fkconstraint->conname = NameStr(constrForm->conname);
+		fkconstraint->fk_upd_action = constrForm->confupdtype;
+		fkconstraint->fk_del_action = constrForm->confdeltype;
+		fkconstraint->deferrable = constrForm->condeferrable;
+		fkconstraint->initdeferred = constrForm->condeferred;
+		fkconstraint->fk_matchtype = constrForm->confmatchtype;
+		for (int i = 0; i < numfks; i++)
+		{
+			Form_pg_attribute att;
+
+			att = TupleDescAttr(RelationGetDescr(partRel),
+								mapped_conkey[i] - 1);
+			fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs,
+											 makeString(NameStr(att->attname)));
+		}
+
 		constrOid =
-			CreateConstraintEntry(NameStr(constrForm->conname),
+			CreateConstraintEntry(fkconstraint->conname,
 								  constrForm->connamespace,
 								  CONSTRAINT_FOREIGN,
-								  constrForm->condeferrable,
-								  constrForm->condeferred,
+								  fkconstraint->deferrable,
+								  fkconstraint->initdeferred,
 								  constrForm->convalidated,
 								  parentConstrOid,
 								  RelationGetRelid(partRel),
@@ -8158,92 +8583,191 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 								  numfks,
 								  numfks,
 								  InvalidOid,	/* not a domain constraint */
-								  constrForm->conindid, /* same index */
+								  indexOid,
 								  constrForm->confrelid,	/* same foreign rel */
 								  confkey,
 								  conpfeqop,
 								  conppeqop,
 								  conffeqop,
 								  numfks,
-								  constrForm->confupdtype,
-								  constrForm->confdeltype,
-								  constrForm->confmatchtype,
+								  fkconstraint->fk_upd_action,
+								  fkconstraint->fk_del_action,
+								  fkconstraint->fk_matchtype,
 								  NULL,
 								  NULL,
 								  NULL,
-								  false,
-								  1, false, true);
-		subclone = lappend_oid(subclone, constrOid);
+								  false,	/* islocal */
+								  1,	/* inhcount */
+								  false,	/* conNoInherit */
+								  true);
 
 		/* Set up partition dependencies for the new constraint */
-		ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
-		recordDependencyOn(&childAddr, &parentAddr,
-						   DEPENDENCY_PARTITION_PRI);
-		ObjectAddressSet(childTableAddr, RelationRelationId,
+		ObjectAddressSet(address, ConstraintRelationId, constrOid);
+		ObjectAddressSet(referenced, ConstraintRelationId, parentConstrOid);
+		recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
+		ObjectAddressSet(referenced, RelationRelationId,
 						 RelationGetRelid(partRel));
-		recordDependencyOn(&childAddr, &childTableAddr,
-						   DEPENDENCY_PARTITION_SEC);
-
-		fkconstraint = makeNode(Constraint);
-		/* for now this is all we need */
-		fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
-		fkconstraint->fk_upd_action = constrForm->confupdtype;
-		fkconstraint->fk_del_action = constrForm->confdeltype;
-		fkconstraint->deferrable = constrForm->condeferrable;
-		fkconstraint->initdeferred = constrForm->condeferred;
-
-		createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
-								 constrOid, constrForm->conindid, false);
-
-		if (cloned)
-		{
-			ClonedConstraint *newc;
-
-			/*
-			 * Feed back caller about the constraints we created, so that they
-			 * can set up constraint verification.
-			 */
-			newc = palloc(sizeof(ClonedConstraint));
-			newc->relid = RelationGetRelid(partRel);
-			newc->refrelid = constrForm->confrelid;
-			newc->conindid = constrForm->conindid;
-			newc->conid = constrOid;
-			newc->constraint = fkconstraint;
-
-			*cloned = lappend(*cloned, newc);
-		}
+		recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
 
+		/* Done with the cloned constraint's tuple */
 		ReleaseSysCache(tuple);
-	}
 
-	pfree(attmap);
-	list_free_deep(partFKs);
+		/* Make all this visible before recursing */
+		CommandCounterIncrement();
 
-	/*
-	 * If the partition is partitioned, recurse to handle any constraints that
-	 * were cloned.
-	 */
-	if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
-		subclone != NIL)
-	{
-		PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
-		int			i;
-
-		for (i = 0; i < partdesc->nparts; i++)
-		{
-			Relation	childRel;
-
-			childRel = table_open(partdesc->oids[i], AccessExclusiveLock);
-			CloneFkReferencing(pg_constraint,
-							   partRel,
-							   childRel,
-							   subclone,
-							   cloned);
-			table_close(childRel, NoLock);	/* keep lock till commit */
-		}
+		addFkRecurseReferencing(wqueue,
+								fkconstraint,
+								partRel,
+								pkrel,
+								indexOid,
+								constrOid,
+								numfks,
+								confkey,
+								mapped_conkey,
+								conpfeqop,
+								conppeqop,
+								conffeqop,
+								false,	/* no old check exists */
+								AccessExclusiveLock);
+		table_close(pkrel, NoLock);
 	}
 }
 
+/*
+ * When the parent of a partition receives [the referencing side of] a foreign
+ * key, we must propagate that foreign key to the partition.  However, the
+ * partition might already have an equivalent foreign key; this routine
+ * compares the given ForeignKeyCacheInfo (in the partition) to the FK defined
+ * by the other parameters.  If they are equivalent, create the link between
+ * the two constraints and return true.
+ *
+ * If the given FK does not match the one defined by the rest of the
+ * parameters, return false.
+ */
+static bool
+tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
+							 Oid partRelid,
+							 Oid parentConstrOid,
+							 int numfks,
+							 AttrNumber *mapped_conkey,
+							 AttrNumber *confkey,
+							 Oid *conpfeqop)
+{
+	HeapTuple	parentConstrTup;
+	Form_pg_constraint parentConstr;
+	HeapTuple	partcontup;
+	Form_pg_constraint partConstr;
+	Relation	trigrel;
+	ScanKeyData key;
+	SysScanDesc scan;
+	HeapTuple	trigtup;
+
+	parentConstrTup = SearchSysCache1(CONSTROID,
+									  ObjectIdGetDatum(parentConstrOid));
+	if (!parentConstrTup)
+		elog(ERROR, "cache lookup failed for constraint %u", parentConstrOid);
+	parentConstr = (Form_pg_constraint) GETSTRUCT(parentConstrTup);
+
+	/*
+	 * Do some quick & easy initial checks.  If any of these fail, we cannot
+	 * use this constraint.
+	 */
+	if (fk->confrelid != parentConstr->confrelid || fk->nkeys != numfks)
+	{
+		ReleaseSysCache(parentConstrTup);
+		return false;
+	}
+	for (int i = 0; i < numfks; i++)
+	{
+		if (fk->conkey[i] != mapped_conkey[i] ||
+			fk->confkey[i] != confkey[i] ||
+			fk->conpfeqop[i] != conpfeqop[i])
+		{
+			ReleaseSysCache(parentConstrTup);
+			return false;
+		}
+	}
+
+	/*
+	 * Looks good so far; do some more extensive checks.  Presumably the check
+	 * for 'convalidated' could be dropped, since we don't really care about
+	 * that, but let's be careful for now.
+	 */
+	partcontup = SearchSysCache1(CONSTROID,
+								 ObjectIdGetDatum(fk->conoid));
+	if (!partcontup)
+		elog(ERROR, "cache lookup failed for constraint %u",
+			 fk->conoid);
+	partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
+	if (OidIsValid(partConstr->conparentid) ||
+		!partConstr->convalidated ||
+		partConstr->condeferrable != parentConstr->condeferrable ||
+		partConstr->condeferred != parentConstr->condeferred ||
+		partConstr->confupdtype != parentConstr->confupdtype ||
+		partConstr->confdeltype != parentConstr->confdeltype ||
+		partConstr->confmatchtype != parentConstr->confmatchtype)
+	{
+		ReleaseSysCache(parentConstrTup);
+		ReleaseSysCache(partcontup);
+		return false;
+	}
+
+	ReleaseSysCache(partcontup);
+	ReleaseSysCache(parentConstrTup);
+
+	/*
+	 * Looks good!  Attach this constraint.  The action triggers in the new
+	 * partition become redundant -- the parent table already has equivalent
+	 * ones, and those will be able to reach the partition.  Remove the ones
+	 * in the partition.  We identify them because they have our constraint
+	 * OID, as well as being on the referenced rel.
+	 */
+	trigrel = table_open(TriggerRelationId, RowExclusiveLock);
+	ScanKeyInit(&key,
+				Anum_pg_trigger_tgconstraint,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(fk->conoid));
+
+	scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true,
+							  NULL, 1, &key);
+	while ((trigtup = systable_getnext(scan)) != NULL)
+	{
+		Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup);
+		ObjectAddress trigger;
+
+		if (trgform->tgconstrrelid != fk->conrelid)
+			continue;
+		if (trgform->tgrelid != fk->confrelid)
+			continue;
+
+		/*
+		 * The constraint is originally set up to contain this trigger as an
+		 * implementation object, so there's a dependency record that links
+		 * the two; however, since the trigger is no longer needed, we remove
+		 * the dependency link in order to be able to drop the trigger while
+		 * keeping the constraint intact.
+		 */
+		deleteDependencyRecordsFor(TriggerRelationId,
+								   trgform->oid,
+								   false);
+		/* make dependency deletion visible to performDeletion */
+		CommandCounterIncrement();
+		ObjectAddressSet(trigger, TriggerRelationId,
+						 trgform->oid);
+		performDeletion(&trigger, DROP_RESTRICT, 0);
+		/* make trigger drop visible, in case the loop iterates */
+		CommandCounterIncrement();
+	}
+
+	systable_endscan(scan);
+	table_close(trigrel, RowExclusiveLock);
+
+	ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid);
+	CommandCounterIncrement();
+	return true;
+}
+
+
 /*
  * ALTER TABLE ALTER CONSTRAINT
  *
@@ -8363,8 +8887,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
 			/*
 			 * Update deferrability of RI_FKey_noaction_del,
 			 * RI_FKey_noaction_upd, RI_FKey_check_ins and RI_FKey_check_upd
-			 * triggers, but not others; see createForeignKeyTriggers and
-			 * CreateFKCheckTrigger.
+			 * triggers, but not others; see createForeignKeyActionTriggers
+			 * and CreateFKCheckTrigger.
 			 */
 			if (tgform->tgfoid != F_RI_FKEY_NOACTION_DEL &&
 				tgform->tgfoid != F_RI_FKEY_NOACTION_UPD &&
@@ -9271,37 +9795,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
 						 indexOid, false);
 }
 
-/*
- * Create the triggers that implement an FK constraint.
- *
- * NB: if you change any trigger properties here, see also
- * ATExecAlterConstraint.
- */
-void
-createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
-						 Oid constraintOid, Oid indexOid, bool create_action)
-{
-	/*
-	 * For the referenced side, create action triggers, if requested.  (If the
-	 * referencing side is partitioned, there is still only one trigger, which
-	 * runs on the referenced side and points to the top of the referencing
-	 * hierarchy.)
-	 */
-	if (create_action)
-		createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid,
-									   indexOid);
-
-	/*
-	 * For the referencing side, create the check triggers.  We only need
-	 * these on the partitions.
-	 */
-	if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid,
-									  fkconstraint, constraintOid, indexOid);
-
-	CommandCounterIncrement();
-}
-
 /*
  * ALTER TABLE DROP CONSTRAINT
  *
@@ -14703,8 +15196,6 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
 	bool		found_whole_row;
 	Oid			defaultPartOid;
 	List	   *partBoundConstraint;
-	List	   *cloned;
-	ListCell   *l;
 
 	/*
 	 * We must lock the default partition if one exists, because attaching a
@@ -14885,33 +15376,12 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
 	CloneRowTriggersToPartition(rel, attachrel);
 
 	/*
-	 * Clone foreign key constraints, and setup for Phase 3 to verify them.
+	 * Clone foreign key constraints.  Callee is responsible for setting up
+	 * for phase 3 constraint verification.
 	 */
-	cloned = NIL;
-	CloneForeignKeyConstraints(RelationGetRelid(rel),
-							   RelationGetRelid(attachrel), &cloned);
-	foreach(l, cloned)
-	{
-		ClonedConstraint *clonedcon = lfirst(l);
-		NewConstraint *newcon;
-		Relation	clonedrel;
-		AlteredTableInfo *parttab;
-
-		clonedrel = relation_open(clonedcon->relid, NoLock);
-		parttab = ATGetQueueEntry(wqueue, clonedrel);
-
-		newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-		newcon->name = clonedcon->constraint->conname;
-		newcon->contype = CONSTR_FOREIGN;
-		newcon->refrelid = clonedcon->refrelid;
-		newcon->refindid = clonedcon->conindid;
-		newcon->conid = clonedcon->conid;
-		newcon->qual = (Node *) clonedcon->constraint;
-
-		parttab->constraints = lappend(parttab->constraints, newcon);
-
-		relation_close(clonedrel, NoLock);
-	}
+	CloneForeignKeyConstraints(wqueue,
+							   RelationGetRelid(rel),
+							   RelationGetRelid(attachrel));
 
 	/*
 	 * Generate partition constraint from the partition bound specification.
@@ -15102,6 +15572,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
 												  RelationGetRelid(attachrel));
 				update_relispartition(NULL, cldIdxId, true);
 				found = true;
+
+				CommandCounterIncrement();
 				break;
 			}
 		}
@@ -15280,6 +15752,7 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
 	Oid			defaultPartOid;
 	List	   *indexes;
 	List	   *fks;
+	List	   *refconstraints;
 	ListCell   *cell;
 
 	/*
@@ -15293,6 +15766,10 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
 
 	partRel = table_openrv(name, ShareUpdateExclusiveLock);
 
+	/* Ensure that foreign keys still hold after this detach */
+	refconstraints = GetParentedForeignKeyRefs(partRel);
+	CheckNoForeignKeyRefs(partRel, refconstraints, false);
+
 	/* All inheritance related checks are performed within the function */
 	RemoveInheritance(partRel, rel);
 
@@ -15411,6 +15888,24 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
 	}
 	list_free_deep(fks);
 
+	/*
+	 * Any sub-constraints that are in the referenced-side of a larger
+	 * constraint have to be removed.  This partition is no longer part of the
+	 * key space of the constraint.
+	 */
+	foreach(cell, refconstraints)
+	{
+		Oid			constrOid = lfirst_oid(cell);
+		ObjectAddress constraint;
+
+		ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid);
+		CommandCounterIncrement();
+
+		ObjectAddressSet(constraint, ConstraintRelationId, constrOid);
+		performDeletion(&constraint, DROP_RESTRICT, 0);
+	}
+	CommandCounterIncrement();
+
 	/*
 	 * Invalidate the parent's relcache so that the partition is no longer
 	 * included in its partition descriptor.
@@ -15791,3 +16286,118 @@ update_relispartition(Relation classRel, Oid relationId, bool newval)
 	if (opened)
 		table_close(classRel, RowExclusiveLock);
 }
+
+/*
+ * Return an OID list of constraints that reference the given relation
+ * and are marked as having a parent constraint.
+ */
+List *
+GetParentedForeignKeyRefs(Relation partition)
+{
+	Relation	pg_constraint;
+	HeapTuple	tuple;
+	SysScanDesc scan;
+	ScanKeyData key[2];
+	List	   *constraints = NIL;
+
+	/*
+	 * If no indexes, or no columns are referenceable by FKs, we can avoid the
+	 * scan.
+	 */
+	if (RelationGetIndexList(partition) == NIL ||
+		bms_is_empty(RelationGetIndexAttrBitmap(partition,
+												INDEX_ATTR_BITMAP_KEY)))
+		return NIL;
+
+	/* Search for constraints referencing this table */
+	pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
+	ScanKeyInit(&key[0],
+				Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition)));
+	ScanKeyInit(&key[1],
+				Anum_pg_constraint_contype, BTEqualStrategyNumber,
+				F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+
+	/* XXX This is a seqscan, as we don't have a usable index */
+	scan = systable_beginscan(pg_constraint, InvalidOid, true, NULL, 2, key);
+	while ((tuple = systable_getnext(scan)) != NULL)
+	{
+		Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+		/*
+		 * We only need to process constraints that are part of larger ones.
+		 */
+		if (!OidIsValid(constrForm->conparentid))
+			continue;
+
+		constraints = lappend_oid(constraints, constrForm->oid);
+	}
+
+	systable_endscan(scan);
+	table_close(pg_constraint, AccessShareLock);
+
+	return constraints;
+}
+
+/*
+ * During an operation that removes a partition from a partitioned table
+ * (either a DETACH or DROP), verify that any foreign keys pointing to the
+ * partitioned table would not become invalid.  An error is raised if any
+ * referenced values exist.
+ *
+ * 'constraints' is the list obtained from GetParentedForeignKeyRefs.
+ */
+void
+CheckNoForeignKeyRefs(Relation partition, List *constraints, bool isDrop)
+{
+	ListCell   *cell;
+
+	/*
+	 * In the DROP case, we can skip this check when this is a partitioned
+	 * partition, because its partitions will go through this also, and we'd
+	 * run the check twice uselessly.
+	 *
+	 * In the DETACH case, this is only called for the top-level relation, so
+	 * we must run it nevertheless.
+	 */
+	if (isDrop && partition->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return;
+
+	foreach(cell, constraints)
+	{
+		Oid			constrOid = lfirst_oid(cell);
+		HeapTuple	tuple;
+		Form_pg_constraint constrForm;
+		Relation	rel;
+		Trigger		trig;
+
+		tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+		Assert(OidIsValid(constrForm->conparentid));
+		Assert(constrForm->confrelid == RelationGetRelid(partition));
+
+		/* prevent data changes into the referencing table until commit */
+		rel = table_open(constrForm->conrelid, ShareLock);
+
+		MemSet(&trig, 0, sizeof(trig));
+		trig.tgoid = InvalidOid;
+		trig.tgname = NameStr(constrForm->conname);
+		trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN;
+		trig.tgisinternal = true;
+		trig.tgconstrrelid = RelationGetRelid(partition);
+		trig.tgconstrindid = constrForm->conindid;
+		trig.tgconstraint = constrForm->oid;
+		trig.tgdeferrable = false;
+		trig.tginitdeferred = false;
+		/* we needn't fill in remaining fields */
+
+		RI_Final_Check(&trig, rel, partition);
+
+		ReleaseSysCache(tuple);
+
+		table_close(rel, NoLock);
+	}
+}
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index d715709b7cd..e7a107419bd 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -49,6 +49,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
+#include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
@@ -219,8 +220,8 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 				 Datum *vals, char *nulls);
 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 				   Relation pk_rel, Relation fk_rel,
-				   TupleTableSlot *violator, TupleDesc tupdesc,
-				   int queryno) pg_attribute_noreturn();
+				   TupleTableSlot *violatorslot, TupleDesc tupdesc,
+				   int queryno, bool partgone) pg_attribute_noreturn();
 
 
 /*
@@ -347,18 +348,22 @@ RI_FKey_check(TriggerData *trigdata)
 		char		paramname[16];
 		const char *querysep;
 		Oid			queryoids[RI_MAX_NUMKEYS];
+		const char *pk_only;
 
 		/* ----------
 		 * The query string built is
-		 *	SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
+		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
 		 *		   FOR KEY SHARE OF x
 		 * The type id's for the $ parameters are those of the
 		 * corresponding FK attributes.
 		 * ----------
 		 */
 		initStringInfo(&querybuf);
+		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+			"" : "ONLY ";
 		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+						 pk_only, pkrelname);
 		querysep = "WHERE";
 		for (int i = 0; i < riinfo->nkeys; i++)
 		{
@@ -470,19 +475,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 		char		attname[MAX_QUOTED_NAME_LEN];
 		char		paramname[16];
 		const char *querysep;
+		const char *pk_only;
 		Oid			queryoids[RI_MAX_NUMKEYS];
 
 		/* ----------
 		 * The query string built is
-		 *	SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
+		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
 		 *		   FOR KEY SHARE OF x
 		 * The type id's for the $ parameters are those of the
 		 * PK attributes themselves.
 		 * ----------
 		 */
 		initStringInfo(&querybuf);
+		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+			"" : "ONLY ";
 		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+						 pk_only, pkrelname);
 		querysep = "WHERE";
 		for (int i = 0; i < riinfo->nkeys; i++)
 		{
@@ -1276,6 +1285,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	RangeTblEntry *fkrte;
 	const char *sep;
 	const char *fk_only;
+	const char *pk_only;
 	int			save_nestlevel;
 	char		workmembuf[32];
 	int			spi_result;
@@ -1333,7 +1343,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	/*----------
 	 * The query string built is:
 	 *	SELECT fk.keycols FROM [ONLY] relname fk
-	 *	 LEFT OUTER JOIN ONLY pkrelname pk
+	 *	 LEFT OUTER JOIN [ONLY] pkrelname pk
 	 *	 ON (pk.pkkeycol1=fk.keycol1 [AND ...])
 	 *	 WHERE pk.pkkeycol1 IS NULL AND
 	 * For MATCH SIMPLE:
@@ -1360,9 +1370,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	quoteRelationName(fkrelname, fk_rel);
 	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
 		"" : "ONLY ";
+	pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
 	appendStringInfo(&querybuf,
-					 " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON",
-					 fk_only, fkrelname, pkrelname);
+					 " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON",
+					 fk_only, fkrelname, pk_only, pkrelname);
 
 	strcpy(pkattname, "pk.");
 	strcpy(fkattname, "fk.");
@@ -1513,7 +1525,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 		ri_ReportViolation(&fake_riinfo,
 						   pk_rel, fk_rel,
 						   slot, tupdesc,
-						   RI_PLAN_CHECK_LOOKUPPK);
+						   RI_PLAN_CHECK_LOOKUPPK, false);
 
 		ExecDropSingleTupleTableSlot(slot);
 	}
@@ -1529,6 +1541,195 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	return true;
 }
 
+/* ----------
+ * RI_Final_Check -
+ *
+ */
+void
+RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
+{
+	const RI_ConstraintInfo *riinfo;
+	StringInfoData querybuf;
+	char	   *constraintDef;
+	char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
+	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
+	char		pkattname[MAX_QUOTED_NAME_LEN + 3];
+	char		fkattname[MAX_QUOTED_NAME_LEN + 3];
+	const char *sep;
+	const char *fk_only;
+	int			spi_result;
+	SPIPlanPtr	qplan;
+	int			i;
+
+	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
+
+	/* XXX handle the non-permission case?? */
+
+	/*----------
+	 * The query string built is:
+	 *  SELECT fk.keycols FROM [ONLY] relname fk
+	 *    JOIN pkrelname pk
+	 *    ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+	 *    WHERE (<partition constraint>) AND
+	 * For MATCH SIMPLE:
+	 *   (fk.keycol1 IS NOT NULL [AND ...])
+	 * For MATCH FULL:
+	 *   (fk.keycol1 IS NOT NULL [OR ...])
+	 *
+	 * We attach COLLATE clauses to the operators when comparing columns
+	 * that have different collations.
+	 *----------
+	 */
+	initStringInfo(&querybuf);
+	appendStringInfoString(&querybuf, "SELECT ");
+	sep = "";
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		quoteOneName(fkattname,
+					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
+		appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
+		sep = ", ";
+	}
+
+	quoteRelationName(pkrelname, pk_rel);
+	quoteRelationName(fkrelname, fk_rel);
+	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
+	appendStringInfo(&querybuf,
+					 " FROM %s%s fk JOIN %s pk ON",
+					 fk_only, fkrelname, pkrelname);
+	strcpy(pkattname, "pk.");
+	strcpy(fkattname, "fk.");
+	sep = "(";
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+		Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+		Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
+		Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
+
+		quoteOneName(pkattname + 3,
+					 RIAttName(pk_rel, riinfo->pk_attnums[i]));
+		quoteOneName(fkattname + 3,
+					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
+		ri_GenerateQual(&querybuf, sep,
+						pkattname, pk_type,
+						riinfo->pf_eq_oprs[i],
+						fkattname, fk_type);
+		if (pk_coll != fk_coll)
+			ri_GenerateQualCollation(&querybuf, pk_coll);
+		sep = "AND";
+	}
+
+	/*
+	 * Start the WHERE clause with the partition constraint (except if this is
+	 * the default partition and there's no other partition, because the
+	 * partition constraint is the empty string in that case.)
+	 */
+	constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk");
+	if (constraintDef && constraintDef[0] != '\0')
+		appendStringInfo(&querybuf, ") WHERE %s AND (",
+						 constraintDef);
+	else
+		appendStringInfo(&querybuf, ") WHERE (");
+
+	sep = "";
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
+		appendStringInfo(&querybuf,
+						 "%sfk.%s IS NOT NULL",
+						 sep, fkattname);
+		switch (riinfo->confmatchtype)
+		{
+			case FKCONSTR_MATCH_SIMPLE:
+				sep = " AND ";
+				break;
+			case FKCONSTR_MATCH_FULL:
+				sep = " OR ";
+				break;
+			case FKCONSTR_MATCH_PARTIAL:
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("MATCH PARTIAL not yet implemented")));
+				break;
+			default:
+				elog(ERROR, "unrecognized confmatchtype: %d",
+					 riinfo->confmatchtype);
+				break;
+		}
+	}
+	appendStringInfoChar(&querybuf, ')');
+
+	/*
+	 * RI_Initial_Check changes work_mem here.
+	 */
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	/*
+	 * Generate the plan.  We don't need to cache it, and there are no
+	 * arguments to the plan.
+	 */
+	qplan = SPI_prepare(querybuf.data, 0, NULL);
+
+	if (qplan == NULL)
+		elog(ERROR, "SPI_prepare returned %s for %s",
+			 SPI_result_code_string(SPI_result), querybuf.data);
+
+	/*
+	 * Run the plan.  For safety we force a current snapshot to be used. (In
+	 * transaction-snapshot mode, this arguably violates transaction isolation
+	 * rules, but we really haven't got much choice.) We don't need to
+	 * register the snapshot, because SPI_execute_snapshot will see to it. We
+	 * need at most one tuple returned, so pass limit = 1.
+	 */
+	spi_result = SPI_execute_snapshot(qplan,
+									  NULL, NULL,
+									  GetLatestSnapshot(),
+									  InvalidSnapshot,
+									  true, false, 1);
+
+	/* Check result */
+	if (spi_result != SPI_OK_SELECT)
+		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
+
+	/* Did we find a tuple that would violate the constraint? */
+	if (SPI_processed > 0)
+	{
+		TupleTableSlot *slot;
+		HeapTuple	tuple = SPI_tuptable->vals[0];
+		TupleDesc	tupdesc = SPI_tuptable->tupdesc;
+		RI_ConstraintInfo fake_riinfo;
+
+		slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+		heap_deform_tuple(tuple, tupdesc,
+						  slot->tts_values, slot->tts_isnull);
+		ExecStoreVirtualTuple(slot);
+
+		/*
+		 * The columns to look at in the result tuple are 1..N, not whatever
+		 * they are in the fk_rel.  Hack up riinfo so that ri_ReportViolation
+		 * will behave properly.
+		 *
+		 * In addition to this, we have to pass the correct tupdesc to
+		 * ri_ReportViolation, overriding its normal habit of using the pk_rel
+		 * or fk_rel's tupdesc.
+		 */
+		memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
+		for (i = 0; i < fake_riinfo.nkeys; i++)
+			fake_riinfo.pk_attnums[i] = i + 1;
+
+		ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
+						   slot, tupdesc, 0, true);
+	}
+
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+}
+
 
 /* ----------
  * Local functions below
@@ -1730,6 +1931,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
 	/* Find or create a hashtable entry for the constraint */
 	riinfo = ri_LoadConstraintInfo(constraintOid);
 
+#if 0
 	/* Do some easy cross-checks against the trigger call data */
 	if (rel_is_pk)
 	{
@@ -1738,6 +1940,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
 			elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
 				 trigger->tgname, RelationGetRelationName(trig_rel));
 	}
+#endif
 
 	if (riinfo->confmatchtype != FKCONSTR_MATCH_FULL &&
 		riinfo->confmatchtype != FKCONSTR_MATCH_PARTIAL &&
@@ -2054,7 +2257,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 						   pk_rel, fk_rel,
 						   newslot ? newslot : oldslot,
 						   NULL,
-						   qkey->constr_queryno);
+						   qkey->constr_queryno, false);
 
 	return SPI_processed != 0;
 }
@@ -2095,7 +2298,7 @@ static void
 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 				   Relation pk_rel, Relation fk_rel,
 				   TupleTableSlot *violatorslot, TupleDesc tupdesc,
-				   int queryno)
+				   int queryno, bool partgone)
 {
 	StringInfoData key_names;
 	StringInfoData key_values;
@@ -2134,9 +2337,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 	 *
 	 * Check table-level permissions next and, failing that, column-level
 	 * privileges.
+	 *
+	 * When a partition at the referenced side is being detached/dropped, we
+	 * needn't check, since the user must be the table owner anyway.
 	 */
-
-	if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
+	if (partgone)
+		has_perm = true;
+	else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
 	{
 		aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT);
 		if (aclresult != ACLCHECK_OK)
@@ -2198,7 +2405,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 		}
 	}
 
-	if (onfk)
+	if (partgone)
+		ereport(ERROR,
+				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+				 errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"",
+						RelationGetRelationName(pk_rel),
+						NameStr(riinfo->conname)),
+				 errdetail("Key (%s)=(%s) still referenced from table \"%s\".",
+						   key_names.data, key_values.data,
+						   RelationGetRelationName(fk_rel))));
+	else if (onfk)
 		ereport(ERROR,
 				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
 				 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 85055bbb95a..63ec10be500 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1818,6 +1818,24 @@ pg_get_partition_constraintdef(PG_FUNCTION_ARGS)
 	PG_RETURN_TEXT_P(string_to_text(consrc));
 }
 
+/*
+ * pg_get_partconstrdef_string
+ *
+ * Returns the partition constraint as a C-string for the input relation, with
+ * the given alias.  No pretty-printing.
+ */
+char *
+pg_get_partconstrdef_string(Oid partitionId, char *aliasname)
+{
+	Expr	   *constr_expr;
+	List	   *context;
+
+	constr_expr = get_partition_qual_relid(partitionId);
+	context = deparse_context_for(aliasname, partitionId);
+
+	return deparse_expression((Node *) constr_expr, context, true, false);
+}
+
 /*
  * pg_get_constraintdef
  *
diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h
index 85076d07437..87cffd967c2 100644
--- a/src/include/catalog/heap.h
+++ b/src/include/catalog/heap.h
@@ -83,6 +83,8 @@ extern void heap_create_init_fork(Relation rel);
 
 extern void heap_drop_with_catalog(Oid relid);
 
+extern void pre_drop_class_check(Oid relationId, Oid objectSubId);
+
 extern void heap_truncate(List *relids);
 
 extern void heap_truncate_one_rel(Relation rel);
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index ec3bb90b01b..efbb5c3a93b 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -52,6 +52,10 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
 
 extern void CheckTableNotInUse(Relation rel, const char *stmt);
 
+extern List *GetParentedForeignKeyRefs(Relation partition);
+extern void CheckNoForeignKeyRefs(Relation partition, List *constraints,
+					  bool isDrop);
+
 extern void ExecuteTruncate(TruncateStmt *stmt);
 extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
 					DropBehavior behavior, bool restart_seqs);
@@ -76,10 +80,6 @@ extern void find_composite_type_dependencies(Oid typeOid,
 
 extern void check_of_type(HeapTuple typetuple);
 
-extern void createForeignKeyTriggers(Relation rel, Oid refRelOid,
-						 Constraint *fkconstraint, Oid constraintOid,
-						 Oid indexOid, bool create_action);
-
 extern void register_on_commit_action(Oid relid, OnCommitAction action);
 extern void remove_on_commit_action(Oid relid);
 
diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h
index 846679ecc12..8e3b2e7a688 100644
--- a/src/include/commands/trigger.h
+++ b/src/include/commands/trigger.h
@@ -263,6 +263,7 @@ extern bool RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
 							  TupleTableSlot *old_slot, TupleTableSlot  *new_slot);
 extern bool RI_Initial_Check(Trigger *trigger,
 				 Relation fk_rel, Relation pk_rel);
+extern void RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel);
 
 /* result values for RI_FKey_trigger_type: */
 #define RI_TRIGGER_PK	1		/* is a trigger on the PK relation */
diff --git a/src/include/utils/ruleutils.h b/src/include/utils/ruleutils.h
index 3ebc01e7147..7c49e9d0a83 100644
--- a/src/include/utils/ruleutils.h
+++ b/src/include/utils/ruleutils.h
@@ -22,6 +22,7 @@ extern char *pg_get_indexdef_string(Oid indexrelid);
 extern char *pg_get_indexdef_columns(Oid indexrelid, bool pretty);
 
 extern char *pg_get_partkeydef_columns(Oid relid, bool pretty);
+extern char *pg_get_partconstrdef_string(Oid partitionId, char *aliasname);
 
 extern char *pg_get_constraintdef_command(Oid constraintId);
 extern char *deparse_expression(Node *expr, List *dpcontext,
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index f1a664e3394..890eb360e0f 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1498,19 +1498,6 @@ drop table pktable2, fktable2;
 --
 -- Foreign keys and partitioned tables
 --
--- partitioned table in the referenced side are not allowed
-CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
-  PARTITION BY RANGE (a, b);
--- verify with create table first ...
-CREATE TABLE fk_notpartitioned_fk (a int, b int,
-  FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
-ERROR:  cannot reference partitioned table "fk_partitioned_pk"
--- and then with alter table.
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
-ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
-  REFERENCES fk_partitioned_pk;
-ERROR:  cannot reference partitioned table "fk_partitioned_pk"
-DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
 -- Creation of a partitioned hierarchy with irregular definitions
 CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
   PRIMARY KEY (a, b));
@@ -1640,7 +1627,7 @@ CREATE TABLE fk_partitioned_fk_full (x int, y int) PARTITION BY RANGE (x);
 CREATE TABLE fk_partitioned_fk_full_1 PARTITION OF fk_partitioned_fk_full DEFAULT;
 INSERT INTO fk_partitioned_fk_full VALUES (1, NULL);
 ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL;  -- fails
-ERROR:  insert or update on table "fk_partitioned_fk_full" violates foreign key constraint "fk_partitioned_fk_full_x_y_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_full_1" violates foreign key constraint "fk_partitioned_fk_full_x_y_fkey"
 DETAIL:  MATCH FULL does not allow mixing of null and nonnull key values.
 TRUNCATE fk_partitioned_fk_full;
 ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL;
@@ -1856,7 +1843,7 @@ CREATE TABLE fk_partitioned_fk_2_2 PARTITION OF fk_partitioned_fk_2 FOR VALUES F
 INSERT INTO fk_partitioned_fk_2 VALUES (1600, 601), (1600, 1601);
 ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
   FOR VALUES IN (1600);
-ERROR:  insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_b_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_2_1" violates foreign key constraint "fk_partitioned_fk_a_b_fkey"
 DETAIL:  Key (a, b)=(1600, 601) is not present in table "fk_notpartitioned_pk".
 INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601);
 ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
@@ -1985,3 +1972,152 @@ ERROR:  constraint "my_fkey" of relation "fk_part_1_1" does not exist
 drop schema fkpart0, fkpart1, fkpart2 cascade;
 NOTICE:  drop cascades to 8 other objects
 \set VERBOSITY default
+-- Test a partitioned table as referenced table.
+-- Verify basic functionality with a regular partition creation and a partition
+-- with a different column layout, as well as partitions
+-- added (created and attached) after creating the foreign key.
+create schema regress_fk;
+set search_path to regress_fk;
+create table pk (a int primary key) partition by range (a);
+create table pk1 partition of pk for values from (0) to (1000);
+create table pk2 (b int, a int);
+alter table pk2 drop column b;
+alter table pk2 alter a set not null;
+alter table pk attach partition pk2 for values from (1000) to (2000);
+create table fk (a int) partition by range (a);
+create table fk1 partition of fk for values from (0) to (750);
+alter table fk add foreign key (a) references pk;
+create table fk2 (b int, a int) ;
+alter table fk2 drop column b;
+alter table fk attach partition fk2 for values from (750) to (3500);
+create table pk3 partition of pk for values from (2000) to (3000);
+create table pk4 (like pk);
+alter table pk attach partition pk4 for values from (3000) to (4000);
+create table pk5 (like pk) partition by range (a);
+create table pk51 partition of pk5 for values from (4000) to (4500);
+create table pk52 partition of pk5 for values from (4500) to (5000);
+alter table pk attach partition pk5 for values from (4000) to (5000);
+create table fk3 partition of fk for values from (3500) to (5000);
+-- these should fail: referenced value not present
+insert into fk values (1);
+ERROR:  insert or update on table "fk1" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(1) is not present in table "pk".
+insert into fk values (1000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(1000) is not present in table "pk".
+insert into fk values (2000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(2000) is not present in table "pk".
+insert into fk values (3000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(3000) is not present in table "pk".
+insert into fk values (4000);
+ERROR:  insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(4000) is not present in table "pk".
+insert into fk values (4500);
+ERROR:  insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(4500) is not present in table "pk".
+-- insert into the referenced table, now they should work
+insert into pk values (1), (1000), (2000), (3000), (4000), (4500);
+insert into fk values (1), (1000), (2000), (3000), (4000), (4500);
+-- should fail: referencing value present
+delete from pk where a = 1;
+ERROR:  update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
+DETAIL:  Key (a)=(1) is still referenced from table "fk".
+delete from pk where a = 1000;
+ERROR:  update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL:  Key (a)=(1000) is still referenced from table "fk".
+delete from pk where a = 2000;
+ERROR:  update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
+DETAIL:  Key (a)=(2000) is still referenced from table "fk".
+delete from pk where a = 3000;
+ERROR:  update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
+DETAIL:  Key (a)=(3000) is still referenced from table "fk".
+delete from pk where a = 4000;
+ERROR:  update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
+DETAIL:  Key (a)=(4000) is still referenced from table "fk".
+delete from pk where a = 4500;
+ERROR:  update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
+DETAIL:  Key (a)=(4500) is still referenced from table "fk".
+update pk set a = 2 where a = 1;
+ERROR:  update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
+DETAIL:  Key (a)=(1) is still referenced from table "fk".
+update pk set a = 1002 where a = 1000;
+ERROR:  update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL:  Key (a)=(1000) is still referenced from table "fk".
+update pk set a = 2002 where a = 2000;
+ERROR:  update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
+DETAIL:  Key (a)=(2000) is still referenced from table "fk".
+update pk set a = 3002 where a = 3000;
+ERROR:  update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
+DETAIL:  Key (a)=(3000) is still referenced from table "fk".
+update pk set a = 4002 where a = 4000;
+ERROR:  update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
+DETAIL:  Key (a)=(4000) is still referenced from table "fk".
+update pk set a = 4502 where a = 4500;
+ERROR:  update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
+DETAIL:  Key (a)=(4500) is still referenced from table "fk".
+-- now they should work
+delete from fk;
+update pk set a = 2 where a = 1;
+delete from pk where a = 2;
+update pk set a = 1002 where a = 1000;
+delete from pk where a = 1002;
+update pk set a = 2002 where a = 2000;
+delete from pk where a = 2002;
+update pk set a = 3002 where a = 3000;
+delete from pk where a = 3002;
+update pk set a = 4002 where a = 4000;
+delete from pk where a = 4002;
+update pk set a = 4502 where a = 4500;
+delete from pk where a = 4502;
+-- dropping/detaching partitions is prevented if that would break
+-- a foreign key's existing data
+create table droppk (a int primary key) partition by range (a);
+create table droppk1 partition of droppk for values from (0) to (1000);
+create table droppk_d partition of droppk default;
+create table droppk2 partition of droppk for values from (1000) to (2000)
+  partition by range (a);
+create table droppk21 partition of droppk2 for values from (1000) to (1400);
+create table droppk2_d partition of droppk2 default;
+insert into droppk values (1), (1000), (1500), (2000);
+create table dropfk (a int references droppk);
+insert into dropfk values (1), (1000), (1500), (2000);
+-- these should all fail
+alter table droppk detach partition droppk_d;
+ERROR:  removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5"
+DETAIL:  Key (a)=(2000) still referenced from table "dropfk".
+alter table droppk2 detach partition droppk2_d;
+ERROR:  removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL:  Key (a)=(1500) still referenced from table "dropfk".
+alter table droppk detach partition droppk1;
+ERROR:  removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1"
+DETAIL:  Key (a)=(1) still referenced from table "dropfk".
+alter table droppk detach partition droppk2;
+ERROR:  removing partition "droppk2" violates foreign key constraint "dropfk_a_fkey2"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+alter table droppk2 detach partition droppk21;
+ERROR:  removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+drop table droppk_d;
+ERROR:  removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5"
+DETAIL:  Key (a)=(2000) still referenced from table "dropfk".
+drop table droppk2_d;
+ERROR:  removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL:  Key (a)=(1500) still referenced from table "dropfk".
+drop table droppk1;
+ERROR:  removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1"
+DETAIL:  Key (a)=(1) still referenced from table "dropfk".
+drop table droppk2;
+ERROR:  removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL:  Key (a)=(1500) still referenced from table "dropfk".
+drop table droppk21;
+ERROR:  removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+delete from dropfk;
+-- now they should all work
+drop table droppk_d;
+drop table droppk2_d;
+drop table droppk1;
+alter table droppk2 detach partition droppk21;
+drop table droppk2;
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 4639fb45093..f9349e02505 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -1125,18 +1125,6 @@ drop table pktable2, fktable2;
 -- Foreign keys and partitioned tables
 --
 
--- partitioned table in the referenced side are not allowed
-CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
-  PARTITION BY RANGE (a, b);
--- verify with create table first ...
-CREATE TABLE fk_notpartitioned_fk (a int, b int,
-  FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
--- and then with alter table.
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
-ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
-  REFERENCES fk_partitioned_pk;
-DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
-
 -- Creation of a partitioned hierarchy with irregular definitions
 CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
   PRIMARY KEY (a, b));
@@ -1423,3 +1411,105 @@ alter table fkpart2.fk_part_1_1 drop constraint my_fkey;	-- doesn't exist
 \set VERBOSITY terse	\\ -- suppress cascade details
 drop schema fkpart0, fkpart1, fkpart2 cascade;
 \set VERBOSITY default
+
+-- Test a partitioned table as referenced table.
+-- Verify basic functionality with a regular partition creation and a partition
+-- with a different column layout, as well as partitions
+-- added (created and attached) after creating the foreign key.
+create schema regress_fk;
+set search_path to regress_fk;
+
+create table pk (a int primary key) partition by range (a);
+create table pk1 partition of pk for values from (0) to (1000);
+create table pk2 (b int, a int);
+alter table pk2 drop column b;
+alter table pk2 alter a set not null;
+alter table pk attach partition pk2 for values from (1000) to (2000);
+
+create table fk (a int) partition by range (a);
+create table fk1 partition of fk for values from (0) to (750);
+alter table fk add foreign key (a) references pk;
+create table fk2 (b int, a int) ;
+alter table fk2 drop column b;
+alter table fk attach partition fk2 for values from (750) to (3500);
+
+create table pk3 partition of pk for values from (2000) to (3000);
+create table pk4 (like pk);
+alter table pk attach partition pk4 for values from (3000) to (4000);
+
+create table pk5 (like pk) partition by range (a);
+create table pk51 partition of pk5 for values from (4000) to (4500);
+create table pk52 partition of pk5 for values from (4500) to (5000);
+alter table pk attach partition pk5 for values from (4000) to (5000);
+
+create table fk3 partition of fk for values from (3500) to (5000);
+
+-- these should fail: referenced value not present
+insert into fk values (1);
+insert into fk values (1000);
+insert into fk values (2000);
+insert into fk values (3000);
+insert into fk values (4000);
+insert into fk values (4500);
+-- insert into the referenced table, now they should work
+insert into pk values (1), (1000), (2000), (3000), (4000), (4500);
+insert into fk values (1), (1000), (2000), (3000), (4000), (4500);
+
+-- should fail: referencing value present
+delete from pk where a = 1;
+delete from pk where a = 1000;
+delete from pk where a = 2000;
+delete from pk where a = 3000;
+delete from pk where a = 4000;
+delete from pk where a = 4500;
+update pk set a = 2 where a = 1;
+update pk set a = 1002 where a = 1000;
+update pk set a = 2002 where a = 2000;
+update pk set a = 3002 where a = 3000;
+update pk set a = 4002 where a = 4000;
+update pk set a = 4502 where a = 4500;
+-- now they should work
+delete from fk;
+update pk set a = 2 where a = 1;
+delete from pk where a = 2;
+update pk set a = 1002 where a = 1000;
+delete from pk where a = 1002;
+update pk set a = 2002 where a = 2000;
+delete from pk where a = 2002;
+update pk set a = 3002 where a = 3000;
+delete from pk where a = 3002;
+update pk set a = 4002 where a = 4000;
+delete from pk where a = 4002;
+update pk set a = 4502 where a = 4500;
+delete from pk where a = 4502;
+
+-- dropping/detaching partitions is prevented if that would break
+-- a foreign key's existing data
+create table droppk (a int primary key) partition by range (a);
+create table droppk1 partition of droppk for values from (0) to (1000);
+create table droppk_d partition of droppk default;
+create table droppk2 partition of droppk for values from (1000) to (2000)
+  partition by range (a);
+create table droppk21 partition of droppk2 for values from (1000) to (1400);
+create table droppk2_d partition of droppk2 default;
+insert into droppk values (1), (1000), (1500), (2000);
+create table dropfk (a int references droppk);
+insert into dropfk values (1), (1000), (1500), (2000);
+-- these should all fail
+alter table droppk detach partition droppk_d;
+alter table droppk2 detach partition droppk2_d;
+alter table droppk detach partition droppk1;
+alter table droppk detach partition droppk2;
+alter table droppk2 detach partition droppk21;
+drop table droppk_d;
+drop table droppk2_d;
+drop table droppk1;
+drop table droppk2;
+drop table droppk21;
+delete from dropfk;
+-- now they should all work
+drop table droppk_d;
+drop table droppk2_d;
+drop table droppk1;
+alter table droppk2 detach partition droppk21;
+drop table droppk2;
-- 
2.17.1
