I've implemented most of ADD/DROP INHERITS, but it's my first significant piece
of code at this level, so I would appreciate any feedback on it. In particular,
I'm worried I may be on the wrong track about how some low-level operations
work, such as memory management, syscache lookups, and heap tuple creation.
Also, I'm not at all clear what kind of locks are really necessary for this
operation. I may be taking excessively strong or weak locks, or have deadlock
risks.

The main thing remaining to be done is implementing default column
expressions. Those would require an ALTER TABLE "Pass 3" operation, I believe.
Also, I haven't looked at table constraints at all yet; I'm not clear on what's
supposed to happen there.

I made some decisions on some semantic issues that I believe are correct but
could stand some double-checking. Specifically, if the parent has OIDs then the
child must have OIDs, and if a column in the parent is NOT NULL then the column
in the child must be NOT NULL as well.

I can send the actual patch to pgsql-patches; it includes some other changes to
refactor StoreCatalogInheritance and add the syntax to gram.y. But it's still
not quite finished because of default values.




/*
 * ATExecAddInherits
 *
 * Make "rel" a direct inheritance child of the table named by "parent".
 *
 * The function, in order:
 *   - opens the parent and rejects it unless it is a plain table, is not a
 *     temp table being inherited by a permanent one, is owned by the current
 *     user, and does not have OIDs that the child lacks;
 *   - scans rel's existing pg_inherits rows to reject a duplicate parent and
 *     to find the highest inhseqno already in use;
 *   - rejects the request if the parent appears in the list returned by
 *     find_all_inheritors(rel), i.e. the new link would create a cycle;
 *   - merges the parent's columns into every relation in that list;
 *   - records the new parent in pg_inherits with the next inhseqno.
 *
 * All failures are reported via ereport(ERROR).
 */
static void
ATExecAddInherits(Relation rel, RangeVar *parent)
{
        Relation        relation,
                                catalogRelation;
        SysScanDesc scan;
        ScanKeyData key;
        HeapTuple       inheritsTuple;
        int4            inhseqno = 0;
        ListCell   *child;
        List       *children;

        /* XXX is this enough locking? */
        relation = heap_openrv(parent, AccessShareLock);
        if (relation->rd_rel->relkind != RELKIND_RELATION)
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                 errmsg("inherited relation \"%s\" is not a table",
                                                parent->relname)));

        /* Permanent rels cannot inherit from temporary ones */
        if (!rel->rd_istemp && isTempNamespace(RelationGetNamespace(relation)))
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                 errmsg("cannot inherit from temporary relation \"%s\"",
                                                parent->relname)));

        /* The current user must own the prospective parent */
        if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
                                           RelationGetRelationName(relation));

        /* If parent has OIDs then all children must have OIDs */
        if (relation->rd_rel->relhasoids && !rel->rd_rel->relhasoids)
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                 errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
                                                RelationGetRelationName(rel), parent->relname)));

        /*
         * Reject duplications in the list of parents. -- this is the same
         * check as when creating a table, but maybe we should check for the
         * parent anywhere higher in the inheritance structure?
         *
         * While scanning rel's existing pg_inherits rows, also remember the
         * largest inhseqno seen so the new row can be stored after it.
         */
        catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
        ScanKeyInit(&key,
                                Anum_pg_inherits_inhrelid,
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(RelationGetRelid(rel)));
        scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
                                                          true, SnapshotNow, 1, &key);
        while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
        {
                Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple);

                if (inh->inhparent == RelationGetRelid(relation))
                        ereport(ERROR,
                                        (errcode(ERRCODE_DUPLICATE_TABLE),
                                         errmsg("inherited relation \"%s\" duplicated",
                                                        parent->relname)));
                if (inh->inhseqno > inhseqno)
                        inhseqno = inh->inhseqno;
        }
        systable_endscan(scan);
        heap_close(catalogRelation, RowExclusiveLock);

        /*
         * Get children because we have to manually recurse and also because
         * we have to check for recursive inheritance graphs.
         *
         * (find_all_inheritors is actually in the planner; presumably the
         * list it returns includes rel itself -- TODO confirm, since the merge
         * loop below relies on that to process rel.)
         */
        children = find_all_inheritors(RelationGetRelid(rel));

        if (list_member_oid(children, RelationGetRelid(relation)))
                ereport(ERROR,
                                (errcode(ERRCODE_DUPLICATE_TABLE),
                                 errmsg("Circular inheritance structure found")));

        foreach(child, children)
        {
                Oid                     childrelid = lfirst_oid(child);
                Relation        childrel;

                childrel = relation_open(childrelid, AccessExclusiveLock);
                MergeAttributesIntoExisting(childrel, relation);
                /* close the relcache entry but keep the lock we just took */
                relation_close(childrel, NoLock);
        }

        /* Record the new parent after all existing ones */
        catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
        StoreCatalogInheritance1(RelationGetRelid(rel),
                                                         RelationGetRelid(relation),
                                                         inhseqno + 1,
                                                         catalogRelation);
        heap_close(catalogRelation, RowExclusiveLock);

        /*
         * Close the parent but do NOT release its lock: passing NoLock keeps
         * the AccessShareLock until end of transaction, so the parent cannot
         * be dropped or have its columns changed between here and commit,
         * while our new pg_inherits row is still uncommitted.
         */
        heap_close(relation, NoLock);
}


