On 2019-Mar-26, Alvaro Herrera wrote: > Thanks for the thorough testing and bug analysis! It was spot-on. I've > applied your two proposed fixes, as well as added a new test setup that > covers both these bugs. The attached set is rebased on 7c366ac969ce.
Attached is rebased on 126d63122232. No further changes. -- Álvaro Herrera https://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From fbf2d42a3caac983395579d89a07d5ccdfab5973 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Wed, 28 Nov 2018 11:52:00 -0300 Subject: [PATCH v9 1/2] Rework deleteObjectsInList to allow objtype-specific checks This doesn't change any functionality yet. --- src/backend/catalog/dependency.c | 41 +++++++++++++++++++------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index f7acb4103eb..09876705b08 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -230,29 +230,38 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel, int i; /* - * Keep track of objects for event triggers, if necessary. + * Invoke pre-deletion callbacks and keep track of objects for event + * triggers, if necessary. */ - if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL)) + for (i = 0; i < targetObjects->numrefs; i++) { - for (i = 0; i < targetObjects->numrefs; i++) + const ObjectAddress *thisobj = &targetObjects->refs[i]; + ObjectClass objectClass = getObjectClass(thisobj); + + if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL)) { - const ObjectAddress *thisobj = &targetObjects->refs[i]; - const ObjectAddressExtra *extra = &targetObjects->extras[i]; - bool original = false; - bool normal = false; - - if (extra->flags & DEPFLAG_ORIGINAL) - original = true; - if (extra->flags & DEPFLAG_NORMAL) - normal = true; - if (extra->flags & DEPFLAG_REVERSE) - normal = true; - - if (EventTriggerSupportsObjectClass(getObjectClass(thisobj))) + if (EventTriggerSupportsObjectClass(objectClass)) { + bool original = false; + bool normal = false; + const ObjectAddressExtra *extra = &targetObjects->extras[i]; + + if (extra->flags & DEPFLAG_ORIGINAL) + original = true; + if (extra->flags & DEPFLAG_NORMAL || + extra->flags & DEPFLAG_REVERSE) + normal = true; + EventTriggerSQLDropAddObject(thisobj, original, normal); } } + + /* Invoke class-specific pre-deletion checks */ + switch (objectClass) + { + default: + break; + } } /* -- 2.17.1
>From 1addb0b6dfe973b4319c01cdf2537b2aff805ae6 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Wed, 20 Feb 2019 15:08:20 -0300 Subject: [PATCH v9 2/2] support FKs referencing partitioned tables --- doc/src/sgml/ref/create_table.sgml | 7 +- src/backend/catalog/dependency.c | 3 + src/backend/catalog/heap.c | 24 + src/backend/commands/tablecmds.c | 1367 +++++++++++++++------ src/backend/utils/adt/ri_triggers.c | 244 +++- src/backend/utils/adt/ruleutils.c | 18 + src/include/catalog/heap.h | 2 + src/include/commands/tablecmds.h | 8 +- src/include/commands/trigger.h | 1 + src/include/utils/ruleutils.h | 1 + src/test/regress/expected/foreign_key.out | 211 +++- src/test/regress/sql/foreign_key.sql | 144 ++- 12 files changed, 1595 insertions(+), 435 deletions(-) diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml index e94fe2c3b67..37ae0f00fda 100644 --- a/doc/src/sgml/ref/create_table.sgml +++ b/doc/src/sgml/ref/create_table.sgml @@ -378,9 +378,6 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM <para> Partitioned tables do not support <literal>EXCLUDE</literal> constraints; however, you can define these constraints on individual partitions. - Also, while it's possible to define <literal>PRIMARY KEY</literal> - constraints on partitioned tables, creating foreign keys that - reference a partitioned table is not yet supported. </para> <para> @@ -995,9 +992,7 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM addition of a foreign key constraint requires a <literal>SHARE ROW EXCLUSIVE</literal> lock on the referenced table. Note that foreign key constraints cannot be defined between temporary - tables and permanent tables. Also note that while it is possible to - define a foreign key on a partitioned table, it is not possible to - declare a foreign key that references a partitioned table. + tables and permanent tables. </para> <para> diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 09876705b08..83965bb048c 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -259,6 +259,9 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel, /* Invoke class-specific pre-deletion checks */ switch (objectClass) { + case OCLASS_CLASS: + pre_drop_class_check(thisobj->objectId, thisobj->objectSubId); + break; default: break; } diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index c7b5ff62f9f..6680f755dd5 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -1825,6 +1825,30 @@ RemoveAttrDefaultById(Oid attrdefId) relation_close(myrel, NoLock); } +/* + * Checks to be run before just dropping a relation. + */ +void +pre_drop_class_check(Oid relationId, Oid objectSubId) +{ + Relation relation; + + /* caller must hold strong lock already, if they're dropping */ + relation = relation_open(relationId, NoLock); + + /* + * For leaf partitions, this is our last chance to verify any foreign keys + * that may point to the partition as referenced table. + */ + if (relation->rd_rel->relkind == RELKIND_RELATION && + relation->rd_rel->relispartition) + CheckNoForeignKeyRefs(relation, + GetParentedForeignKeyRefs(relation), + true); + + relation_close(relation, NoLock); +} + /* * heap_drop_with_catalog - removes specified relation from catalogs * diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 048c1196685..3c6470bc671 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -414,10 +414,33 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo * Relation rel, Constraint *fkconstraint, Oid parentConstr, bool recurse, bool recursing, LOCKMODE lockmode); -static void CloneForeignKeyConstraints(Oid parentId, Oid relationId, - List **cloned); -static void CloneFkReferencing(Relation pg_constraint, Relation parentRel, - Relation partRel, List *clone, List **cloned); +static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, + Relation rel, Relation pkrel, Oid indexOid, Oid parentConstraint, + int numfks, int16 *pkattnum, int16 *fkattnum, + Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators, + bool old_check_ok); +static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, + Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr, int numfks, + int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators, + Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok, + LOCKMODE lockmode); +static void CloneForeignKeyConstraints(List **wqueue, Oid parentId, + Oid relationId); +static void CloneFkReferenced(Relation parentRel, Relation partitionRel, + Relation pg_constraint); +static void CloneFkReferencing(List **wqueue, Relation parentRel, + Relation partRel, List *clone); +static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid); +static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid); +static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, + Oid partRelid, + Oid parentConstrOid, int numfks, + AttrNumber *mapped_conkey, AttrNumber *confkey, + Oid *conpfeqop); static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, @@ -1058,7 +1081,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, * And foreign keys too. Note that because we're freshly creating the * table, there is no need to verify these new constraints. */ - CloneForeignKeyConstraints(parentId, relationId, NULL); + CloneForeignKeyConstraints(NULL, parentId, relationId); table_close(parent, NoLock); } @@ -3514,7 +3537,8 @@ AlterTableGetLockLevel(List *cmds) /* * Removing constraints can affect SELECTs that have been - * optimised assuming the constraint holds true. + * optimised assuming the constraint holds true. See also + * CloneFkReferenced. */ case AT_DropConstraint: /* as DROP INDEX */ case AT_DropNotNull: /* may change some SQL plans */ @@ -7173,9 +7197,6 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, case CONSTR_FOREIGN: /* - * Note that we currently never recurse for FK constraints, so the - * "recurse" flag is silently ignored. - * * Assign or validate constraint name */ if (newConstraint->conname) @@ -7393,6 +7414,13 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Subroutine for ATExecAddConstraint. Must already hold exclusive * lock on the rel, and have done appropriate validity checks for it. * We do permissions checks here, however. + * + * When the referenced or referencing tables (or both) are partitioned, + * multiple pg_constraint rows are required -- one for each partitioned table + * and each partition on each side (fortunately, not one for every combination + * thereof). We also need action triggers on each leaf partition on the + * referenced side, and check triggers on each leaf partition on the + * referencing side. */ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, @@ -7408,12 +7436,10 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Oid pfeqoperators[INDEX_MAX_KEYS]; Oid ppeqoperators[INDEX_MAX_KEYS]; Oid ffeqoperators[INDEX_MAX_KEYS]; - bool connoinherit; int i; int numfks, numpks; Oid indexOid; - Oid constrOid; bool old_check_ok; ObjectAddress address; ListCell *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop); @@ -7431,12 +7457,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Validity checks (permission checks wait till we have the column * numbers) */ - if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot reference partitioned table \"%s\"", - RelationGetRelationName(pkrel)))); - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { if (!recurse) @@ -7454,7 +7474,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, errdetail("This feature is not yet supported on partitioned tables."))); } - if (pkrel->rd_rel->relkind != RELKIND_RELATION) + if (pkrel->rd_rel->relkind != RELKIND_RELATION && + pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("referenced relation \"%s\" is not a table", @@ -7664,8 +7685,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("foreign key constraint \"%s\" " - "cannot be implemented", + errmsg("foreign key constraint \"%s\" cannot be implemented", fkconstraint->conname), errdetail("Key columns \"%s\" and \"%s\" " "are of incompatible types: %s and %s.", @@ -7753,21 +7773,132 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, } /* - * FKs always inherit for partitioned tables, and never for legacy - * inheritance. + * Create all the constraint and trigger objects, recursing to partitions + * as necessary. First handle the referenced side. */ - connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE; + address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel, + indexOid, + InvalidOid, /* no parent constraint */ + numfks, + pkattnum, + fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok); + + /* Now handle the referencing side. */ + addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel, + indexOid, + address.objectId, + numfks, + pkattnum, + fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok, + lockmode); + + /* + * Done. Close pk table, but keep lock until we've committed. + */ + table_close(pkrel, NoLock); + + return address; +} + +/* + * addFkRecurseReferenced + * subroutine for ATAddForeignKeyConstraint; recurses on the referenced + * side of the constraint + * + * Create pg_constraint rows for the referenced side of the constraint, + * referencing the parent of the referencing side; also create action triggers + * on leaf partitions. If the table is partitioned, recurse to handle each + * partition. + * + * wqueue is the ALTER TABLE work queue; can be NULL when not running as part + * of an ALTER TABLE sequence. + * fkconstraint is the constraint being added. + * rel is the root referencing relation. + * pkrel is the referenced relation; might be a partition, if recursing. + * indexOid is the OID of the index (on pkrel) implementing this constraint. + * parentConstraint is the OID of a parent constraint; InvalidOid if this is a + * top-level constraint. + * numfks is the number of columns in the foreign key + * pkattnum is the attnum array of referenced attributes. + * fkattnum is the attnum array of referencing attributes. + * pf/pp/ffeqoperators are OID array of operators between columns. + * old_check_ok signals that this constraint replaces an existing one that + * was already validated (thus this one doesn't need validation). + */ +static ObjectAddress +addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel, + Relation pkrel, Oid indexOid, Oid parentConstraint, + int numfks, + int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators, + Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok) +{ + ObjectAddress address; + Oid constrOid; + char *conname; + bool conislocal; + int coninhcount; + bool connoinherit; + + /* + * Verify relkind for each referenced partition. At the top level, this + * is redundant with a previous check, but we need it when recursing. + */ + if (pkrel->rd_rel->relkind != RELKIND_RELATION && + pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("referenced relation \"%s\" is not a table", + RelationGetRelationName(pkrel)))); + + /* + * Caller supplies us with a constraint name; however, it may be used in + * this partition, so come up with a different one in that case. + */ + if (ConstraintNameIsUsed(CONSTRAINT_RELATION, + RelationGetRelid(rel), + fkconstraint->conname)) + conname = ChooseConstraintName(RelationGetRelationName(rel), + ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs), + "fkey", + RelationGetNamespace(rel), NIL); + else + conname = fkconstraint->conname; + + if (OidIsValid(parentConstraint)) + { + conislocal = false; + coninhcount = 1; + connoinherit = false; + } + else + { + conislocal = true; + coninhcount = 0; + + /* + * always inherit for partitioned tables, never for legacy inheritance + */ + connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE; + } /* * Record the FK constraint in pg_constraint. */ - constrOid = CreateConstraintEntry(fkconstraint->conname, + constrOid = CreateConstraintEntry(conname, RelationGetNamespace(rel), CONSTRAINT_FOREIGN, fkconstraint->deferrable, fkconstraint->initdeferred, fkconstraint->initially_valid, - parentConstr, + parentConstraint, RelationGetRelid(rel), fkattnum, numfks, @@ -7779,108 +7910,312 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, pfeqoperators, ppeqoperators, ffeqoperators, - numpks, + numfks, fkconstraint->fk_upd_action, fkconstraint->fk_del_action, fkconstraint->fk_matchtype, NULL, /* no exclusion constraint */ NULL, /* no check constraint */ NULL, - true, /* islocal */ - 0, /* inhcount */ - connoinherit, /* conNoInherit */ + conislocal, /* islocal */ + coninhcount, /* inhcount */ + connoinherit, /* conNoInherit */ false); /* is_internal */ + ObjectAddressSet(address, ConstraintRelationId, constrOid); /* - * Create the triggers that will enforce the constraint. We only want the - * action triggers to appear for the parent partitioned relation, even - * though the constraints also exist below. + * Also, if this is a constraint on a partition, give it partition-type + * dependencies on the parent constraint as well as the table. */ - createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint, - constrOid, indexOid, !recursing); + if (OidIsValid(parentConstraint)) + { + ObjectAddress referenced; + + ObjectAddressSet(referenced, ConstraintRelationId, parentConstraint); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI); + ObjectAddressSet(referenced, RelationRelationId, RelationGetRelid(pkrel)); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC); + } + + /* make new constraint visible, in case we add more */ + CommandCounterIncrement(); /* - * Tell Phase 3 to check that the constraint is satisfied by existing - * rows. We can skip this during table creation, when requested explicitly - * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're - * recreating a constraint following a SET DATA TYPE operation that did - * not impugn its validity. + * If the referenced table is a plain relation, create the action triggers + * that enforce the constraint. */ - if (!old_check_ok && !fkconstraint->skip_validation) + if (pkrel->rd_rel->relkind == RELKIND_RELATION) { - NewConstraint *newcon; - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = fkconstraint->conname; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = RelationGetRelid(pkrel); - newcon->refindid = indexOid; - newcon->conid = constrOid; - newcon->qual = (Node *) fkconstraint; - - tab->constraints = lappend(tab->constraints, newcon); + createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel), + fkconstraint, + constrOid, indexOid); } /* - * When called on a partitioned table, recurse to create the constraint on - * the partitions also. + * If the referenced table is partitioned, recurse on ourselves to handle + * each partition. We need one pg_constraint row created for each + * partition in addition to the pg_constraint row for the parent table. */ - if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { - PartitionDesc partdesc; - Relation pg_constraint; - List *cloned = NIL; - ListCell *cell; + PartitionDesc pd = RelationGetPartitionDesc(pkrel); - pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock); - partdesc = RelationGetPartitionDesc(rel); - - for (i = 0; i < partdesc->nparts; i++) + for (int i = 0; i < pd->nparts; i++) { - Oid partitionId = partdesc->oids[i]; + Relation partRel; + AttrNumber *map; + AttrNumber *mapped_pkattnum; + Oid partIndexId; + + partRel = table_open(pd->oids[i], ShareRowExclusiveLock); + + /* + * Map the attribute numbers in the referenced side of the FK + * definition to match the partition's column layout. + */ + map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel), + RelationGetDescr(pkrel), + gettext_noop("could not convert row type")); + if (map) + { + mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks); + for (int j = 0; j < numfks; j++) + mapped_pkattnum[j] = map[pkattnum[j] - 1]; + } + else + mapped_pkattnum = pkattnum; + + /* do the deed */ + partIndexId = index_get_partition(partRel, indexOid); + if (!OidIsValid(partIndexId)) + elog(ERROR, "index for %u not found in partition %s", + indexOid, RelationGetRelationName(partRel)); + addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel, + partIndexId, constrOid, numfks, + mapped_pkattnum, fkattnum, + pfeqoperators, ppeqoperators, ffeqoperators, + old_check_ok); + + /* Done -- clean up (but keep the lock) */ + table_close(partRel, NoLock); + if (map) + { + pfree(mapped_pkattnum); + pfree(map); + } + } + } + + return address; +} + +/* + * addFkRecurseReferencing + * subroutine for ATAddForeignKeyConstraint and CloneFkReferencing + * + * If the referencing relation is a plain relation, create the necessary check + * triggers that implement the constraint, and set up for Phase 3 constraint + * verification. If the referencing relation is a partitioned table, then + * we create a pg_constraint row for it and recurse on this routine for each + * partition. + * + * wqueue is the ALTER TABLE work queue; can be NULL when not running as part + * of an ALTER TABLE sequence. + * fkconstraint is the constraint being added. + * rel is the referencing relation; might be a partition, if recursing. + * pkrel is the root referenced relation. + * indexOid is the OID of the index (on pkrel) implementing this constraint. + * parentConstr is the OID of the parent constraint (there is always one). + * numfks is the number of columns in the foreign key + * pkattnum is the attnum array of referenced attributes. + * fkattnum is the attnum array of referencing attributes. + * pf/pp/ffeqoperators are OID array of operators between columns. + * old_check_ok signals that this constraint replaces an existing one that + * was already validated (thus this one doesn't need validation). + * lockmode is the lockmode to acquire on partitions when recursing. + */ +static void +addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel, + Relation pkrel, Oid indexOid, Oid parentConstr, + int numfks, int16 *pkattnum, int16 *fkattnum, + Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators, + bool old_check_ok, LOCKMODE lockmode) +{ + AssertArg(OidIsValid(parentConstr)); + + if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("foreign keys constraints are not supported on foreign tables"))); + + /* + * If the referencing relation is a plain table, add the check triggers to + * it and, if necessary, schedule it to be checked in Phase 3. + * + * If the relation is partitioned, drill down to do it to its partitions. + */ + if (rel->rd_rel->relkind == RELKIND_RELATION) + { + createForeignKeyCheckTriggers(RelationGetRelid(rel), + RelationGetRelid(pkrel), + fkconstraint, + parentConstr, + indexOid); + + /* + * Tell Phase 3 to check that the constraint is satisfied by existing + * rows. We can skip this during table creation, when requested + * explicitly by specifying NOT VALID in an ADD FOREIGN KEY command, + * and when we're recreating a constraint following a SET DATA TYPE + * operation that did not impugn its validity. + */ + if (wqueue && !old_check_ok && !fkconstraint->skip_validation) + { + NewConstraint *newcon; + AlteredTableInfo *tab; + + tab = ATGetQueueEntry(wqueue, rel); + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = get_constraint_name(parentConstr); + newcon->contype = CONSTR_FOREIGN; + newcon->refrelid = RelationGetRelid(pkrel); + newcon->refindid = indexOid; + newcon->conid = parentConstr; + newcon->qual = (Node *) fkconstraint; + + tab->constraints = lappend(tab->constraints, newcon); + } + } + else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDesc pd = RelationGetPartitionDesc(rel); + + /* + * Recurse to take appropriate action on each partition; either we + * find an existing constraint to reparent to ours, or we create a new + * one. + */ + for (int i = 0; i < pd->nparts; i++) + { + Oid partitionId = pd->oids[i]; Relation partition = table_open(partitionId, lockmode); + List *partFKs; + AttrNumber *attmap; + AttrNumber mapped_fkattnum[INDEX_MAX_KEYS]; + bool attached; + char *conname; + Oid constrOid; + ObjectAddress address, + referenced; + ListCell *cell; CheckTableNotInUse(partition, "ALTER TABLE"); - CloneFkReferencing(pg_constraint, rel, partition, - list_make1_oid(constrOid), - &cloned); + attmap = convert_tuples_by_name_map(RelationGetDescr(partition), + RelationGetDescr(rel), + gettext_noop("could not convert row type")); + for (int j = 0; j < numfks; j++) + mapped_fkattnum[j] = attmap[fkattnum[j] - 1]; + + partFKs = copyObject(RelationGetFKeyList(partition)); + attached = false; + foreach(cell, partFKs) + { + ForeignKeyCacheInfo *fk; + + fk = lfirst_node(ForeignKeyCacheInfo, cell); + if (tryAttachPartitionForeignKey(fk, + partitionId, + parentConstr, + numfks, + mapped_fkattnum, + pkattnum, + pfeqoperators)) + { + attached = true; + break; + } + } + if (attached) + { + table_close(partition, NoLock); + continue; + } + + /* + * No luck finding a good constraint to reuse; create our own. + */ + if (ConstraintNameIsUsed(CONSTRAINT_RELATION, + RelationGetRelid(partition), + fkconstraint->conname)) + conname = ChooseConstraintName(RelationGetRelationName(partition), + ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs), + "fkey", + RelationGetNamespace(partition), NIL); + else + conname = fkconstraint->conname; + constrOid = + CreateConstraintEntry(conname, + RelationGetNamespace(partition), + CONSTRAINT_FOREIGN, + fkconstraint->deferrable, + fkconstraint->initdeferred, + fkconstraint->initially_valid, + parentConstr, + partitionId, + mapped_fkattnum, + numfks, + numfks, + InvalidOid, + indexOid, + RelationGetRelid(pkrel), + pkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + numfks, + fkconstraint->fk_upd_action, + fkconstraint->fk_del_action, + fkconstraint->fk_matchtype, + NULL, + NULL, + NULL, + false, + 1, + false, + false); + + /* + * Give this constraint partition-type dependencies on the parent + * constraint as well as the table. + */ + ObjectAddressSet(address, ConstraintRelationId, constrOid); + ObjectAddressSet(referenced, ConstraintRelationId, parentConstr); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI); + ObjectAddressSet(referenced, RelationRelationId, partitionId); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC); + + /* Make all this visible before recursing */ + CommandCounterIncrement(); + + /* call ourselves to finalize the creation and we're done */ + addFkRecurseReferencing(wqueue, fkconstraint, partition, pkrel, + indexOid, + constrOid, + numfks, + pkattnum, + mapped_fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok, + lockmode); table_close(partition, NoLock); } - table_close(pg_constraint, RowExclusiveLock); - - foreach(cell, cloned) - { - ClonedConstraint *cc = (ClonedConstraint *) lfirst(cell); - Relation partition = table_open(cc->relid, lockmode); - AlteredTableInfo *childtab; - NewConstraint *newcon; - - /* Find or create work queue entry for this partition */ - childtab = ATGetQueueEntry(wqueue, partition); - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = cc->constraint->conname; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = cc->refrelid; - newcon->refindid = cc->conindid; - newcon->conid = cc->conid; - newcon->qual = (Node *) fkconstraint; - - childtab->constraints = lappend(childtab->constraints, newcon); - - table_close(partition, lockmode); - } } - - /* - * Close pk table, but keep lock until we've committed. - */ - table_close(pkrel, NoLock); - - return address; } /* @@ -7891,74 +8226,223 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * relationId is a partition of parentId, so we can be certain that it has the * same columns with the same datatypes. The columns may be in different * order, though. - * - * The *cloned list is appended ClonedConstraint elements describing what was - * created, for the purposes of validating the constraint in ALTER TABLE's - * Phase 3. */ static void -CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) +CloneForeignKeyConstraints(List **wqueue, Oid parentId, Oid relationId) { Relation pg_constraint; Relation parentRel; Relation rel; - ScanKeyData key; - SysScanDesc scan; - HeapTuple tuple; List *clone = NIL; + ListCell *cell; parentRel = table_open(parentId, NoLock); /* already got lock */ + /* This only works for declarative partitioning */ + Assert(parentRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + /* see ATAddForeignKeyConstraint about lock level */ rel = table_open(relationId, AccessExclusiveLock); pg_constraint = table_open(ConstraintRelationId, RowShareLock); - /* Obtain the list of constraints to clone or attach */ - ScanKeyInit(&key, - Anum_pg_constraint_conrelid, BTEqualStrategyNumber, - F_OIDEQ, ObjectIdGetDatum(parentId)); - scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &key); + /* + * Clone constraints for which the parent is on the referenced side. + */ + CloneFkReferenced(parentRel, rel, pg_constraint); + + /* done with pg_constraint */ + table_close(pg_constraint, RowShareLock); + + /* + * Now clone constraints where the parent is on the referencing side. We + * first obtain a list of such constraints. + */ + foreach(cell, RelationGetFKeyList(parentRel)) + { + ForeignKeyCacheInfo *fk = lfirst(cell); + + clone = lappend_oid(clone, fk->conoid); + } + CloneFkReferencing(wqueue, parentRel, rel, clone); + list_free(clone); + + /* We're done. Clean up, keeping locks till commit */ + table_close(parentRel, NoLock); + table_close(rel, NoLock); +} + +/* + * CloneFkReferenced + * Subroutine for CloneForeignKeyConstraints + * + * Find all the FKs that have the parent relation on the referenced side; + * clone those constraints to the given partition. This is to be called + * when the partition is being created or attached. + * + * This recurses to partitions, if the relation being attached is partitioned. + * Recursion is done by calling addFkRecurseReferenced. + */ +static void +CloneFkReferenced(Relation parentRel, Relation partitionRel, + Relation pg_constraint) +{ + AttrNumber *attmap; + ListCell *cell; + SysScanDesc scan; + ScanKeyData key[2]; + HeapTuple tuple; + List *clone = NIL; + + /* + * Search for any constraints where this partition is in the referenced + * side. However, we must ignore any constraint whose parent constraint + * is also going to be cloned, to avoid duplicates. So do it in two + * steps: first construct the list of constraints to clone, then go over + * that list cloning those whose parents are not in the list. (We must + * not rely on the parent being seen first, since catalog order could + * return children first.) + */ + attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel), + RelationGetDescr(parentRel), + gettext_noop("could not convert row type")); + ScanKeyInit(&key[0], + Anum_pg_constraint_confrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel))); + ScanKeyInit(&key[1], + Anum_pg_constraint_contype, BTEqualStrategyNumber, + F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN)); + /* This is a seqscan, as we don't have a usable index ... */ + scan = systable_beginscan(pg_constraint, InvalidOid, true, + NULL, 2, key); while ((tuple = systable_getnext(scan)) != NULL) { - Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid; + Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple); - clone = lappend_oid(clone, oid); + /* Only try to clone the top-level constraint; skip child ones. */ + if (constrForm->conparentid != InvalidOid) + continue; + + clone = lappend_oid(clone, constrForm->oid); } systable_endscan(scan); - /* Do the actual work, recursing to partitions as needed */ - CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned); + foreach(cell, clone) + { + Oid constrOid = lfirst_oid(cell); + Form_pg_constraint constrForm; + Relation fkRel; + Oid indexOid; + Oid partIndexId; + int numfks; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber mapped_confkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + Oid conppeqop[INDEX_MAX_KEYS]; + Oid conffeqop[INDEX_MAX_KEYS]; + Constraint *fkconstraint; - /* We're done. Clean up */ - table_close(parentRel, NoLock); - table_close(rel, NoLock); /* keep lock till commit */ - table_close(pg_constraint, RowShareLock); + tuple = SearchSysCache1(CONSTROID, constrOid); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for constraint %u", constrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* + * Because we're only expanding the key space at the referenced side, + * we don't need to prevent any operation in the referencing table, so + * AccessShareLock suffices (assumes that dropping the constraint + * acquires AEL). + */ + fkRel = table_open(constrForm->conrelid, AccessShareLock); + + indexOid = constrForm->conindid; + DeconstructFkConstraintRow(tuple, + &numfks, + conkey, + confkey, + conpfeqop, + conppeqop, + conffeqop); + for (int i = 0; i < numfks; i++) + mapped_confkey[i] = attmap[confkey[i] - 1]; + + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = NameStr(constrForm->conname); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + fkconstraint->initially_valid = true; + fkconstraint->fk_matchtype = constrForm->confmatchtype; + + /* set up colnames that are used to generate the constraint name */ + for (int i = 0; i < numfks; i++) + { + Form_pg_attribute att; + + att = TupleDescAttr(RelationGetDescr(fkRel), + conkey[i] - 1); + fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs, + makeString(NameStr(att->attname))); + } + + /* + * Add the new foreign key constraint pointing to the new partition. + * Because this new partition appears in the referenced side of the + * constraint, we don't need to set up for Phase 3 check. + */ + partIndexId = index_get_partition(partitionRel, indexOid); + if (!OidIsValid(partIndexId)) + elog(ERROR, "index for %u not found in partition %s", + indexOid, RelationGetRelationName(partitionRel)); + addFkRecurseReferenced(NULL, + fkconstraint, + fkRel, + partitionRel, + partIndexId, + constrOid, + numfks, + mapped_confkey, + conkey, + conpfeqop, + conppeqop, + conffeqop, + true); + + table_close(fkRel, NoLock); + ReleaseSysCache(tuple); + } } /* * CloneFkReferencing - * Recursive subroutine for CloneForeignKeyConstraints, referencing side + * Subroutine for CloneForeignKeyConstraints * - * Clone the given list of FK constraints when a partition is attached on the - * referencing side of those constraints. + * For each FK constraint of the parent relation in the given list, find an + * equivalent constraint in its partition relation that can be reparented; + * if one cannot be found, create a new constraint in the partition as its + * child. * - * When cloning foreign keys to a partition, it may happen that equivalent - * constraints already exist in the partition for some of them. We can skip - * creating a clone in that case, and instead just attach the existing - * constraint to the one in the parent. - * - * This function recurses to partitions, if the new partition is partitioned; - * of course, only do this for FKs that were actually cloned. + * If wqueue is given, it is used to set up phase-3 verification for each + * cloned constraint; if omitted, we assume that such verification is not + * needed (example: the partition is being created anew). */ static void -CloneFkReferencing(Relation pg_constraint, Relation parentRel, - Relation partRel, List *clone, List **cloned) +CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel, + List *clone) { AttrNumber *attmap; List *partFKs; - List *subclone = NIL; ListCell *cell; + if (clone == NIL) + return; + + if (partRel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("foreign keys constraints are not supported on foreign tables"))); + /* * The constraint key may differ, if the columns in the partition are * different. This map is used to convert them. @@ -7973,6 +8457,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, { Oid parentConstrOid = lfirst_oid(cell); Form_pg_constraint constrForm; + Relation pkrel; HeapTuple tuple; int numfks; AttrNumber conkey[INDEX_MAX_KEYS]; @@ -7982,13 +8467,12 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, Oid conppeqop[INDEX_MAX_KEYS]; Oid conffeqop[INDEX_MAX_KEYS]; Constraint *fkconstraint; - bool attach_it; + bool attached; + Oid indexOid; Oid constrOid; - ObjectAddress parentAddr, - childAddr, - childTableAddr; + ObjectAddress address, + referenced; ListCell *cell; - int i; tuple = SearchSysCache1(CONSTROID, parentConstrOid); if (!tuple) @@ -7996,142 +8480,46 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, parentConstrOid); constrForm = (Form_pg_constraint) GETSTRUCT(tuple); - /* only foreign keys */ - if (constrForm->contype != CONSTRAINT_FOREIGN) + /* Don't clone constraints whose parents are being cloned */ + if (list_member_oid(clone, constrForm->conparentid)) { ReleaseSysCache(tuple); continue; } - ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); + /* XXX lock level needs to prevent concurrent deletes */ + pkrel = table_open(constrForm->confrelid, AccessShareLock); DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey, conpfeqop, conppeqop, conffeqop); - for (i = 0; i < numfks; i++) + for (int i = 0; i < numfks; i++) mapped_conkey[i] = attmap[conkey[i] - 1]; /* * Before creating a new constraint, see whether any existing FKs are - * fit for the purpose. If one is, attach the parent constraint to it, - * and don't clone anything. This way we avoid the expensive + * fit for the purpose. If one is, attach the parent constraint to + * it, and don't clone anything. This way we avoid the expensive * verification step and don't end up with a duplicate FK. This also * means we don't consider this constraint when recursing to * partitions. */ - attach_it = false; + attached = false; foreach(cell, partFKs) { ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); - Form_pg_constraint partConstr; - HeapTuple partcontup; - Relation trigrel; - HeapTuple trigtup; - SysScanDesc scan; - ScanKeyData key; - attach_it = true; - - /* - * Do some quick & easy initial checks. If any of these fail, we - * cannot use this constraint, but keep looking. - */ - if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks) + if (tryAttachPartitionForeignKey(fk, + RelationGetRelid(partRel), + parentConstrOid, + numfks, + mapped_conkey, + confkey, + conpfeqop)) { - attach_it = false; - continue; + attached = true; + table_close(pkrel, NoLock); + break; } - for (i = 0; i < numfks; i++) - { - if (fk->conkey[i] != mapped_conkey[i] || - fk->confkey[i] != confkey[i] || - fk->conpfeqop[i] != conpfeqop[i]) - { - attach_it = false; - break; - } - } - if (!attach_it) - continue; - - /* - * Looks good so far; do some more extensive checks. Presumably - * the check for 'convalidated' could be dropped, since we don't - * really care about that, but let's be careful for now. - */ - partcontup = SearchSysCache1(CONSTROID, - ObjectIdGetDatum(fk->conoid)); - if (!partcontup) - elog(ERROR, "cache lookup failed for constraint %u", - fk->conoid); - partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); - if (OidIsValid(partConstr->conparentid) || - !partConstr->convalidated || - partConstr->condeferrable != constrForm->condeferrable || - partConstr->condeferred != constrForm->condeferred || - partConstr->confupdtype != constrForm->confupdtype || - partConstr->confdeltype != constrForm->confdeltype || - partConstr->confmatchtype != constrForm->confmatchtype) - { - ReleaseSysCache(partcontup); - attach_it = false; - continue; - } - - ReleaseSysCache(partcontup); - - /* - * Looks good! Attach this constraint. The action triggers in - * the new partition become redundant -- the parent table already - * has equivalent ones, and those will be able to reach the - * partition. Remove the ones in the partition. We identify them - * because they have our constraint OID, as well as being on the - * referenced rel. - */ - trigrel = heap_open(TriggerRelationId, RowExclusiveLock); - ScanKeyInit(&key, - Anum_pg_trigger_tgconstraint, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(fk->conoid)); - - scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true, - NULL, 1, &key); - while ((trigtup = systable_getnext(scan)) != NULL) - { - Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup); - ObjectAddress trigger; - - if (trgform->tgconstrrelid != fk->conrelid) - continue; - if (trgform->tgrelid != fk->confrelid) - continue; - - /* - * The constraint is originally set up to contain this trigger - * as an implementation object, so there's a dependency record - * that links the two; however, since the trigger is no longer - * needed, we remove the dependency link in order to be able - * to drop the trigger while keeping the constraint intact. - */ - deleteDependencyRecordsFor(TriggerRelationId, - trgform->oid, - false); - /* make dependency deletion visible to performDeletion */ - CommandCounterIncrement(); - ObjectAddressSet(trigger, TriggerRelationId, - trgform->oid); - performDeletion(&trigger, DROP_RESTRICT, 0); - /* make trigger drop visible, in case the loop iterates */ - CommandCounterIncrement(); - } - - systable_endscan(scan); - table_close(trigrel, RowExclusiveLock); - - ConstraintSetParentConstraint(fk->conoid, parentConstrOid, - RelationGetRelid(partRel)); - CommandCounterIncrement(); - attach_it = true; - break; } /* @@ -8139,18 +8527,46 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, * create a new one. In fact, there's no need to recurse for this * constraint to partitions, either. */ - if (attach_it) + if (attached) { ReleaseSysCache(tuple); continue; } + indexOid = constrForm->conindid; + + fkconstraint = makeNode(Constraint); + if (ConstraintNameIsUsed(CONSTRAINT_RELATION, + RelationGetRelid(partRel), + NameStr(constrForm->conname))) + fkconstraint->conname = + ChooseConstraintName(RelationGetRelationName(partRel), + ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs), + "fkey", + RelationGetNamespace(partRel), NIL); + else + fkconstraint->conname = NameStr(constrForm->conname); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + fkconstraint->fk_matchtype = constrForm->confmatchtype; + for (int i = 0; i < numfks; i++) + { + Form_pg_attribute att; + + att = TupleDescAttr(RelationGetDescr(partRel), + mapped_conkey[i] - 1); + fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs, + makeString(NameStr(att->attname))); + } + constrOid = - CreateConstraintEntry(NameStr(constrForm->conname), + CreateConstraintEntry(fkconstraint->conname, constrForm->connamespace, CONSTRAINT_FOREIGN, - constrForm->condeferrable, - constrForm->condeferred, + fkconstraint->deferrable, + fkconstraint->initdeferred, constrForm->convalidated, parentConstrOid, RelationGetRelid(partRel), @@ -8158,92 +8574,191 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, numfks, numfks, InvalidOid, /* not a domain constraint */ - constrForm->conindid, /* same index */ + indexOid, constrForm->confrelid, /* same foreign rel */ confkey, conpfeqop, conppeqop, conffeqop, numfks, - constrForm->confupdtype, - constrForm->confdeltype, - constrForm->confmatchtype, + fkconstraint->fk_upd_action, + fkconstraint->fk_del_action, + fkconstraint->fk_matchtype, NULL, NULL, NULL, - false, - 1, false, true); - subclone = lappend_oid(subclone, constrOid); + false, /* islocal */ + 1, /* inhcount */ + false, /* conNoInherit */ + true); /* Set up partition dependencies for the new constraint */ - ObjectAddressSet(childAddr, ConstraintRelationId, constrOid); - recordDependencyOn(&childAddr, &parentAddr, - DEPENDENCY_PARTITION_PRI); - ObjectAddressSet(childTableAddr, RelationRelationId, + ObjectAddressSet(address, ConstraintRelationId, constrOid); + ObjectAddressSet(referenced, ConstraintRelationId, parentConstrOid); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI); + ObjectAddressSet(referenced, RelationRelationId, RelationGetRelid(partRel)); - recordDependencyOn(&childAddr, &childTableAddr, - DEPENDENCY_PARTITION_SEC); - - fkconstraint = makeNode(Constraint); - /* for now this is all we need */ - fkconstraint->conname = pstrdup(NameStr(constrForm->conname)); - fkconstraint->fk_upd_action = constrForm->confupdtype; - fkconstraint->fk_del_action = constrForm->confdeltype; - fkconstraint->deferrable = constrForm->condeferrable; - fkconstraint->initdeferred = constrForm->condeferred; - - createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, - constrOid, constrForm->conindid, false); - - if (cloned) - { - ClonedConstraint *newc; - - /* - * Feed back caller about the constraints we created, so that they - * can set up constraint verification. - */ - newc = palloc(sizeof(ClonedConstraint)); - newc->relid = RelationGetRelid(partRel); - newc->refrelid = constrForm->confrelid; - newc->conindid = constrForm->conindid; - newc->conid = constrOid; - newc->constraint = fkconstraint; - - *cloned = lappend(*cloned, newc); - } + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC); + /* Done with the cloned constraint's tuple */ ReleaseSysCache(tuple); - } - pfree(attmap); - list_free_deep(partFKs); + /* Make all this visible before recursing */ + CommandCounterIncrement(); - /* - * If the partition is partitioned, recurse to handle any constraints that - * were cloned. - */ - if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && - subclone != NIL) - { - PartitionDesc partdesc = RelationGetPartitionDesc(partRel); - int i; - - for (i = 0; i < partdesc->nparts; i++) - { - Relation childRel; - - childRel = table_open(partdesc->oids[i], AccessExclusiveLock); - CloneFkReferencing(pg_constraint, - partRel, - childRel, - subclone, - cloned); - table_close(childRel, NoLock); /* keep lock till commit */ - } + addFkRecurseReferencing(wqueue, + fkconstraint, + partRel, + pkrel, + indexOid, + constrOid, + numfks, + confkey, + mapped_conkey, + conpfeqop, + conppeqop, + conffeqop, + false, /* no old check exists */ + AccessExclusiveLock); + table_close(pkrel, NoLock); } } +/* + * When the parent of a partition receives [the referencing side of] a foreign + * key, we must propagate that foreign key to the partition. However, the + * partition might already have an equivalent foreign key; this routine + * compares the given ForeignKeyCacheInfo (in the partition) to the FK defined + * by the other parameters. If they are equivalent, create the link between + * the two constraints and return true. + * + * If the given FK does not match the one defined by rest of the params, + * return false. + */ +static bool +tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, + Oid partRelid, + Oid parentConstrOid, + int numfks, + AttrNumber *mapped_conkey, + AttrNumber *confkey, + Oid *conpfeqop) +{ + HeapTuple parentConstrTup; + Form_pg_constraint parentConstr; + HeapTuple partcontup; + Form_pg_constraint partConstr; + Relation trigrel; + ScanKeyData key; + SysScanDesc scan; + HeapTuple trigtup; + + parentConstrTup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(parentConstrOid)); + if (!parentConstrTup) + elog(ERROR, "cache lookup failed for constraint %u", parentConstrOid); + parentConstr = (Form_pg_constraint) GETSTRUCT(parentConstrTup); + + /* + * Do some quick & easy initial checks. If any of these fail, we cannot + * use this constraint. + */ + if (fk->confrelid != parentConstr->confrelid || fk->nkeys != numfks) + { + ReleaseSysCache(parentConstrTup); + return false; + } + for (int i = 0; i < numfks; i++) + { + if (fk->conkey[i] != mapped_conkey[i] || + fk->confkey[i] != confkey[i] || + fk->conpfeqop[i] != conpfeqop[i]) + { + ReleaseSysCache(parentConstrTup); + return false; + } + } + + /* + * Looks good so far; do some more extensive checks. Presumably the check + * for 'convalidated' could be dropped, since we don't really care about + * that, but let's be careful for now. + */ + partcontup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(fk->conoid)); + if (!partcontup) + elog(ERROR, "cache lookup failed for constraint %u", + fk->conoid); + partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); + if (OidIsValid(partConstr->conparentid) || + !partConstr->convalidated || + partConstr->condeferrable != parentConstr->condeferrable || + partConstr->condeferred != parentConstr->condeferred || + partConstr->confupdtype != parentConstr->confupdtype || + partConstr->confdeltype != parentConstr->confdeltype || + partConstr->confmatchtype != parentConstr->confmatchtype) + { + ReleaseSysCache(parentConstrTup); + ReleaseSysCache(partcontup); + return false; + } + + ReleaseSysCache(partcontup); + ReleaseSysCache(parentConstrTup); + + /* + * Looks good! Attach this constraint. The action triggers in the new + * partition become redundant -- the parent table already has equivalent + * ones, and those will be able to reach the partition. Remove the ones + * in the partition. We identify them because they have our constraint + * OID, as well as being on the referenced rel. + */ + trigrel = table_open(TriggerRelationId, RowExclusiveLock); + ScanKeyInit(&key, + Anum_pg_trigger_tgconstraint, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(fk->conoid)); + + scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true, + NULL, 1, &key); + while ((trigtup = systable_getnext(scan)) != NULL) + { + Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup); + ObjectAddress trigger; + + if (trgform->tgconstrrelid != fk->conrelid) + continue; + if (trgform->tgrelid != fk->confrelid) + continue; + + /* + * The constraint is originally set up to contain this trigger as an + * implementation object, so there's a dependency record that links + * the two; however, since the trigger is no longer needed, we remove + * the dependency link in order to be able to drop the trigger while + * keeping the constraint intact. + */ + deleteDependencyRecordsFor(TriggerRelationId, + trgform->oid, + false); + /* make dependency deletion visible to performDeletion */ + CommandCounterIncrement(); + ObjectAddressSet(trigger, TriggerRelationId, + trgform->oid); + performDeletion(&trigger, DROP_RESTRICT, 0); + /* make trigger drop visible, in case the loop iterates */ + CommandCounterIncrement(); + } + + systable_endscan(scan); + table_close(trigrel, RowExclusiveLock); + + ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid); + CommandCounterIncrement(); + return true; +} + + /* * ALTER TABLE ALTER CONSTRAINT * @@ -8363,8 +8878,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd, /* * Update deferrability of RI_FKey_noaction_del, * RI_FKey_noaction_upd, RI_FKey_check_ins and RI_FKey_check_upd - * triggers, but not others; see createForeignKeyTriggers and - * CreateFKCheckTrigger. + * triggers, but not others; see createForeignKeyActionTriggers + * and CreateFKCheckTrigger. */ if (tgform->tgfoid != F_RI_FKEY_NOACTION_DEL && tgform->tgfoid != F_RI_FKEY_NOACTION_UPD && @@ -9271,37 +9786,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, indexOid, false); } -/* - * Create the triggers that implement an FK constraint. - * - * NB: if you change any trigger properties here, see also - * ATExecAlterConstraint. - */ -void -createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, - Oid constraintOid, Oid indexOid, bool create_action) -{ - /* - * For the referenced side, create action triggers, if requested. (If the - * referencing side is partitioned, there is still only one trigger, which - * runs on the referenced side and points to the top of the referencing - * hierarchy.) - */ - if (create_action) - createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid, - indexOid); - - /* - * For the referencing side, create the check triggers. We only need - * these on the partitions. - */ - if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) - createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid, - fkconstraint, constraintOid, indexOid); - - CommandCounterIncrement(); -} - /* * ALTER TABLE DROP CONSTRAINT * @@ -14703,8 +15187,6 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) bool found_whole_row; Oid defaultPartOid; List *partBoundConstraint; - List *cloned; - ListCell *l; /* * We must lock the default partition if one exists, because attaching a @@ -14885,33 +15367,12 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) CloneRowTriggersToPartition(rel, attachrel); /* - * Clone foreign key constraints, and setup for Phase 3 to verify them. + * Clone foreign key constraints. Callee is responsible for setting up + * for phase 3 constraint verification. */ - cloned = NIL; - CloneForeignKeyConstraints(RelationGetRelid(rel), - RelationGetRelid(attachrel), &cloned); - foreach(l, cloned) - { - ClonedConstraint *clonedcon = lfirst(l); - NewConstraint *newcon; - Relation clonedrel; - AlteredTableInfo *parttab; - - clonedrel = relation_open(clonedcon->relid, NoLock); - parttab = ATGetQueueEntry(wqueue, clonedrel); - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = clonedcon->constraint->conname; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = clonedcon->refrelid; - newcon->refindid = clonedcon->conindid; - newcon->conid = clonedcon->conid; - newcon->qual = (Node *) clonedcon->constraint; - - parttab->constraints = lappend(parttab->constraints, newcon); - - relation_close(clonedrel, NoLock); - } + CloneForeignKeyConstraints(wqueue, + RelationGetRelid(rel), + RelationGetRelid(attachrel)); /* * Generate partition constraint from the partition bound specification. @@ -15102,6 +15563,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) RelationGetRelid(attachrel)); update_relispartition(NULL, cldIdxId, true); found = true; + + CommandCounterIncrement(); break; } } @@ -15280,6 +15743,7 @@ ATExecDetachPartition(Relation rel, RangeVar *name) Oid defaultPartOid; List *indexes; List *fks; + List *refconstraints; ListCell *cell; /* @@ -15293,6 +15757,10 @@ ATExecDetachPartition(Relation rel, RangeVar *name) partRel = table_openrv(name, ShareUpdateExclusiveLock); + /* Ensure that foreign keys still hold after this detach */ + refconstraints = GetParentedForeignKeyRefs(partRel); + CheckNoForeignKeyRefs(partRel, refconstraints, false); + /* All inheritance related checks are performed within the function */ RemoveInheritance(partRel, rel); @@ -15411,6 +15879,24 @@ ATExecDetachPartition(Relation rel, RangeVar *name) } list_free_deep(fks); + /* + * Any sub-constrains that are in the referenced-side of a larger + * constraint have to be removed. This partition is no longer part of the + * key space of the constraint. + */ + foreach(cell, refconstraints) + { + Oid constrOid = lfirst_oid(cell); + ObjectAddress constraint; + + ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid); + CommandCounterIncrement(); + + ObjectAddressSet(constraint, ConstraintRelationId, constrOid); + performDeletion(&constraint, DROP_RESTRICT, 0); + } + CommandCounterIncrement(); + /* * Invalidate the parent's relcache so that the partition is no longer * included in its partition descriptor. @@ -15791,3 +16277,118 @@ update_relispartition(Relation classRel, Oid relationId, bool newval) if (opened) table_close(classRel, RowExclusiveLock); } + +/* + * Return an OID list of constraints that reference the given relation + * that are marked as having a parent constraints. + */ +List * +GetParentedForeignKeyRefs(Relation partition) +{ + Relation pg_constraint; + HeapTuple tuple; + SysScanDesc scan; + ScanKeyData key[2]; + List *constraints = NIL; + + /* + * If no indexes, or no columns are referenceable by FKs, we can avoid the + * scan. + */ + if (RelationGetIndexList(partition) == NIL || + bms_is_empty(RelationGetIndexAttrBitmap(partition, + INDEX_ATTR_BITMAP_KEY))) + return NIL; + + /* Search for constraints referencing this table */ + pg_constraint = table_open(ConstraintRelationId, AccessShareLock); + ScanKeyInit(&key[0], + Anum_pg_constraint_confrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition))); + ScanKeyInit(&key[1], + Anum_pg_constraint_contype, BTEqualStrategyNumber, + F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN)); + + /* XXX This is a seqscan, as we don't have a usable index */ + scan = systable_beginscan(pg_constraint, InvalidOid, true, NULL, 2, key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* + * We only need to process constraints that are part of larger ones. + */ + if (!OidIsValid(constrForm->conparentid)) + continue; + + constraints = lappend_oid(constraints, constrForm->oid); + } + + systable_endscan(scan); + table_close(pg_constraint, AccessShareLock); + + return constraints; +} + +/* + * During an operation that removes a partition from a partitioned table + * (either a DETACH or DROP), verify that any foreign keys pointing to the + * partitioned table would not become invalid. An error raised if any + * referenced values exist. + * + * Returns a list of such constraints. + */ +void +CheckNoForeignKeyRefs(Relation partition, List *constraints, bool isDrop) +{ + ListCell *cell; + + /* + * In the DROP case, we can skip this check when this is a partitioned + * partition, because its partitions will go through this also, and we'd + * run the check twice uselessly. + * + * In the DETACH case, this is only called for the top-level relation, so + * we must run it nevertheless. + */ + if (isDrop && partition->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return; + + foreach(cell, constraints) + { + Oid constrOid = lfirst_oid(cell); + HeapTuple tuple; + Form_pg_constraint constrForm; + Relation rel; + Trigger trig; + + tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for constraint %u", constrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + Assert(OidIsValid(constrForm->conparentid)); + Assert(constrForm->confrelid == RelationGetRelid(partition)); + + /* prevent data changes into the referencing table until commit */ + rel = table_open(constrForm->conrelid, ShareLock); + + MemSet(&trig, 0, sizeof(trig)); + trig.tgoid = InvalidOid; + trig.tgname = NameStr(constrForm->conname); + trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN; + trig.tgisinternal = true; + trig.tgconstrrelid = RelationGetRelid(partition); + trig.tgconstrindid = constrForm->conindid; + trig.tgconstraint = constrForm->oid; + trig.tgdeferrable = false; + trig.tginitdeferred = false; + /* we needn't fill in remaining fields */ + + RI_Final_Check(&trig, rel, partition); + + ReleaseSysCache(tuple); + + table_close(rel, NoLock); + } +} diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index 72f8a9d69cf..f7148e4b468 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -50,6 +50,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/rls.h" +#include "utils/ruleutils.h" #include "utils/snapmgr.h" #include "utils/syscache.h" @@ -220,8 +221,8 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot, Datum *vals, char *nulls); static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, - TupleTableSlot *violator, TupleDesc tupdesc, - int queryno) pg_attribute_noreturn(); + TupleTableSlot *violatorslot, TupleDesc tupdesc, + int queryno, bool partgone) pg_attribute_noreturn(); /* @@ -348,18 +349,22 @@ RI_FKey_check(TriggerData *trigdata) char paramname[16]; const char *querysep; Oid queryoids[RI_MAX_NUMKEYS]; + const char *pk_only; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * corresponding FK attributes. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (int i = 0; i < riinfo->nkeys; i++) { @@ -471,19 +476,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, char attname[MAX_QUOTED_NAME_LEN]; char paramname[16]; const char *querysep; + const char *pk_only; Oid queryoids[RI_MAX_NUMKEYS]; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * PK attributes themselves. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (int i = 0; i < riinfo->nkeys; i++) { @@ -1293,6 +1302,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) RangeTblEntry *fkrte; const char *sep; const char *fk_only; + const char *pk_only; int save_nestlevel; char workmembuf[32]; int spi_result; @@ -1350,7 +1360,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) /*---------- * The query string built is: * SELECT fk.keycols FROM [ONLY] relname fk - * LEFT OUTER JOIN ONLY pkrelname pk + * LEFT OUTER JOIN [ONLY] pkrelname pk * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) * WHERE pk.pkkeycol1 IS NULL AND * For MATCH SIMPLE: @@ -1377,9 +1387,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) quoteRelationName(fkrelname, fk_rel); fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? "" : "ONLY "; + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; appendStringInfo(&querybuf, - " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON", - fk_only, fkrelname, pkrelname); + " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON", + fk_only, fkrelname, pk_only, pkrelname); strcpy(pkattname, "pk."); strcpy(fkattname, "fk."); @@ -1530,7 +1542,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, slot, tupdesc, - RI_PLAN_CHECK_LOOKUPPK); + RI_PLAN_CHECK_LOOKUPPK, false); ExecDropSingleTupleTableSlot(slot); } @@ -1546,6 +1558,195 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) return true; } +/* ---------- + * RI_Final_Check - + * + */ +void +RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) +{ + const RI_ConstraintInfo *riinfo; + StringInfoData querybuf; + char *constraintDef; + char pkrelname[MAX_QUOTED_REL_NAME_LEN]; + char fkrelname[MAX_QUOTED_REL_NAME_LEN]; + char pkattname[MAX_QUOTED_NAME_LEN + 3]; + char fkattname[MAX_QUOTED_NAME_LEN + 3]; + const char *sep; + const char *fk_only; + int spi_result; + SPIPlanPtr qplan; + int i; + + riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false); + + /* XXX handle the non-permission case?? */ + + /*---------- + * The query string built is: + * SELECT fk.keycols FROM [ONLY] relname fk + * JOIN pkrelname pk + * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) + * WHERE (<partition constraint>) AND + * For MATCH SIMPLE: + * (fk.keycol1 IS NOT NULL [AND ...]) + * For MATCH FULL: + * (fk.keycol1 IS NOT NULL [OR ...]) + * + * We attach COLLATE clauses to the operators when comparing columns + * that have different collations. + *---------- + */ + initStringInfo(&querybuf); + appendStringInfoString(&querybuf, "SELECT "); + sep = ""; + for (i = 0; i < riinfo->nkeys; i++) + { + quoteOneName(fkattname, + RIAttName(fk_rel, riinfo->fk_attnums[i])); + appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname); + sep = ", "; + } + + quoteRelationName(pkrelname, pk_rel); + quoteRelationName(fkrelname, fk_rel); + fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; + appendStringInfo(&querybuf, + " FROM %s%s fk JOIN %s pk ON", + fk_only, fkrelname, pkrelname); + strcpy(pkattname, "pk."); + strcpy(fkattname, "fk."); + sep = "("; + for (i = 0; i < riinfo->nkeys; i++) + { + Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]); + Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]); + Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]); + Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]); + + quoteOneName(pkattname + 3, + RIAttName(pk_rel, riinfo->pk_attnums[i])); + quoteOneName(fkattname + 3, + RIAttName(fk_rel, riinfo->fk_attnums[i])); + ri_GenerateQual(&querybuf, sep, + pkattname, pk_type, + riinfo->pf_eq_oprs[i], + fkattname, fk_type); + if (pk_coll != fk_coll) + ri_GenerateQualCollation(&querybuf, pk_coll); + sep = "AND"; + } + + /* + * Start the WHERE clause with the partition constraint (except if this is + * the default partition and there's no other partition, because the + * partition constraint is the empty string in that case.) + */ + constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk"); + if (constraintDef && constraintDef[0] != '\0') + appendStringInfo(&querybuf, ") WHERE %s AND (", + constraintDef); + else + appendStringInfo(&querybuf, ") WHERE ("); + + sep = ""; + for (i = 0; i < riinfo->nkeys; i++) + { + quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i])); + appendStringInfo(&querybuf, + "%sfk.%s IS NOT NULL", + sep, fkattname); + switch (riinfo->confmatchtype) + { + case FKCONSTR_MATCH_SIMPLE: + sep = " AND "; + break; + case FKCONSTR_MATCH_FULL: + sep = " OR "; + break; + case FKCONSTR_MATCH_PARTIAL: + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("MATCH PARTIAL not yet implemented"))); + break; + default: + elog(ERROR, "unrecognized confmatchtype: %d", + riinfo->confmatchtype); + break; + } + } + appendStringInfoChar(&querybuf, ')'); + + /* + * RI_Initial_Check changes work_mem here. + */ + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* + * Generate the plan. We don't need to cache it, and there are no + * arguments to the plan. + */ + qplan = SPI_prepare(querybuf.data, 0, NULL); + + if (qplan == NULL) + elog(ERROR, "SPI_prepare returned %s for %s", + SPI_result_code_string(SPI_result), querybuf.data); + + /* + * Run the plan. For safety we force a current snapshot to be used. (In + * transaction-snapshot mode, this arguably violates transaction isolation + * rules, but we really haven't got much choice.) We don't need to + * register the snapshot, because SPI_execute_snapshot will see to it. We + * need at most one tuple returned, so pass limit = 1. + */ + spi_result = SPI_execute_snapshot(qplan, + NULL, NULL, + GetLatestSnapshot(), + InvalidSnapshot, + true, false, 1); + + /* Check result */ + if (spi_result != SPI_OK_SELECT) + elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result)); + + /* Did we find a tuple that would violate the constraint? */ + if (SPI_processed > 0) + { + TupleTableSlot *slot; + HeapTuple tuple = SPI_tuptable->vals[0]; + TupleDesc tupdesc = SPI_tuptable->tupdesc; + RI_ConstraintInfo fake_riinfo; + + slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); + + heap_deform_tuple(tuple, tupdesc, + slot->tts_values, slot->tts_isnull); + ExecStoreVirtualTuple(slot); + + /* + * The columns to look at in the result tuple are 1..N, not whatever + * they are in the fk_rel. Hack up riinfo so that ri_ReportViolation + * will behave properly. + * + * In addition to this, we have to pass the correct tupdesc to + * ri_ReportViolation, overriding its normal habit of using the pk_rel + * or fk_rel's tupdesc. + */ + memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo)); + for (i = 0; i < fake_riinfo.nkeys; i++) + fake_riinfo.pk_attnums[i] = i + 1; + + ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, + slot, tupdesc, 0, true); + } + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); +} + /* ---------- * Local functions below @@ -2078,7 +2279,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, pk_rel, fk_rel, newslot ? newslot : oldslot, NULL, - qkey->constr_queryno); + qkey->constr_queryno, false); return SPI_processed != 0; } @@ -2119,7 +2320,7 @@ static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, TupleTableSlot *violatorslot, TupleDesc tupdesc, - int queryno) + int queryno, bool partgone) { StringInfoData key_names; StringInfoData key_values; @@ -2158,9 +2359,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, * * Check table-level permissions next and, failing that, column-level * privileges. + * + * When a partition at the referenced side is being detached/dropped, we + * needn't check, since the user must be the table owner anyway. */ - - if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED) + if (partgone) + has_perm = true; + else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED) { aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT); if (aclresult != ACLCHECK_OK) @@ -2222,7 +2427,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, } } - if (onfk) + if (partgone) + ereport(ERROR, + (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), + errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"", + RelationGetRelationName(pk_rel), + NameStr(riinfo->conname)), + errdetail("Key (%s)=(%s) still referenced from table \"%s\".", + key_names.data, key_values.data, + RelationGetRelationName(fk_rel)))); + else if (onfk) ereport(ERROR, (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"", diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 85055bbb95a..63ec10be500 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -1818,6 +1818,24 @@ pg_get_partition_constraintdef(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(string_to_text(consrc)); } +/* + * pg_get_partconstrdef_string + * + * Returns the partition constraint as a C-string for the input relation, with + * the given alias. No pretty-printing. + */ +char * +pg_get_partconstrdef_string(Oid partitionId, char *aliasname) +{ + Expr *constr_expr; + List *context; + + constr_expr = get_partition_qual_relid(partitionId); + context = deparse_context_for(aliasname, partitionId); + + return deparse_expression((Node *) constr_expr, context, true, false); +} + /* * pg_get_constraintdef * diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h index 85076d07437..87cffd967c2 100644 --- a/src/include/catalog/heap.h +++ b/src/include/catalog/heap.h @@ -83,6 +83,8 @@ extern void heap_create_init_fork(Relation rel); extern void heap_drop_with_catalog(Oid relid); +extern void pre_drop_class_check(Oid relationId, Oid objectSubId); + extern void heap_truncate(List *relids); extern void heap_truncate_one_rel(Relation rel); diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index ec3bb90b01b..efbb5c3a93b 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -52,6 +52,10 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid, extern void CheckTableNotInUse(Relation rel, const char *stmt); +extern List *GetParentedForeignKeyRefs(Relation partition); +extern void CheckNoForeignKeyRefs(Relation partition, List *constraints, + bool isDrop); + extern void ExecuteTruncate(TruncateStmt *stmt); extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs); @@ -76,10 +80,6 @@ extern void find_composite_type_dependencies(Oid typeOid, extern void check_of_type(HeapTuple typetuple); -extern void createForeignKeyTriggers(Relation rel, Oid refRelOid, - Constraint *fkconstraint, Oid constraintOid, - Oid indexOid, bool create_action); - extern void register_on_commit_action(Oid relid, OnCommitAction action); extern void remove_on_commit_action(Oid relid); diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index 846679ecc12..8e3b2e7a688 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -263,6 +263,7 @@ extern bool RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel, TupleTableSlot *old_slot, TupleTableSlot *new_slot); extern bool RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel); +extern void RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel); /* result values for RI_FKey_trigger_type: */ #define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */ diff --git a/src/include/utils/ruleutils.h b/src/include/utils/ruleutils.h index 3ebc01e7147..7c49e9d0a83 100644 --- a/src/include/utils/ruleutils.h +++ b/src/include/utils/ruleutils.h @@ -22,6 +22,7 @@ extern char *pg_get_indexdef_string(Oid indexrelid); extern char *pg_get_indexdef_columns(Oid indexrelid, bool pretty); extern char *pg_get_partkeydef_columns(Oid relid, bool pretty); +extern char *pg_get_partconstrdef_string(Oid partitionId, char *aliasname); extern char *pg_get_constraintdef_command(Oid constraintId); extern char *deparse_expression(Node *expr, List *dpcontext, diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out index 4f7acb9b1ef..7ef7bc276c5 100644 --- a/src/test/regress/expected/foreign_key.out +++ b/src/test/regress/expected/foreign_key.out @@ -1532,19 +1532,6 @@ drop table pktable2, fktable2; -- -- Foreign keys and partitioned tables -- --- partitioned table in the referenced side are not allowed -CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b)) - PARTITION BY RANGE (a, b); --- verify with create table first ... -CREATE TABLE fk_notpartitioned_fk (a int, b int, - FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk); -ERROR: cannot reference partitioned table "fk_partitioned_pk" --- and then with alter table. -CREATE TABLE fk_notpartitioned_fk_2 (a int, b int); -ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b) - REFERENCES fk_partitioned_pk; -ERROR: cannot reference partitioned table "fk_partitioned_pk" -DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2; -- Creation of a partitioned hierarchy with irregular definitions CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int, PRIMARY KEY (a, b)); @@ -1686,7 +1673,7 @@ CREATE TABLE fk_partitioned_fk_full (x int, y int) PARTITION BY RANGE (x); CREATE TABLE fk_partitioned_fk_full_1 PARTITION OF fk_partitioned_fk_full DEFAULT; INSERT INTO fk_partitioned_fk_full VALUES (1, NULL); ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL; -- fails -ERROR: insert or update on table "fk_partitioned_fk_full" violates foreign key constraint "fk_partitioned_fk_full_x_y_fkey" +ERROR: insert or update on table "fk_partitioned_fk_full_1" violates foreign key constraint "fk_partitioned_fk_full_x_y_fkey" DETAIL: MATCH FULL does not allow mixing of null and nonnull key values. TRUNCATE fk_partitioned_fk_full; ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL; @@ -1902,7 +1889,7 @@ CREATE TABLE fk_partitioned_fk_2_2 PARTITION OF fk_partitioned_fk_2 FOR VALUES F INSERT INTO fk_partitioned_fk_2 VALUES (1600, 601), (1600, 1601); ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1600); -ERROR: insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_b_fkey" +ERROR: insert or update on table "fk_partitioned_fk_2_1" violates foreign key constraint "fk_partitioned_fk_a_b_fkey" DETAIL: Key (a, b)=(1600, 601) is not present in table "fk_notpartitioned_pk". INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601); ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 @@ -2037,3 +2024,197 @@ drop cascades to table fkpart1.fk_part drop cascades to table fkpart1.fk_part_1 drop cascades to table fkpart0.pkey drop cascades to table fkpart0.fk_part +-- Test a partitioned table as referenced table. +-- Verify basic functionality with a regular partition creation and a partition +-- with a different column layout, as well as partitions +-- added (created and attached) after creating the foreign key. +create schema fkpart3; +set search_path to fkpart3; +create table pk (a int primary key) partition by range (a); +create table pk1 partition of pk for values from (0) to (1000); +create table pk2 (b int, a int); +alter table pk2 drop column b; +alter table pk2 alter a set not null; +alter table pk attach partition pk2 for values from (1000) to (2000); +create table fk (a int) partition by range (a); +create table fk1 partition of fk for values from (0) to (750); +alter table fk add foreign key (a) references pk; +create table fk2 (b int, a int) ; +alter table fk2 drop column b; +alter table fk attach partition fk2 for values from (750) to (3500); +create table pk3 partition of pk for values from (2000) to (3000); +create table pk4 (like pk); +alter table pk attach partition pk4 for values from (3000) to (4000); +create table pk5 (c int, b int, a int not null) partition by range (a); +alter table pk5 drop column b, drop column c; +create table pk51 partition of pk5 for values from (4000) to (4500); +create table pk52 partition of pk5 for values from (4500) to (5000); +alter table pk attach partition pk5 for values from (4000) to (5000); +create table fk3 partition of fk for values from (3500) to (5000); +-- these should fail: referenced value not present +insert into fk values (1); +ERROR: insert or update on table "fk1" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(1) is not present in table "pk". +insert into fk values (1000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(1000) is not present in table "pk". +insert into fk values (2000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(2000) is not present in table "pk". +insert into fk values (3000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(3000) is not present in table "pk". +insert into fk values (4000); +ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(4000) is not present in table "pk". +insert into fk values (4500); +ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(4500) is not present in table "pk". +-- insert into the referenced table, now they should work +insert into pk values (1), (1000), (2000), (3000), (4000), (4500); +insert into fk values (1), (1000), (2000), (3000), (4000), (4500); +-- should fail: referencing value present +delete from pk where a = 1; +ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk" +DETAIL: Key (a)=(1) is still referenced from table "fk". +delete from pk where a = 1000; +ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk" +DETAIL: Key (a)=(1000) is still referenced from table "fk". +delete from pk where a = 2000; +ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk" +DETAIL: Key (a)=(2000) is still referenced from table "fk". +delete from pk where a = 3000; +ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk" +DETAIL: Key (a)=(3000) is still referenced from table "fk". +delete from pk where a = 4000; +ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk" +DETAIL: Key (a)=(4000) is still referenced from table "fk". +delete from pk where a = 4500; +ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk" +DETAIL: Key (a)=(4500) is still referenced from table "fk". +update pk set a = 2 where a = 1; +ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk" +DETAIL: Key (a)=(1) is still referenced from table "fk". +update pk set a = 1002 where a = 1000; +ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk" +DETAIL: Key (a)=(1000) is still referenced from table "fk". +update pk set a = 2002 where a = 2000; +ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk" +DETAIL: Key (a)=(2000) is still referenced from table "fk". +update pk set a = 3002 where a = 3000; +ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk" +DETAIL: Key (a)=(3000) is still referenced from table "fk". +update pk set a = 4002 where a = 4000; +ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk" +DETAIL: Key (a)=(4000) is still referenced from table "fk". +update pk set a = 4502 where a = 4500; +ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk" +DETAIL: Key (a)=(4500) is still referenced from table "fk". +-- now they should work +delete from fk; +update pk set a = 2 where a = 1; +delete from pk where a = 2; +update pk set a = 1002 where a = 1000; +delete from pk where a = 1002; +update pk set a = 2002 where a = 2000; +delete from pk where a = 2002; +update pk set a = 3002 where a = 3000; +delete from pk where a = 3002; +update pk set a = 4002 where a = 4000; +delete from pk where a = 4002; +update pk set a = 4502 where a = 4500; +delete from pk where a = 4502; +create schema fkpart3; +ERROR: schema "fkpart3" already exists +set search_path to fkpart3; +-- dropping/detaching partitions is prevented if that would break +-- a foreign key's existing data +create table droppk (a int primary key) partition by range (a); +create table droppk1 partition of droppk for values from (0) to (1000); +create table droppk_d partition of droppk default; +create table droppk2 partition of droppk for values from (1000) to (2000) + partition by range (a); +create table droppk21 partition of droppk2 for values from (1000) to (1400); +create table droppk2_d partition of droppk2 default; +insert into droppk values (1), (1000), (1500), (2000); +create table dropfk (a int references droppk); +insert into dropfk values (1), (1000), (1500), (2000); +-- these should all fail +alter table droppk detach partition droppk_d; +ERROR: removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5" +DETAIL: Key (a)=(2000) still referenced from table "dropfk". +alter table droppk2 detach partition droppk2_d; +ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4" +DETAIL: Key (a)=(1500) still referenced from table "dropfk". +alter table droppk detach partition droppk1; +ERROR: removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1" +DETAIL: Key (a)=(1) still referenced from table "dropfk". +alter table droppk detach partition droppk2; +ERROR: removing partition "droppk2" violates foreign key constraint "dropfk_a_fkey2" +DETAIL: Key (a)=(1000) still referenced from table "dropfk". +alter table droppk2 detach partition droppk21; +ERROR: removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3" +DETAIL: Key (a)=(1000) still referenced from table "dropfk". +drop table droppk_d; +ERROR: removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5" +DETAIL: Key (a)=(2000) still referenced from table "dropfk". +drop table droppk2_d; +ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4" +DETAIL: Key (a)=(1500) still referenced from table "dropfk". +drop table droppk1; +ERROR: removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1" +DETAIL: Key (a)=(1) still referenced from table "dropfk". +drop table droppk2; +ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4" +DETAIL: Key (a)=(1500) still referenced from table "dropfk". +drop table droppk21; +ERROR: removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3" +DETAIL: Key (a)=(1000) still referenced from table "dropfk". +delete from dropfk; +-- now they should all work +drop table droppk_d; +drop table droppk2_d; +drop table droppk1; +alter table droppk2 detach partition droppk21; +drop table droppk2; +-- Verify that initial constraint creation and cloning behave correctly +create schema fkpart4; +set search_path to fkpart4; +create table pk (a int primary key) partition by list (a); +create table pk1 partition of pk for values in (1) partition by list (a); +create table pk11 partition of pk1 for values in (1); +create table fk (a int) partition by list (a); +create table fk1 partition of fk for values in (1) partition by list (a); +create table fk11 partition of fk1 for values in (1); +alter table fk add foreign key (a) references pk; +create table pk2 partition of pk for values in (2); +create table pk3 (a int not null) partition by list (a); +create table pk31 partition of pk3 for values in (31); +create table pk32 (b int, a int not null); +alter table pk32 drop column b; +alter table pk3 attach partition pk32 for values in (32); +alter table pk attach partition pk3 for values in (31, 32); +create table fk2 partition of fk for values in (2); +create table fk3 (b int, a int); +alter table fk3 drop column b; +alter table fk attach partition fk3 for values in (3); +select pg_describe_object('pg_constraint'::regclass, oid, 0), confrelid::regclass, + case when conparentid <> 0 then pg_describe_object('pg_constraint'::regclass, conparentid, 0) else 'TOP' end +from pg_constraint +where conrelid in (select relid from pg_partition_tree('fk')) +order by conrelid::regclass::text, conname; + pg_describe_object | confrelid | case +------------------------------------+-----------+----------------------------------- + constraint fk_a_fkey on table fk | pk | TOP + constraint fk_a_fkey1 on table fk | pk1 | constraint fk_a_fkey on table fk + constraint fk_a_fkey2 on table fk | pk11 | constraint fk_a_fkey1 on table fk + constraint fk_a_fkey3 on table fk | pk2 | constraint fk_a_fkey on table fk + constraint fk_a_fkey4 on table fk | pk3 | constraint fk_a_fkey on table fk + constraint fk_a_fkey5 on table fk | pk31 | constraint fk_a_fkey4 on table fk + constraint fk_a_fkey6 on table fk | pk32 | constraint fk_a_fkey4 on table fk + constraint fk_a_fkey on table fk1 | pk | constraint fk_a_fkey on table fk + constraint fk_a_fkey on table fk11 | pk | constraint fk_a_fkey on table fk1 + constraint fk_a_fkey on table fk2 | pk | constraint fk_a_fkey on table fk + constraint fk_a_fkey on table fk3 | pk | constraint fk_a_fkey on table fk +(11 rows) + diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql index a5c7e147a7f..6acb0ac7c25 100644 --- a/src/test/regress/sql/foreign_key.sql +++ b/src/test/regress/sql/foreign_key.sql @@ -1145,18 +1145,6 @@ drop table pktable2, fktable2; -- Foreign keys and partitioned tables -- --- partitioned table in the referenced side are not allowed -CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b)) - PARTITION BY RANGE (a, b); --- verify with create table first ... -CREATE TABLE fk_notpartitioned_fk (a int, b int, - FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk); --- and then with alter table. -CREATE TABLE fk_notpartitioned_fk_2 (a int, b int); -ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b) - REFERENCES fk_partitioned_pk; -DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2; - -- Creation of a partitioned hierarchy with irregular definitions CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int, PRIMARY KEY (a, b)); @@ -1443,3 +1431,135 @@ alter table fkpart2.fk_part_1 drop constraint fkey; -- ok alter table fkpart2.fk_part_1_1 drop constraint my_fkey; -- doesn't exist drop schema fkpart0, fkpart1, fkpart2 cascade; + +-- Test a partitioned table as referenced table. +-- Verify basic functionality with a regular partition creation and a partition +-- with a different column layout, as well as partitions +-- added (created and attached) after creating the foreign key. +create schema fkpart3; +set search_path to fkpart3; + +create table pk (a int primary key) partition by range (a); +create table pk1 partition of pk for values from (0) to (1000); +create table pk2 (b int, a int); +alter table pk2 drop column b; +alter table pk2 alter a set not null; +alter table pk attach partition pk2 for values from (1000) to (2000); + +create table fk (a int) partition by range (a); +create table fk1 partition of fk for values from (0) to (750); +alter table fk add foreign key (a) references pk; +create table fk2 (b int, a int) ; +alter table fk2 drop column b; +alter table fk attach partition fk2 for values from (750) to (3500); + +create table pk3 partition of pk for values from (2000) to (3000); +create table pk4 (like pk); +alter table pk attach partition pk4 for values from (3000) to (4000); + +create table pk5 (c int, b int, a int not null) partition by range (a); +alter table pk5 drop column b, drop column c; +create table pk51 partition of pk5 for values from (4000) to (4500); +create table pk52 partition of pk5 for values from (4500) to (5000); +alter table pk attach partition pk5 for values from (4000) to (5000); + +create table fk3 partition of fk for values from (3500) to (5000); + +-- these should fail: referenced value not present +insert into fk values (1); +insert into fk values (1000); +insert into fk values (2000); +insert into fk values (3000); +insert into fk values (4000); +insert into fk values (4500); +-- insert into the referenced table, now they should work +insert into pk values (1), (1000), (2000), (3000), (4000), (4500); +insert into fk values (1), (1000), (2000), (3000), (4000), (4500); + +-- should fail: referencing value present +delete from pk where a = 1; +delete from pk where a = 1000; +delete from pk where a = 2000; +delete from pk where a = 3000; +delete from pk where a = 4000; +delete from pk where a = 4500; +update pk set a = 2 where a = 1; +update pk set a = 1002 where a = 1000; +update pk set a = 2002 where a = 2000; +update pk set a = 3002 where a = 3000; +update pk set a = 4002 where a = 4000; +update pk set a = 4502 where a = 4500; +-- now they should work +delete from fk; +update pk set a = 2 where a = 1; +delete from pk where a = 2; +update pk set a = 1002 where a = 1000; +delete from pk where a = 1002; +update pk set a = 2002 where a = 2000; +delete from pk where a = 2002; +update pk set a = 3002 where a = 3000; +delete from pk where a = 3002; +update pk set a = 4002 where a = 4000; +delete from pk where a = 4002; +update pk set a = 4502 where a = 4500; +delete from pk where a = 4502; + +create schema fkpart3; +set search_path to fkpart3; +-- dropping/detaching partitions is prevented if that would break +-- a foreign key's existing data +create table droppk (a int primary key) partition by range (a); +create table droppk1 partition of droppk for values from (0) to (1000); +create table droppk_d partition of droppk default; +create table droppk2 partition of droppk for values from (1000) to (2000) + partition by range (a); +create table droppk21 partition of droppk2 for values from (1000) to (1400); +create table droppk2_d partition of droppk2 default; +insert into droppk values (1), (1000), (1500), (2000); +create table dropfk (a int references droppk); +insert into dropfk values (1), (1000), (1500), (2000); +-- these should all fail +alter table droppk detach partition droppk_d; +alter table droppk2 detach partition droppk2_d; +alter table droppk detach partition droppk1; +alter table droppk detach partition droppk2; +alter table droppk2 detach partition droppk21; +drop table droppk_d; +drop table droppk2_d; +drop table droppk1; +drop table droppk2; +drop table droppk21; +delete from dropfk; +-- now they should all work +drop table droppk_d; +drop table droppk2_d; +drop table droppk1; +alter table droppk2 detach partition droppk21; +drop table droppk2; + +-- Verify that initial constraint creation and cloning behave correctly +create schema fkpart4; +set search_path to fkpart4; +create table pk (a int primary key) partition by list (a); +create table pk1 partition of pk for values in (1) partition by list (a); +create table pk11 partition of pk1 for values in (1); +create table fk (a int) partition by list (a); +create table fk1 partition of fk for values in (1) partition by list (a); +create table fk11 partition of fk1 for values in (1); +alter table fk add foreign key (a) references pk; +create table pk2 partition of pk for values in (2); +create table pk3 (a int not null) partition by list (a); +create table pk31 partition of pk3 for values in (31); +create table pk32 (b int, a int not null); +alter table pk32 drop column b; +alter table pk3 attach partition pk32 for values in (32); +alter table pk attach partition pk3 for values in (31, 32); +create table fk2 partition of fk for values in (2); +create table fk3 (b int, a int); +alter table fk3 drop column b; +alter table fk attach partition fk3 for values in (3); +select pg_describe_object('pg_constraint'::regclass, oid, 0), confrelid::regclass, + case when conparentid <> 0 then pg_describe_object('pg_constraint'::regclass, conparentid, 0) else 'TOP' end +from pg_constraint +where conrelid in (select relid from pg_partition_tree('fk')) +order by conrelid::regclass::text, conname; -- 2.17.1