Here's a patch to allow partitioned tables to be referenced by foreign keys. Current state is WIP, but everything should work; see below for the expected exception.
The design is very simple: have one pg_constraint row for each partition on each side, each row pointing to the topmost table on the other side; triggers appear on each leaf partition (and naturally they don't appear on any intermediate partitioned table). There are tests that should cover all the basic features. pg_upgrade tests work (containing relevant tables, as regress/foreign_key.sql leaves them behind for this reason: partitioned-references-partitioned). There is one requisite feature still missing from this patch: when a partition on the referenced side is detached or dropped, we must ensure no referencing row remains on the other side. For this, I have an (unmerged and thus unsubmitted here) new ri_triggers.c routine RI_Inverted_Initial_Check (need to come up with better name, heh) that verifies this, invoked at DETACH/DROP time. Also, some general code cleanup and documentation patching is needed. I'm posting this now so that I can take my hands off it for a few days; will return to post an updated version at some point before next commitfest. I wanted to have this ready for this commitfest, but RL dictated otherwise. This patch took a *very long time* to write ... I wrote three different recursion models for this. One thing I realized while writing this, is that my commit 3de241dba86f ("Foreign keys on partitioned tables") put function CloneForeignKeyConstraints() in catalog/pg_constraint.c that should really have been in tablecmds.c. In this patch I produced some siblings of that function still in pg_constraint.c, but I intend to move the whole lot to tablecmds.c before commit. -- Álvaro Herrera https://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index f4057a9f15..4e277d6bed 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -38,6 +38,10 @@ #include "utils/tqual.h" +static void deconstruct_pg_constraint_row(HeapTuple contuple, TupleDesc tupdesc, + int *numfks, AttrNumber *conkey, + AttrNumber *confkey, Oid *conpfeqop, + Oid *conppeqop, Oid *conffeqop); static void clone_fk_constraints(Relation pg_constraint, Relation parentRel, Relation partRel, List *clone, List **cloned); @@ -395,30 +399,37 @@ CreateConstraintEntry(const char *constraintName, * order, though. * * The *cloned list is appended ClonedConstraint elements describing what was - * created. + * created, for the purposes of validating the constraint in ALTER TABLE's + * Phase 3. */ + void CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) { Relation pg_constraint; Relation parentRel; Relation rel; - ScanKeyData key; + ScanKeyData key[2]; SysScanDesc scan; HeapTuple tuple; + AttrNumber *attmap; List *clone = NIL; + ListCell *cell; parentRel = heap_open(parentId, NoLock); /* already got lock */ /* see ATAddForeignKeyConstraint about lock level */ rel = heap_open(relationId, AccessExclusiveLock); pg_constraint = heap_open(ConstraintRelationId, RowShareLock); - /* Obtain the list of constraints to clone or attach */ - ScanKeyInit(&key, + /* + * Search for constraints where the parent is in the referencing side. + * obtain the list of constraints to clone or attach. + */ + ScanKeyInit(&key[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(parentId)); scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &key); + NULL, 1, key); while ((tuple = systable_getnext(scan)) != NULL) clone = lappend_oid(clone, HeapTupleGetOid(tuple)); systable_endscan(scan); @@ -426,9 +437,120 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) /* Do the actual work, recursing to partitions as needed */ clone_fk_constraints(pg_constraint, parentRel, rel, clone, cloned); - /* We're done. Clean up */ + list_free(clone); + clone = NIL; + + /* + * Now search for any constraints where this partition is in the + * referenced side. However, we must ignore any constraint whose parent + * constraint is also going to be cloned, to avoid duplicates. So do it + * in two steps: first construct the list of constraints to clone, then go + * over that list cloning those whose parents are not in the list. (We + * must not rely on the parent being seen first, since catalog order could + * return children first.) + */ + attmap = convert_tuples_by_name_map(RelationGetDescr(rel), + RelationGetDescr(parentRel), + gettext_noop("could not convert row type")); + ScanKeyInit(&key[0], + Anum_pg_constraint_confrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(parentId)); + ScanKeyInit(&key[1], + Anum_pg_constraint_contype, BTEqualStrategyNumber, + F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN)); + /* This is a seqscan, as we don't have a usable index ... */ + scan = systable_beginscan(pg_constraint, InvalidOid, true, + NULL, 2, key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* ignore this constraint if the parent is already on the list */ + if (list_member_oid(clone, constrForm->conparentid)) + continue; + + clone = lappend_oid(clone, HeapTupleGetOid(tuple)); + } + systable_endscan(scan); + + foreach(cell, clone) + { + Oid constrOid = lfirst_oid(cell); + Form_pg_constraint constrForm; + Relation fkRel; + int numfks; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber mapped_confkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + Oid conppeqop[INDEX_MAX_KEYS]; + Oid conffeqop[INDEX_MAX_KEYS]; + Constraint *fkconstraint; + + tuple = SearchSysCache1(CONSTROID, constrOid); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for constraint %u", constrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* skip children whose parents are going to be cloned, as above */ + if (list_member_oid(clone, constrForm->conparentid)) + { + ReleaseSysCache(tuple); + continue; + } + + /* XXX correct lock? */ + fkRel = heap_open(constrForm->conrelid, ShareRowExclusiveLock); + + deconstruct_pg_constraint_row(tuple, + RelationGetDescr(pg_constraint), + &numfks, + conkey, + confkey, + conpfeqop, + conppeqop, + conffeqop); + for (int i = 0; i < numfks; i++) + mapped_confkey[i] = attmap[confkey[i] - 1]; + + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = pstrdup(NameStr(constrForm->conname)); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + fkconstraint->initially_valid = true; /* XXX correct? */ + fkconstraint->fk_matchtype = constrForm->confmatchtype; + fkconstraint->fk_attrs = list_make1(makeString(get_attname(relationId, + conkey[0], + false))); + + /* + * Add the new foreign key constraint pointing to the new partition. + * Note that because this new partition appears in the referenced side + * of the constraint, we don't need to set up for Phase 3 check. + */ + addFkRecurseReferenced(NULL, + fkconstraint, + fkRel, + rel, + HeapTupleGetOid(tuple), + numfks, + mapped_confkey, + conkey, + conpfeqop, + conppeqop, + conffeqop, + true); + + heap_close(fkRel, NoLock); + ReleaseSysCache(tuple); + } + + /* We're done. Clean up, keeping locks till commit */ heap_close(parentRel, NoLock); - heap_close(rel, NoLock); /* keep lock till commit */ + heap_close(rel, NoLock); heap_close(pg_constraint, RowShareLock); } @@ -446,18 +568,16 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) * This function recurses to partitions, if the new partition is partitioned; * of course, only do this for FKs that were actually cloned. */ + static void clone_fk_constraints(Relation pg_constraint, Relation parentRel, Relation partRel, List *clone, List **cloned) { - TupleDesc tupdesc; AttrNumber *attmap; List *partFKs; List *subclone = NIL; ListCell *cell; - tupdesc = RelationGetDescr(pg_constraint); - /* * The constraint key may differ, if the columns in the partition are * different. This map is used to convert them. @@ -473,6 +593,7 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, Oid parentConstrOid = lfirst_oid(cell); Form_pg_constraint constrForm; HeapTuple tuple; + int numfks; AttrNumber conkey[INDEX_MAX_KEYS]; AttrNumber mapped_conkey[INDEX_MAX_KEYS]; AttrNumber confkey[INDEX_MAX_KEYS]; @@ -484,12 +605,8 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, Oid constrOid; ObjectAddress parentAddr, childAddr; - int nelem; ListCell *cell; int i; - ArrayType *arr; - Datum datum; - bool isnull; tuple = SearchSysCache1(CONSTROID, parentConstrOid); if (!tuple) @@ -503,96 +620,26 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, ReleaseSysCache(tuple); continue; } + if (list_member_oid(clone, constrForm->conparentid)) + { + ReleaseSysCache(tuple); + continue; + } ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); - datum = fastgetattr(tuple, Anum_pg_constraint_conkey, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conkey"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "conkey is not a 1-D smallint array"); - memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber)); + deconstruct_pg_constraint_row(tuple, + RelationGetDescr(pg_constraint), + &numfks, + conkey, + confkey, + conpfeqop, + conppeqop, + conffeqop); - for (i = 0; i < nelem; i++) + for (i = 0; i < numfks; i++) mapped_conkey[i] = attmap[conkey[i] - 1]; - datum = fastgetattr(tuple, Anum_pg_constraint_confkey, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null confkey"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "confkey is not a 1-D smallint array"); - memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber)); - - datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conpfeqop"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conpfeqop is not a 1-D OID array"); - memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); - - datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conpfeqop"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conpfeqop is not a 1-D OID array"); - memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); - - datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conppeqop"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conppeqop is not a 1-D OID array"); - memcpy(conppeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); - - datum = fastgetattr(tuple, Anum_pg_constraint_conffeqop, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conffeqop"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conffeqop is not a 1-D OID array"); - memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); - /* * Before creating a new constraint, see whether any existing FKs are * fit for the purpose. If one is, attach the parent constraint to it, @@ -614,12 +661,12 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, * Do some quick & easy initial checks. If any of these fail, we * cannot use this constraint, but keep looking. */ - if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem) + if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks) { attach_it = false; continue; } - for (i = 0; i < nelem; i++) + for (i = 0; i < numfks; i++) { if (fk->conkey[i] != mapped_conkey[i] || fk->confkey[i] != confkey[i] || @@ -687,8 +734,8 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, HeapTupleGetOid(tuple), RelationGetRelid(partRel), mapped_conkey, - nelem, - nelem, + numfks, + numfks, InvalidOid, /* not a domain constraint */ constrForm->conindid, /* same index */ constrForm->confrelid, /* same foreign rel */ @@ -696,7 +743,7 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, conpfeqop, conppeqop, conffeqop, - nelem, + numfks, constrForm->confupdtype, constrForm->confdeltype, constrForm->confmatchtype, @@ -719,8 +766,12 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, fkconstraint->deferrable = constrForm->condeferrable; fkconstraint->initdeferred = constrForm->condeferred; - createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, - constrOid, constrForm->conindid, false); + /* If this is a plain relation, create the check triggers */ + if (partRel->rd_rel->relkind == RELKIND_RELATION) + createForeignKeyCheckTriggers(RelationGetRelid(partRel), + constrForm->confrelid, + fkconstraint, constrOid, + constrForm->conindid); if (cloned) { @@ -756,6 +807,9 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, PartitionDesc partdesc = RelationGetPartitionDesc(partRel); int i; + /* make previously created constraints visible */ + CommandCounterIncrement(); + for (i = 0; i < partdesc->nparts; i++) { Relation childRel; @@ -772,6 +826,97 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, } /* + * Given a pg_constraint tuple, deconstruct and return some of the elements + * in it; this is used when we need to clone a foreign key constraint for a + * new partition being created or attached on a partitioned table. + */ +static void +deconstruct_pg_constraint_row(HeapTuple tuple, + TupleDesc tupdesc, + int *numfks, + AttrNumber *conkey, + AttrNumber *confkey, + Oid *conpfeqop, + Oid *conppeqop, + Oid *conffeqop) +{ + Datum datum; + bool isnull; + ArrayType *arr; + int nelem; + + datum = fastgetattr(tuple, Anum_pg_constraint_conkey, + tupdesc, &isnull); + if (isnull) + elog(ERROR, "null conkey"); + arr = DatumGetArrayTypeP(datum); + *numfks = nelem = ARR_DIMS(arr)[0]; + if (ARR_NDIM(arr) != 1 || + nelem < 1 || + nelem > INDEX_MAX_KEYS || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != INT2OID) + elog(ERROR, "conkey is not a 1-D smallint array"); + memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber)); + + datum = fastgetattr(tuple, Anum_pg_constraint_confkey, + tupdesc, &isnull); + if (isnull) + elog(ERROR, "null confkey"); + arr = DatumGetArrayTypeP(datum); + nelem = ARR_DIMS(arr)[0]; + if (ARR_NDIM(arr) != 1 || + nelem < 1 || + nelem > INDEX_MAX_KEYS || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != INT2OID) + elog(ERROR, "confkey is not a 1-D smallint array"); + memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber)); + + datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop, + tupdesc, &isnull); + if (isnull) + elog(ERROR, "null conpfeqop"); + arr = DatumGetArrayTypeP(datum); + nelem = ARR_DIMS(arr)[0]; + if (ARR_NDIM(arr) != 1 || + nelem < 1 || + nelem > INDEX_MAX_KEYS || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != OIDOID) + elog(ERROR, "conpfeqop is not a 1-D OID array"); + memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); + + datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop, + tupdesc, &isnull); + if (isnull) + elog(ERROR, "null conppeqop"); + arr = DatumGetArrayTypeP(datum); + nelem = ARR_DIMS(arr)[0]; + if (ARR_NDIM(arr) != 1 || + nelem < 1 || + nelem > INDEX_MAX_KEYS || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != OIDOID) + elog(ERROR, "conppeqop is not a 1-D OID array"); + memcpy(conppeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); + + datum = fastgetattr(tuple, Anum_pg_constraint_conffeqop, + tupdesc, &isnull); + if (isnull) + elog(ERROR, "null conffeqop"); + arr = DatumGetArrayTypeP(datum); + nelem = ARR_DIMS(arr)[0]; + if (ARR_NDIM(arr) != 1 || + nelem < 1 || + nelem > INDEX_MAX_KEYS || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != OIDOID) + elog(ERROR, "conffeqop is not a 1-D OID array"); + memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); +} + +/* * Test whether given name is currently used as a constraint name * for the given object (relation or domain). * diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 153aec263e..729d9831f2 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -415,6 +415,14 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo * Relation rel, Constraint *fkconstraint, Oid parentConstr, bool recurse, bool recursing, LOCKMODE lockmode); +static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, + Relation rel, Relation pkrel, Oid parentConstr, int numfks, + int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators, + Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok, + bool create_constraint); +static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid); static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, @@ -7138,9 +7146,11 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, /* * Note that we currently never recurse for FK constraints, so the - * "recurse" flag is silently ignored. + * "recurse" flag is silently ignored. XXX this is a lie now. * - * Assign or validate constraint name + * Assign or validate constraint name XXX name assignment now also + * occurs inside addFkRecurseReferenced; fix the resulting + * redundancy. */ if (newConstraint->conname) { @@ -7318,6 +7328,12 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Subroutine for ATExecAddConstraint. Must already hold exclusive * lock on the rel, and have done appropriate validity checks for it. * We do permissions checks here, however. + * + * When the referenced or referencing tables (or both) are partitioned, + * multiple pg_constraint rows are required -- one for each partitioned table + * and each partition on each side (fortunately, not one for every combination + * thereof). We also need the appropriate triggers to be created on each leaf + * partition. */ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, @@ -7337,7 +7353,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, int numfks, numpks; Oid indexOid; - Oid constrOid; bool old_check_ok; ObjectAddress address; ListCell *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop); @@ -7355,12 +7370,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Validity checks (permission checks wait till we have the column * numbers) */ - if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot reference partitioned table \"%s\"", - RelationGetRelationName(pkrel)))); - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { if (!recurse) @@ -7378,7 +7387,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, errdetail("This feature is not yet supported on partitioned tables."))); } - if (pkrel->rd_rel->relkind != RELKIND_RELATION) + if (pkrel->rd_rel->relkind != RELKIND_RELATION && + pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("referenced relation \"%s\" is not a table", @@ -7677,27 +7687,101 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, } /* + * Create all the constraint and trigger objects, recursing to partitions + * as necessary. First handle the referenced side. + */ + address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel, + InvalidOid, /* no parent constraint */ + numfks, + pkattnum, + fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok); + + /* + * Now handle the referencing side. We don't need the topmost constraint + * on this side, because the constraint we created above fills that role; + * any recursion done there needs to create one, however. + */ + addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel, + address.objectId, + numfks, + pkattnum, + fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok, + false); + + /* + * Close pk table, but keep lock until we've committed. + */ + heap_close(pkrel, NoLock); + + return address; +} + +/* + * addFkRecurseReferenced + * recursive subroutine for ATAddForeignKeyConstraint, referenced side + * + * Create pg_constraint rows for the referenced side of the constraint, + * referencing the parent of the referencing side; also create action triggers, + * as necessary. If the table is partitioned, recurse to handle each + * partition. + */ +ObjectAddress +addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel, + Relation pkrel, Oid parentConstraint, int numfks, + int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators, + Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok) +{ + ObjectAddress address; + Oid constrOid; + char *conname; + + /* + * Determine a good constraint name to use. If caller supplied us one, + * use that, unless it's taken. XXX this code is redundant with a name + * computed upstream ... + */ + if (fkconstraint->conname && + !ConstraintNameIsUsed(CONSTRAINT_RELATION, RelationGetRelid(rel), + fkconstraint->conname)) + conname = fkconstraint->conname; + else + conname = ChooseConstraintName(RelationGetRelationName(rel), + strVal(linitial(fkconstraint->fk_attrs)), + "fkey", + RelationGetNamespace(rel), + NIL); + + /* * Record the FK constraint in pg_constraint. */ - constrOid = CreateConstraintEntry(fkconstraint->conname, + constrOid = CreateConstraintEntry(conname, RelationGetNamespace(rel), CONSTRAINT_FOREIGN, fkconstraint->deferrable, fkconstraint->initdeferred, fkconstraint->initially_valid, - parentConstr, + parentConstraint, RelationGetRelid(rel), fkattnum, numfks, numfks, InvalidOid, /* not a domain constraint */ - indexOid, + //indexOid, + InvalidOid, RelationGetRelid(pkrel), pkattnum, pfeqoperators, ppeqoperators, ffeqoperators, - numpks, + numfks, fkconstraint->fk_upd_action, fkconstraint->fk_del_action, fkconstraint->fk_matchtype, @@ -7711,79 +7795,246 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, false); /* is_internal */ ObjectAddressSet(address, ConstraintRelationId, constrOid); - /* - * Create the triggers that will enforce the constraint. We only want the - * action triggers to appear for the parent partitioned relation, even - * though the constraints also exist below. - */ - createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint, - constrOid, indexOid, !recursing); + /* make new constraint visible, in case we add more */ + CommandCounterIncrement(); + + /* XXX do we need to create any dependencies? */ +// recordDependencyOn( .. ); /* - * Tell Phase 3 to check that the constraint is satisfied by existing - * rows. We can skip this during table creation, when requested explicitly - * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're - * recreating a constraint following a SET DATA TYPE operation that did - * not impugn its validity. + * If the referenced table is a plain relation, create the action triggers + * that enforce the constraint. */ - if (!old_check_ok && !fkconstraint->skip_validation) + if (pkrel->rd_rel->relkind == RELKIND_RELATION) { - NewConstraint *newcon; - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = fkconstraint->conname; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = RelationGetRelid(pkrel); - newcon->refindid = indexOid; - newcon->conid = constrOid; - newcon->qual = (Node *) fkconstraint; - - tab->constraints = lappend(tab->constraints, newcon); + createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel), + fkconstraint, + constrOid, /* XXX indexOid */ InvalidOid); } /* - * When called on a partitioned table, recurse to create the constraint on - * the partitions also. + * If the referenced table is partitioned, recurse on ourselves to handle + * each partition. We need one pg_constraint row created for each + * partition in addition to the pg_constraint row for the parent table. */ - if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { - PartitionDesc partdesc; + PartitionDesc pd = RelationGetPartitionDesc(pkrel); + int i; - partdesc = RelationGetPartitionDesc(rel); - - for (i = 0; i < partdesc->nparts; i++) + for (i = 0; i < pd->nparts; i++) { - Oid partitionId = partdesc->oids[i]; - Relation partition = heap_open(partitionId, lockmode); - AlteredTableInfo *childtab; - ObjectAddress childAddr; + Relation partRel; + AttrNumber *map; + AttrNumber *mapped_pkattnum; + Constraint *newfk; - CheckTableNotInUse(partition, "ALTER TABLE"); + partRel = heap_open(pd->oids[i], ShareRowExclusiveLock); - /* Find or create work queue entry for this table */ - childtab = ATGetQueueEntry(wqueue, partition); + /* + * Create a new Constraint node as a copy of the original one, + * blanking out the name. It is possible to reuse the same name, + * but only in limited circumstances so we'll be generating names + * anyway, so we don't go any great lengths to preserve the + * original name. + * + * XXX if the original doesn't have a name, this is a waste. + */ + newfk = copyObject(fkconstraint); /* XXX memleak */ + pfree(newfk->conname); + newfk->conname = NULL; - childAddr = - ATAddForeignKeyConstraint(wqueue, childtab, partition, - fkconstraint, constrOid, - recurse, true, lockmode); + /* + * Map the attribute numbers in the referenced side of the FK + * definition to match the partition's column layout. + */ + map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel), + RelationGetDescr(pkrel), + gettext_noop("could not convert row type")); + if (map) + { + int j; - /* Record this constraint as dependent on the parent one */ - recordDependencyOn(&childAddr, &address, DEPENDENCY_INTERNAL_AUTO); + mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks); + for (j = 0; j < numfks; j++) + mapped_pkattnum[j] = map[pkattnum[j] - 1]; + } + else + mapped_pkattnum = pkattnum; - heap_close(partition, NoLock); + /* do the deed */ + addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel, + constrOid, numfks, mapped_pkattnum, fkattnum, + pfeqoperators, ppeqoperators, ffeqoperators, + old_check_ok); + + /* Done -- clean up (but keep the lock) */ + heap_close(partRel, NoLock); + if (map) + { + pfree(mapped_pkattnum); + pfree(map); + } } } - /* - * Close pk table, but keep lock until we've committed. - */ - heap_close(pkrel, NoLock); - return address; } /* + * addFkRecurseReferencing + * recursive subroutine for ATAddForeignKeyConstraint, referencing side + * + * Note: we purposefully ignore the given constraint name. + */ +static void +addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel, + Relation pkrel, Oid parentConstr, int numfks, + int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators, + Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok, + bool create_constraint) +{ + Oid constrOid; /* uninitialized */ + char *conname; + + if (create_constraint) + { + /* Figure out a good constraint name to use */ + conname = ChooseConstraintName(RelationGetRelationName(rel), + strVal(linitial(fkconstraint->fk_attrs)), + "fkey", + RelationGetNamespace(rel), NIL); + + /* + * Record the FK constraint in pg_constraint. + */ + constrOid = CreateConstraintEntry(conname, + RelationGetNamespace(rel), + CONSTRAINT_FOREIGN, + fkconstraint->deferrable, + fkconstraint->initdeferred, + fkconstraint->initially_valid, + parentConstr, + RelationGetRelid(rel), + fkattnum, + numfks, + numfks, + InvalidOid, /* not a domain constraint */ + /* XXX indexOid, */ InvalidOid, + RelationGetRelid(pkrel), + pkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + numfks, + fkconstraint->fk_upd_action, + fkconstraint->fk_del_action, + fkconstraint->fk_matchtype, + NULL, /* no exclusion constraint */ + NULL, /* no check constraint */ + NULL, + NULL, + true, /* islocal */ + 0, /* inhcount */ + true, /* isnoinherit */ + false); /* is_internal */ + /* make name visible, in case we add more constraints */ + CommandCounterIncrement(); + } + else + constrOid = parentConstr; + + /* + * If the referencing relation is a plain table, add the check triggers + * to it and, if necessary, schedule it to be checked in Phase 3. + * + * If the relation is partitioned, drill down to do it to its partitions. + */ + if (rel->rd_rel->relkind == RELKIND_RELATION) + { + createForeignKeyCheckTriggers(RelationGetRelid(rel), + RelationGetRelid(pkrel), + fkconstraint, + constrOid, + /*indexOid*/InvalidOid); /* FIXME */ + + /* + * Tell Phase 3 to check that the constraint is satisfied by existing + * rows. We can skip this during table creation, when requested explicitly + * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're + * recreating a constraint following a SET DATA TYPE operation that did + * not impugn its validity. + */ + if (!old_check_ok && !fkconstraint->skip_validation) + { + NewConstraint *newcon; + AlteredTableInfo *tab; + + tab = ATGetQueueEntry(wqueue, rel); + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = fkconstraint->conname; + newcon->contype = CONSTR_FOREIGN; + newcon->refrelid = RelationGetRelid(pkrel); + newcon->refindid = /*indexOid*/ InvalidOid; /* FIXME */ + newcon->conid = constrOid; + newcon->qual = (Node *) fkconstraint; + + tab->constraints = lappend(tab->constraints, newcon); + } + } + else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDesc pd; + int i; + + pd = RelationGetPartitionDesc(rel); + for (i = 0; i < pd->nparts; i++) + { + Relation partRel; + AttrNumber *map; + AttrNumber *mapped_fkattnum; + + partRel = heap_open(pd->oids[i], ShareRowExclusiveLock); + + /* + * Map the attribute numbers in the referencing side of the FK + * definition to match the partition's column layout. + */ + map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel), + RelationGetDescr(rel), + gettext_noop("could not convert row type")); + if (map) + { + int j; + + mapped_fkattnum = palloc(sizeof(AttrNumber) * numfks); + for (j = 0; j < numfks; j++) + mapped_fkattnum[j] = map[fkattnum[j] - 1]; + } + else + mapped_fkattnum = fkattnum; + + /* down the rabbit hole */ + addFkRecurseReferencing(wqueue, fkconstraint, partRel, pkrel, + constrOid, numfks, pkattnum, + mapped_fkattnum, + pfeqoperators, ppeqoperators, ffeqoperators, + old_check_ok, true); + + /* Done -- clean up (but keep the lock) */ + heap_close(partRel, NoLock); + if (map) + { + pfree(mapped_fkattnum); + pfree(map); + } + } + } + /* XXX do we need anything for foreign tables? */ +} + +/* * ALTER TABLE ALTER CONSTRAINT * * Update the attributes of a constraint. @@ -8807,7 +9058,7 @@ createForeignKeyActionTriggers(Relation rel, Oid refRelOid, Constraint *fkconstr * Create the referencing-side "check" triggers that implement a foreign * key. */ -static void +void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, Constraint *fkconstraint, Oid constraintOid, Oid indexOid) @@ -8819,37 +9070,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, } /* - * Create the triggers that implement an FK constraint. - * - * NB: if you change any trigger properties here, see also - * ATExecAlterConstraint. - */ -void -createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, - Oid constraintOid, Oid indexOid, bool create_action) -{ - /* - * For the referenced side, create action triggers, if requested. (If the - * referencing side is partitioned, there is still only one trigger, which - * runs on the referenced side and points to the top of the referencing - * hierarchy.) - */ - if (create_action) - createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid, - indexOid); - - /* - * For the referencing side, create the check triggers. We only need - * these on the partitions. - */ - if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) - createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid, - fkconstraint, constraintOid, indexOid); - - CommandCounterIncrement(); -} - -/* * ALTER TABLE DROP CONSTRAINT * * Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism. diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 240e85e391..621eacb2b3 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -1001,9 +1001,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, { /* * Internally-generated trigger for a constraint, so make it an - * internal dependency of the constraint. We can skip depending on - * the relation(s), as there'll be an indirect dependency via the - * constraint. + * internal dependency of the constraint. */ referenced.classId = ConstraintRelationId; referenced.objectId = constraintOid; diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index 049b20449a..16a59788a4 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -397,18 +397,22 @@ RI_FKey_check(TriggerData *trigdata) char paramname[16]; const char *querysep; Oid queryoids[RI_MAX_NUMKEYS]; + const char *pk_only; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * corresponding FK attributes. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (i = 0; i < riinfo->nkeys; i++) { @@ -532,19 +536,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, char attname[MAX_QUOTED_NAME_LEN]; char paramname[16]; const char *querysep; + const char *pk_only; Oid queryoids[RI_MAX_NUMKEYS]; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * PK attributes themselves. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (i = 0; i < riinfo->nkeys; i++) { @@ -1858,6 +1866,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) RangeTblEntry *fkrte; const char *sep; const char *fk_only; + const char *pk_only; int i; int save_nestlevel; char workmembuf[32]; @@ -1917,7 +1926,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) /*---------- * The query string built is: * SELECT fk.keycols FROM [ONLY] relname fk - * LEFT OUTER JOIN ONLY pkrelname pk + * LEFT OUTER JOIN [ONLY] pkrelname pk * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) * WHERE pk.pkkeycol1 IS NULL AND * For MATCH SIMPLE: @@ -1944,9 +1953,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) quoteRelationName(fkrelname, fk_rel); fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? "" : "ONLY "; + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; appendStringInfo(&querybuf, - " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON", - fk_only, fkrelname, pkrelname); + " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON", + fk_only, fkrelname, pk_only, pkrelname); strcpy(pkattname, "pk."); strcpy(fkattname, "fk."); @@ -2314,6 +2325,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk) /* Find or create a hashtable entry for the constraint */ riinfo = ri_LoadConstraintInfo(constraintOid); +#if 0 /* Do some easy cross-checks against the trigger call data */ if (rel_is_pk) { @@ -2322,6 +2334,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk) elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"", trigger->tgname, RelationGetRelationName(trig_rel)); } +#endif return riinfo; } diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 138de84e83..260e2d8e3a 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -75,9 +75,14 @@ extern void find_composite_type_dependencies(Oid typeOid, extern void check_of_type(HeapTuple typetuple); -extern void createForeignKeyTriggers(Relation rel, Oid refRelOid, - Constraint *fkconstraint, Oid constraintOid, - Oid indexOid, bool create_action); +extern void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid); +extern ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, + Relation rel, Relation pkrel, Oid parentConstraint, + int numfks, int16 *pkattnum, int16 *fkattnum, + Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators, + bool old_check_ok); extern void register_on_commit_action(Oid relid, OnCommitAction action); extern void remove_on_commit_action(Oid relid); diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out index 52164e89d2..a2c2bf1360 100644 --- a/src/test/regress/expected/foreign_key.out +++ b/src/test/regress/expected/foreign_key.out @@ -1431,19 +1431,6 @@ drop table pktable2, fktable2; -- -- Foreign keys and partitioned tables -- --- partitioned table in the referenced side are not allowed -CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b)) - PARTITION BY RANGE (a, b); --- verify with create table first ... -CREATE TABLE fk_notpartitioned_fk (a int, b int, - FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk); -ERROR: cannot reference partitioned table "fk_partitioned_pk" --- and then with alter table. -CREATE TABLE fk_notpartitioned_fk_2 (a int, b int); -ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b) - REFERENCES fk_partitioned_pk; -ERROR: cannot reference partitioned table "fk_partitioned_pk" -DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2; -- Creation of a partitioned hierarchy with irregular definitions CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int, PRIMARY KEY (a, b)); @@ -1479,10 +1466,10 @@ DETAIL: This feature is not yet supported on partitioned tables. -- these inserts, targetting both the partition directly as well as the -- partitioned table, should all fail INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501); -ERROR: insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey" +ERROR: insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_1_a_fkey" DETAIL: Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk". INSERT INTO fk_partitioned_fk_1 (a,b) VALUES (500, 501); -ERROR: insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey" +ERROR: insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_1_a_fkey" DETAIL: Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk". INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501); ERROR: insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_fkey" @@ -1609,7 +1596,7 @@ INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503); -- this fails, because the defaults for the referencing table are not present -- in the referenced table: UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502; -ERROR: insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_a_fkey" +ERROR: insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_3_a_fkey" DETAIL: Key (a, b)=(2501, 142857) is not present in table "fk_notpartitioned_pk". -- but inserting the row we can make it work: INSERT INTO fk_notpartitioned_pk VALUES (2501, 142857); @@ -1781,3 +1768,114 @@ INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601); ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1600); -- leave these tables around intentionally +-- Test a partitioned table as referenced table. +-- Verify basic functionality with a regular partition creation and a partition +-- with a different column layout, as well as partitions +-- added (created and attached) after creating the foreign key. +create schema regress_fk; +set search_path to regress_fk; +create table pk (a int primary key) partition by range (a); +create table pk1 partition of pk for values from (0) to (1000); +create table pk2 (b int, a int); +alter table pk2 drop column b; +alter table pk2 alter a set not null; +alter table pk attach partition pk2 for values from (1000) to (2000); +create table fk (a int) partition by range (a); +create table fk1 partition of fk for values from (0) to (750); +alter table fk add foreign key (a) references pk; +create table fk2 (b int, a int) ; +alter table fk2 drop column b; +alter table fk attach partition fk2 for values from (750) to (3500); +create table pk3 partition of pk for values from (2000) to (3000); +create table pk4 (like pk); +alter table pk attach partition pk4 for values from (3000) to (4000); +create table pk5 (like pk) partition by range (a); +create table pk51 partition of pk5 for values from (4000) to (4500); +create table pk52 partition of pk5 for values from (4500) to (5000); +alter table pk attach partition pk5 for values from (4000) to (5000); +create table fk3 partition of fk for values from (3500) to (5000); +-- these should fail: referenced value not present +insert into fk values (1); +ERROR: insert or update on table "fk1" violates foreign key constraint "fk1_a_fkey" +DETAIL: Key (a)=(1) is not present in table "pk". +insert into fk values (1000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(1000) is not present in table "pk". +insert into fk values (2000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(2000) is not present in table "pk". +insert into fk values (3000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(3000) is not present in table "pk". +insert into fk values (4000); +ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(4000) is not present in table "pk". +insert into fk values (4500); +ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(4500) is not present in table "pk". +-- insert into the referenced table, now they should work +insert into pk values (1), (1000), (2000), (3000), (4000), (4500); +insert into fk values (1), (1000), (2000), (3000), (4000), (4500); +-- should fail: referencing value present +delete from pk where a = 1; +ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk" +DETAIL: Key (a)=(1) is still referenced from table "fk". +delete from pk where a = 1000; +ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk" +DETAIL: Key (a)=(1000) is still referenced from table "fk". +delete from pk where a = 2000; +ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk" +DETAIL: Key (a)=(2000) is still referenced from table "fk". +delete from pk where a = 3000; +ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk" +DETAIL: Key (a)=(3000) is still referenced from table "fk". +delete from pk where a = 4000; +ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk" +DETAIL: Key (a)=(4000) is still referenced from table "fk". +delete from pk where a = 4500; +ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk" +DETAIL: Key (a)=(4500) is still referenced from table "fk". +update pk set a = 2 where a = 1; +ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk" +DETAIL: Key (a)=(1) is still referenced from table "fk". +update pk set a = 1002 where a = 1000; +ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk" +DETAIL: Key (a)=(1000) is still referenced from table "fk". +update pk set a = 2002 where a = 2000; +ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk" +DETAIL: Key (a)=(2000) is still referenced from table "fk". +update pk set a = 3002 where a = 3000; +ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk" +DETAIL: Key (a)=(3000) is still referenced from table "fk". +update pk set a = 4002 where a = 4000; +ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk" +DETAIL: Key (a)=(4000) is still referenced from table "fk". +update pk set a = 4502 where a = 4500; +ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk" +DETAIL: Key (a)=(4500) is still referenced from table "fk". +-- now they should work +delete from fk; +update pk set a = 2 where a = 1; +delete from pk where a = 2; +update pk set a = 1002 where a = 1000; +delete from pk where a = 1002; +update pk set a = 2002 where a = 2000; +delete from pk where a = 2002; +update pk set a = 3002 where a = 3000; +delete from pk where a = 3002; +update pk set a = 4002 where a = 4000; +delete from pk where a = 4002; +update pk set a = 4502 where a = 4500; +delete from pk where a = 4502; +-- have a partitioned table that references a partitioned table +create table bfk (a int) partition by range (a); +create table bfk1 partition of bfk for values from (0) to (100); +create table bfk2 partition of bfk for values from (100) to (200); +insert into bfk select * from generate_series(90, 110); +-- Creating FK with existing data must verify referenced data exists +alter table bfk add foreign key (a) references pk; +ERROR: insert or update on table "bfk1" violates foreign key constraint "bfk1_a_fkey" +DETAIL: Key (a)=(90) is not present in table "pk". +insert into pk select * from generate_series(90, 110); +alter table bfk add foreign key (a) references pk; +-- test ON UPDATE, ON CASCADE diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql index f387004855..c1285aac60 100644 --- a/src/test/regress/sql/foreign_key.sql +++ b/src/test/regress/sql/foreign_key.sql @@ -1071,18 +1071,6 @@ drop table pktable2, fktable2; -- Foreign keys and partitioned tables -- --- partitioned table in the referenced side are not allowed -CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b)) - PARTITION BY RANGE (a, b); --- verify with create table first ... -CREATE TABLE fk_notpartitioned_fk (a int, b int, - FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk); --- and then with alter table. -CREATE TABLE fk_notpartitioned_fk_2 (a int, b int); -ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b) - REFERENCES fk_partitioned_pk; -DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2; - -- Creation of a partitioned hierarchy with irregular definitions CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int, PRIMARY KEY (a, b)); @@ -1289,3 +1277,86 @@ ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1600); -- leave these tables around intentionally + +-- Test a partitioned table as referenced table. +-- Verify basic functionality with a regular partition creation and a partition +-- with a different column layout, as well as partitions +-- added (created and attached) after creating the foreign key. +create schema regress_fk; +set search_path to regress_fk; + +create table pk (a int primary key) partition by range (a); +create table pk1 partition of pk for values from (0) to (1000); +create table pk2 (b int, a int); +alter table pk2 drop column b; +alter table pk2 alter a set not null; +alter table pk attach partition pk2 for values from (1000) to (2000); + +create table fk (a int) partition by range (a); +create table fk1 partition of fk for values from (0) to (750); +alter table fk add foreign key (a) references pk; +create table fk2 (b int, a int) ; +alter table fk2 drop column b; +alter table fk attach partition fk2 for values from (750) to (3500); + +create table pk3 partition of pk for values from (2000) to (3000); +create table pk4 (like pk); +alter table pk attach partition pk4 for values from (3000) to (4000); + +create table pk5 (like pk) partition by range (a); +create table pk51 partition of pk5 for values from (4000) to (4500); +create table pk52 partition of pk5 for values from (4500) to (5000); +alter table pk attach partition pk5 for values from (4000) to (5000); + +create table fk3 partition of fk for values from (3500) to (5000); + +-- these should fail: referenced value not present +insert into fk values (1); +insert into fk values (1000); +insert into fk values (2000); +insert into fk values (3000); +insert into fk values (4000); +insert into fk values (4500); +-- insert into the referenced table, now they should work +insert into pk values (1), (1000), (2000), (3000), (4000), (4500); +insert into fk values (1), (1000), (2000), (3000), (4000), (4500); + +-- should fail: referencing value present +delete from pk where a = 1; +delete from pk where a = 1000; +delete from pk where a = 2000; +delete from pk where a = 3000; +delete from pk where a = 4000; +delete from pk where a = 4500; +update pk set a = 2 where a = 1; +update pk set a = 1002 where a = 1000; +update pk set a = 2002 where a = 2000; +update pk set a = 3002 where a = 3000; +update pk set a = 4002 where a = 4000; +update pk set a = 4502 where a = 4500; +-- now they should work +delete from fk; +update pk set a = 2 where a = 1; +delete from pk where a = 2; +update pk set a = 1002 where a = 1000; +delete from pk where a = 1002; +update pk set a = 2002 where a = 2000; +delete from pk where a = 2002; +update pk set a = 3002 where a = 3000; +delete from pk where a = 3002; +update pk set a = 4002 where a = 4000; +delete from pk where a = 4002; +update pk set a = 4502 where a = 4500; +delete from pk where a = 4502; + +-- have a partitioned table that references a partitioned table +create table bfk (a int) partition by range (a); +create table bfk1 partition of bfk for values from (0) to (100); +create table bfk2 partition of bfk for values from (100) to (200); +insert into bfk select * from generate_series(90, 110); +-- Creating FK with existing data must verify referenced data exists +alter table bfk add foreign key (a) references pk; +insert into pk select * from generate_series(90, 110); +alter table bfk add foreign key (a) references pk; + +-- test ON UPDATE, ON CASCADE