/*
 * MergeAttributesIntoExisting
 *
 * Reconcile the columns of an existing table with those of a new parent.
 *
 *      rel                     the child table whose pg_attribute rows are modified
 *      relation        the parent table whose tuple descriptor is read
 *
 * For every non-dropped column of the parent:
 *   - if the child already has a column of that name, it must have the same
 *     type and typmod, and must be NOT NULL if the parent column is; the
 *     child's attinhcount is then incremented;
 *   - otherwise a new, non-local, nullable column is inserted into
 *     pg_attribute for the child.  A NOT NULL parent column cannot be added
 *     this way and raises an error.
 *
 * Column defaults are not handled yet (see the XXX markers).
 *
 * NOTE(review): when new columns are added, nothing here updates
 * pg_class.relnatts for the child or records a dependency on the column
 * type -- confirm whether the caller is expected to do that.
 */
static void
MergeAttributesIntoExisting(Relation rel, Relation relation)
{
        Relation        attrdesc;
        AttrNumber      parent_attno,
                                child_attno;
        TupleDesc       tupleDesc;
        HeapTuple       tuple;

        /* New columns are numbered after the child's current last column */
        child_attno = RelationGetNumberOfAttributes(rel);

        tupleDesc = RelationGetDescr(relation);

        /* Open pg_attribute once for the whole loop rather than per column */
        attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);

        for (parent_attno = 1; parent_attno <= tupleDesc->natts;
                 parent_attno++)
        {
                Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
                char       *attributeName = NameStr(attribute->attname);

                /* Ignore dropped columns in the parent. */
                if (attribute->attisdropped)
                        continue;

                /* Does it conflict with an existing column? */
                tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel),
                                                                                  attributeName);
                if (HeapTupleIsValid(tuple))
                {
                        /*
                         * Yes, try to merge the two column definitions. They must
                         * have the same type and typmod.
                         */
                        Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);

                        ereport(NOTICE,
                                        (errmsg("merging column \"%s\" with inherited definition",
                                                        attributeName)));

                        if (attribute->atttypid != childatt->atttypid ||
                                attribute->atttypmod != childatt->atttypmod)
                                ereport(ERROR,
                                                (errcode(ERRCODE_DATATYPE_MISMATCH),
                                                 errmsg("child table \"%s\" has different type for column \"%s\"",
                                                                RelationGetRelationName(rel),
                                                                NameStr(attribute->attname))));

                        /*
                         * A NOT NULL parent column requires a NOT NULL child column.
                         * Report this separately so the message does not blame the
                         * column type.
                         */
                        if (attribute->attnotnull && !childatt->attnotnull)
                                ereport(ERROR,
                                                (errcode(ERRCODE_DATATYPE_MISMATCH),
                                                 errmsg("column \"%s\" in child table \"%s\" must be NOT NULL",
                                                                NameStr(attribute->attname),
                                                                RelationGetRelationName(rel))));

                        /* One more parent now supplies this column */
                        childatt->attinhcount++;
                        simple_heap_update(attrdesc, &tuple->t_self, tuple);
                        /* XXX strength reduce openindexes to outside loop? */
                        CatalogUpdateIndexes(attrdesc, tuple);
                        heap_freetuple(tuple);

                        /* XXX defaults */
                }
                else
                {
                        /*
                         * No, create a new inherited column
                         */
                        FormData_pg_attribute attributeD;
                        HeapTuple       attributeTuple;
                        Form_pg_attribute childatt;

                        /*
                         * attributeD only supplies storage of the right size; every
                         * field of the resulting tuple is assigned below.
                         */
                        attributeTuple = heap_addheader(Natts_pg_attribute,
                                                                                        false,
                                                                                        ATTRIBUTE_TUPLE_SIZE,
                                                                                        (void *) &attributeD);
                        childatt = (Form_pg_attribute) GETSTRUCT(attributeTuple);

                        if (attribute->attnotnull)
                                ereport(ERROR,
                                                (errcode(ERRCODE_DATATYPE_MISMATCH),
                                                 errmsg("Cannot add new inherited NOT NULL column \"%s\"",
                                                                NameStr(attribute->attname))));

                        childatt->attrelid = RelationGetRelid(rel);
                        namecpy(&childatt->attname, &attribute->attname);
                        childatt->atttypid = attribute->atttypid;
                        childatt->attstattarget = -1;
                        childatt->attlen = attribute->attlen;
                        childatt->attcacheoff = -1;
                        childatt->atttypmod = attribute->atttypmod;
                        childatt->attnum = ++child_attno;
                        childatt->attbyval = attribute->attbyval;
                        childatt->attndims = attribute->attndims;
                        childatt->attstorage = attribute->attstorage;
                        childatt->attalign = attribute->attalign;
                        childatt->attnotnull = false;
                        childatt->atthasdef = false; /* XXX */
                        childatt->attisdropped = false;
                        childatt->attislocal = false;

                        /*
                         * The column comes from exactly one parent of rel (this
                         * one), regardless of how many ancestors the parent itself
                         * inherited it from.  Starting at 1 keeps the count in step
                         * with ATExecDropInherits, which decrements it once per
                         * parent removed.
                         */
                        childatt->attinhcount = 1;

                        simple_heap_insert(attrdesc, attributeTuple);
                        CatalogUpdateIndexes(attrdesc, attributeTuple);
                        heap_freetuple(attributeTuple);

                        /* XXX Defaults */
                }
        }

        heap_close(attrdesc, RowExclusiveLock);
}

/*
 * ATExecDropInherits
 *
 * Remove the table named by "parent" from the direct parents of "rel".
 *
 * The function deletes the matching pg_inherits row, raising an error if
 * "parent" is not a direct parent of rel.  It then visits every inherited,
 * non-dropped column of rel whose name also exists in the former parent,
 * decrements its attinhcount, and marks it local once the count reaches zero.
 */
static void
ATExecDropInherits(Relation rel, RangeVar *parent)
{
        Relation        catalogRelation;
        SysScanDesc scan;
        ScanKeyData key;
        HeapTuple       inheritsTuple,
                                attributeTuple;
        Oid                     inhparent;
        Oid                     dropparent;
        bool            found = false;

        /*
         * Get the OID of parent -- if no schema is specified use the regular
         * search path and only drop the one table that's found. We could try
         * to be clever and look at each parent and see if it matches but that
         * would be inconsistent with other operations I think.
         */
        Assert(rel);
        Assert(parent);

        dropparent = RangeVarGetRelid(parent, false);

        /* Search through the direct parents of rel looking for dropparent oid */
        catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
        ScanKeyInit(&key,
                                Anum_pg_inherits_inhrelid,
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(RelationGetRelid(rel)));
        scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
                                                          true, SnapshotNow, 1, &key);
        while (!found &&
                   HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
        {
                inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
                if (inhparent == dropparent)
                {
                        /* Stop after the first match; the loop test checks found */
                        simple_heap_delete(catalogRelation, &inheritsTuple->t_self);
                        found = true;
                }
        }
        systable_endscan(scan);
        heap_close(catalogRelation, RowExclusiveLock);

        if (!found)
        {
                /*
                 * would it be better to look up the actual schema of dropparent
                 * and make the error message explicitly name the qualified name
                 * it's trying to drop ?
                 */
                if (parent->schemaname)
                        ereport(ERROR,
                                        (errcode(ERRCODE_UNDEFINED_TABLE),
                                         errmsg("relation \"%s.%s\" is not a parent of relation \"%s\"",
                                                        parent->schemaname, parent->relname,
                                                        RelationGetRelationName(rel))));
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_UNDEFINED_TABLE),
                                         errmsg("relation \"%s\" is not a parent of relation \"%s\"",
                                                        parent->relname,
                                                        RelationGetRelationName(rel))));
        }

        /* Search through columns looking for matching columns from parent table */
        catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock);
        ScanKeyInit(&key,
                                Anum_pg_attribute_attrelid,
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(RelationGetRelid(rel)));
        scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId,
                                                          true, SnapshotNow, 1, &key);
        while (HeapTupleIsValid(attributeTuple = systable_getnext(scan)))
        {
                Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple);

                /*
                 * Not an inherited column at all
                 * (do NOT use islocal for this test--it can be true for
                 * inherited columns)
                 */
                if (att->attinhcount == 0)
                        continue;
                if (att->attisdropped) /* XXX Is this right? */
                        continue;

                /* Columns are matched to the former parent by name only */
                if (SearchSysCacheExistsAttName(dropparent, NameStr(att->attname)))
                {
                        /* Decrement inhcount and possibly set islocal to true */
                        HeapTuple       copyTuple = heap_copytuple(attributeTuple);
                        Form_pg_attribute copy_att = (Form_pg_attribute) GETSTRUCT(copyTuple);

                        copy_att->attinhcount--;
                        if (copy_att->attinhcount == 0)
                                copy_att->attislocal = true;

                        simple_heap_update(catalogRelation, &copyTuple->t_self, copyTuple);

                        /*
                         * XXX "Avoid using it for multiple tuples, since opening the
                         * indexes and building the index info structures is
                         * moderately expensive." Perhaps this can be moved outside
                         * the loop or else at least the
                         * CatalogOpenIndexes/CatalogCloseIndexes moved outside the
                         * loop but when I try that it seg faults?!
                         */
                        CatalogUpdateIndexes(catalogRelation, copyTuple);
                        heap_freetuple(copyTuple);
                }
        }
        systable_endscan(scan);
        heap_close(catalogRelation, RowExclusiveLock);
}




-- 
greg


---------------------------(end of broadcast)---------------------------
TIP 6: explain analyze is your friend

Reply via email to