On Tue, 2007-05-06 at 14:50 +0530, NikhilS wrote:
> PFA, a patch which provides the "CREATE .. INCLUDING INDEXES"
> functionality. This patch uses the same functionality introduced by
> the earlier patches in this thread albeit for the "INCLUDING INDEXES"
> case.
Attached is a revised version of this patch. Sorry for taking so long to make progress on this: my new job has been keeping me busy, and I've recently been ill.

This version updates the patch to CVS HEAD and has various fixes and refactoring, including proper docs. I refactored get_opclass_name() into lsyscache.c, but then realized that this means that lsyscache.c will depend on commands/indexcmds.c (for GetDefaultOpClass()), which is arguably improper, so I'm tempted to revert and just duplicate the syscache lookups in both ruleutils.c and parse_utilcmd.c.

Nikhil: why are both "options" and "inhreloptions" necessary in IndexStmt? Won't at least one be NULL?

BTW, I notice that include/defrem.h contains declarations for several different, marginally-related .c files (indexcmds.c, functioncmds.c, operatorcmds.c, aggregatecmds.c, opclasscmds.c, define.c). I'm inclined to separate these declarations into separate header files; any objections to doing that?

-Neil
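For anyone following the "inhreloptions" question: in the patch, the source index's reloptions travel in IndexStmt as one flattened C string (built by flatten_reloptions()), and DefineIndex() turns that string back into an array by splitting on ", " before merging in any explicit WITH options. The following is only a rough standalone sketch of that splitting step, assuming the flattened form is "name=value, name=value". It does not use any backend APIs, and split_reloptions(), free_reloptions() and xmalloc() are hypothetical names made up for illustration; the patch itself goes through text_to_array().

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: malloc that aborts on failure, to keep the sketch short. */
static void *
xmalloc(size_t size)
{
	void	   *p = malloc(size);

	if (p == NULL)
		abort();
	return p;
}

/*
 * Hypothetical helper (not a PostgreSQL function): split a flattened
 * reloptions string such as "fillfactor=70, foo=bar" on the ", "
 * separator.  Returns a malloc'd array of malloc'd strings and stores
 * the number of elements in *count.  For NULL or empty input, returns
 * NULL and sets *count to 0.
 */
char **
split_reloptions(const char *flat, size_t *count)
{
	static const char sep[] = ", ";
	const size_t seplen = sizeof(sep) - 1;
	const char *p;
	char	  **items;
	size_t		n = 1;
	size_t		i;

	assert(count != NULL);
	*count = 0;
	if (flat == NULL || flat[0] == '\0')
		return NULL;

	/* Count separators so the result array can be sized up front. */
	for (p = strstr(flat, sep); p != NULL; p = strstr(p + seplen, sep))
		n++;

	items = xmalloc(n * sizeof(char *));

	/* Copy out each piece between consecutive separators. */
	p = flat;
	for (i = 0; i < n; i++)
	{
		const char *end = strstr(p, sep);
		size_t		len = (end != NULL) ? (size_t) (end - p) : strlen(p);

		items[i] = xmalloc(len + 1);
		memcpy(items[i], p, len);
		items[i][len] = '\0';
		if (end != NULL)
			p = end + seplen;
	}

	*count = n;
	return items;
}

/* Hypothetical helper: release the array returned by split_reloptions(). */
void
free_reloptions(char **items, size_t count)
{
	size_t		i;

	for (i = 0; i < count; i++)
		free(items[i]);
	free(items);
}
```

Calling split_reloptions("fillfactor=70, foo=bar", &n) yields the two strings "fillfactor=70" and "foo=bar". Note that a plain split like this would mis-split an option value that itself contains ", "; whether that can happen in practice for index reloptions is not something this sketch tries to answer.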
Index: doc/src/sgml/ref/create_table.sgml =================================================================== RCS file: /home/neilc/postgres/cvs_root/pgsql/doc/src/sgml/ref/create_table.sgml,v retrieving revision 1.108 diff -p -c -r1.108 create_table.sgml *** doc/src/sgml/ref/create_table.sgml 3 Jun 2007 17:06:03 -0000 1.108 --- doc/src/sgml/ref/create_table.sgml 9 Jul 2007 04:34:40 -0000 *************** PostgreSQL documentation *** 23,29 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } ] TABLE <replaceable class="PARAMETER">table_name</replaceable> ( [ { <replaceable class="PARAMETER">column_name</replaceable> <replaceable class="PARAMETER">data_type</replaceable> [ DEFAULT <replaceable>default_expr</> ] [ <replaceable class="PARAMETER">column_constraint</replaceable> [ ... ] ] | <replaceable>table_constraint</replaceable> ! | LIKE <replaceable>parent_table</replaceable> [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS } ] ... } [, ... ] ] ) [ INHERITS ( <replaceable>parent_table</replaceable> [, ... ] ) ] --- 23,29 ---- CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } ] TABLE <replaceable class="PARAMETER">table_name</replaceable> ( [ { <replaceable class="PARAMETER">column_name</replaceable> <replaceable class="PARAMETER">data_type</replaceable> [ DEFAULT <replaceable>default_expr</> ] [ <replaceable class="PARAMETER">column_constraint</replaceable> [ ... ] ] | <replaceable>table_constraint</replaceable> ! | LIKE <replaceable>parent_table</replaceable> [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS | INDEXES } ] ... } [, ... ] ] ) [ INHERITS ( <replaceable>parent_table</replaceable> [, ... ] ) ] *************** and <replaceable class="PARAMETER">table *** 237,243 **** </varlistentry> <varlistentry> ! 
<term><literal>LIKE <replaceable>parent_table</replaceable> [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS } ]</literal></term> <listitem> <para> The <literal>LIKE</literal> clause specifies a table from which --- 237,243 ---- </varlistentry> <varlistentry> ! <term><literal>LIKE <replaceable>parent_table</replaceable> [ { INCLUDING | EXCLUDING } { DEFAULTS | CONSTRAINTS | INDEXES } ]</literal></term> <listitem> <para> The <literal>LIKE</literal> clause specifies a table from which *************** and <replaceable class="PARAMETER">table *** 266,275 **** requested, all check constraints are copied. </para> <para> Note also that unlike <literal>INHERITS</literal>, copied columns and constraints are not merged with similarly named columns and constraints. If the same name is specified explicitly or in another ! <literal>LIKE</literal> clause an error is signalled. </para> </listitem> </varlistentry> --- 266,280 ---- requested, all check constraints are copied. </para> <para> + Any indexes on the original table will not be created on the new + table, unless the <literal>INCLUDING INDEXES</literal> clause is + specified. + </para> + <para> Note also that unlike <literal>INHERITS</literal>, copied columns and constraints are not merged with similarly named columns and constraints. If the same name is specified explicitly or in another ! <literal>LIKE</literal> clause, an error is signalled. </para> </listitem> </varlistentry> Index: src/backend/bootstrap/bootparse.y =================================================================== RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/bootstrap/bootparse.y,v retrieving revision 1.88 diff -p -c -r1.88 bootparse.y *** src/backend/bootstrap/bootparse.y 13 Mar 2007 00:33:39 -0000 1.88 --- src/backend/bootstrap/bootparse.y 8 Jul 2007 00:49:29 -0000 *************** Boot_DeclareIndexStmt: *** 252,258 **** LexIDStr($8), NULL, $10, ! 
NULL, NIL, false, false, false, false, false, true, false, false); do_end(); --- 252,258 ---- LexIDStr($8), NULL, $10, ! NULL, NIL, NULL, false, false, false, false, false, true, false, false); do_end(); *************** Boot_DeclareUniqueIndexStmt: *** 270,276 **** LexIDStr($9), NULL, $11, ! NULL, NIL, true, false, false, false, false, true, false, false); do_end(); --- 270,276 ---- LexIDStr($9), NULL, $11, ! NULL, NIL, NULL, true, false, false, false, false, true, false, false); do_end(); Index: src/backend/commands/indexcmds.c =================================================================== RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/commands/indexcmds.c,v retrieving revision 1.160 diff -p -c -r1.160 indexcmds.c *** src/backend/commands/indexcmds.c 23 Jun 2007 22:12:50 -0000 1.160 --- src/backend/commands/indexcmds.c 10 Jul 2007 05:46:18 -0000 *************** static bool relationHasPrimaryKey(Relati *** 79,84 **** --- 79,85 ---- * to index on. * 'predicate': the partial-index condition, or NULL if none. * 'options': reloptions from WITH (in list-of-DefElem form). + * 'inhreloptions': * 'unique': make the index enforce uniqueness. * 'primary': mark the index as a primary key in the catalogs. * 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint, *************** DefineIndex(RangeVar *heapRelation, *** 100,105 **** --- 101,107 ---- List *attributeList, Expr *predicate, List *options, + char *inhreloptions, bool unique, bool primary, bool isconstraint, *************** DefineIndex(RangeVar *heapRelation, *** 392,400 **** } /* ! * Parse AM-specific options, convert to text array form, validate. */ ! reloptions = transformRelOptions((Datum) 0, options, false, false); (void) index_reloptions(amoptions, reloptions, true); --- 394,410 ---- } /* ! * Parse AM-specific options, convert to text array form, ! * validate. The inh reloptions introduced due to using indexes ! * via the "CREATE LIKE INCLUDING INDEXES" statement also need to ! 
* be merged here */ ! if (inhreloptions) ! reloptions = unflatten_reloptions(inhreloptions); ! else ! reloptions = (Datum) 0; ! ! reloptions = transformRelOptions(reloptions, options, false, false); (void) index_reloptions(amoptions, reloptions, true); Index: src/backend/commands/tablecmds.c =================================================================== RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/commands/tablecmds.c,v retrieving revision 1.229 diff -p -c -r1.229 tablecmds.c *** src/backend/commands/tablecmds.c 3 Jul 2007 01:30:36 -0000 1.229 --- src/backend/commands/tablecmds.c 8 Jul 2007 00:49:29 -0000 *************** ATExecAddIndex(AlteredTableInfo *tab, Re *** 3794,3799 **** --- 3794,3800 ---- stmt->indexParams, /* parameters */ (Expr *) stmt->whereClause, stmt->options, + stmt->inhreloptions, stmt->unique, stmt->primary, stmt->isconstraint, Index: src/backend/nodes/copyfuncs.c =================================================================== RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/nodes/copyfuncs.c,v retrieving revision 1.379 diff -p -c -r1.379 copyfuncs.c *** src/backend/nodes/copyfuncs.c 11 Jun 2007 22:22:40 -0000 1.379 --- src/backend/nodes/copyfuncs.c 8 Jul 2007 00:49:29 -0000 *************** _copyIndexStmt(IndexStmt *from) *** 2192,2197 **** --- 2192,2198 ---- COPY_STRING_FIELD(tableSpace); COPY_NODE_FIELD(indexParams); COPY_NODE_FIELD(options); + COPY_STRING_FIELD(inhreloptions); COPY_NODE_FIELD(whereClause); COPY_SCALAR_FIELD(unique); COPY_SCALAR_FIELD(primary); Index: src/backend/nodes/equalfuncs.c =================================================================== RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/nodes/equalfuncs.c,v retrieving revision 1.310 diff -p -c -r1.310 equalfuncs.c *** src/backend/nodes/equalfuncs.c 11 Jun 2007 22:22:40 -0000 1.310 --- src/backend/nodes/equalfuncs.c 8 Jul 2007 01:33:06 -0000 *************** _equalIndexStmt(IndexStmt *a, IndexStmt *** 1044,1049 **** --- 1044,1050 
---- COMPARE_STRING_FIELD(tableSpace); COMPARE_NODE_FIELD(indexParams); COMPARE_NODE_FIELD(options); + COMPARE_STRING_FIELD(inhreloptions); COMPARE_NODE_FIELD(whereClause); COMPARE_SCALAR_FIELD(unique); COMPARE_SCALAR_FIELD(primary); Index: src/backend/nodes/outfuncs.c =================================================================== RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/nodes/outfuncs.c,v retrieving revision 1.311 diff -p -c -r1.311 outfuncs.c *** src/backend/nodes/outfuncs.c 11 Jun 2007 22:22:40 -0000 1.311 --- src/backend/nodes/outfuncs.c 8 Jul 2007 01:34:11 -0000 *************** _outIndexStmt(StringInfo str, IndexStmt *** 1541,1546 **** --- 1541,1547 ---- WRITE_STRING_FIELD(tableSpace); WRITE_NODE_FIELD(indexParams); WRITE_NODE_FIELD(options); + WRITE_STRING_FIELD(inhreloptions); WRITE_NODE_FIELD(whereClause); WRITE_BOOL_FIELD(unique); WRITE_BOOL_FIELD(primary); Index: src/backend/parser/parse_utilcmd.c =================================================================== RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/parser/parse_utilcmd.c,v retrieving revision 2.1 diff -p -c -r2.1 parse_utilcmd.c *** src/backend/parser/parse_utilcmd.c 23 Jun 2007 22:12:51 -0000 2.1 --- src/backend/parser/parse_utilcmd.c 10 Jul 2007 05:18:16 -0000 *************** *** 26,38 **** --- 26,41 ---- #include "postgres.h" + #include "access/genam.h" #include "access/heapam.h" #include "catalog/heap.h" #include "catalog/index.h" #include "catalog/namespace.h" + #include "catalog/pg_opclass.h" #include "catalog/pg_type.h" #include "commands/defrem.h" #include "commands/tablecmds.h" + #include "commands/tablespace.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "optimizer/clauses.h" *************** *** 47,52 **** --- 50,56 ---- #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" + #include "utils/relcache.h" #include "utils/syscache.h" *************** static void transformTableConstraint(Par *** 93,100 **** --- 
97,108 ---- Constraint *constraint); static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, InhRelation *inhrelation); + static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt, + Relation parent_index, AttrNumber *attmap); static void transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt); + static IndexStmt *transformIndexConstraint(Constraint *constraint, + CreateStmtContext *cxt); static void transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt, bool skipValidation, *************** transformInhRelation(ParseState *pstate, *** 555,565 **** } } - if (including_indexes) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("LIKE INCLUDING INDEXES is not implemented"))); - /* * Insert the copied attributes into the cxt for the new table * definition. --- 563,568 ---- *************** transformInhRelation(ParseState *pstate, *** 657,662 **** --- 660,696 ---- } } + if (including_indexes && relation->rd_rel->relhasindex) + { + AttrNumber *attmap; + List *parent_indexes; + ListCell *l; + + attmap = varattnos_map_schema(tupleDesc, cxt->columns); + parent_indexes = RelationGetIndexList(relation); + + foreach(l, parent_indexes) + { + Oid parent_index_oid = lfirst_oid(l); + Relation parent_index; + IndexStmt *index_stmt; + + parent_index = index_open(parent_index_oid, AccessShareLock); + + /* + * Build a CREATE INDEX statement to recreate the parent_index. + */ + index_stmt = generateClonedIndexStmt(cxt, parent_index, + attmap); + + /* Add the new IndexStmt to the create context */ + cxt->ixconstraints = lappend(cxt->ixconstraints, index_stmt); + + /* Keep our lock on the index till xact commit */ + index_close(parent_index, NoLock); + } + } + /* * Close the parent rel, but keep our AccessShareLock on it until xact * commit. That will prevent someone else from deleting or ALTERing the *************** transformInhRelation(ParseState *pstate, *** 666,851 **** } /* ! * transformIndexConstraints ! 
* Handle UNIQUE and PRIMARY KEY constraints, which create indexes */ ! static void ! transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) { ! IndexStmt *index; ! List *indexlist = NIL; ! ListCell *listptr; ! ListCell *l; ! /* ! * Run through the constraints that need to generate an index. For PRIMARY ! * KEY, mark each column as NOT NULL and create an index. For UNIQUE, ! * create an index as for PRIMARY KEY, but do not insist on NOT NULL. ! */ ! foreach(listptr, cxt->ixconstraints) ! { ! Constraint *constraint = lfirst(listptr); ! ListCell *keys; ! IndexElem *iparam; ! Assert(IsA(constraint, Constraint)); ! Assert((constraint->contype == CONSTR_PRIMARY) ! || (constraint->contype == CONSTR_UNIQUE)); ! index = makeNode(IndexStmt); ! index->unique = true; ! index->primary = (constraint->contype == CONSTR_PRIMARY); ! if (index->primary) { ! if (cxt->pkey != NULL) ! ereport(ERROR, ! (errcode(ERRCODE_INVALID_TABLE_DEFINITION), ! errmsg("multiple primary keys for table \"%s\" are not allowed", ! cxt->relation->relname))); ! cxt->pkey = index; ! /* ! * In ALTER TABLE case, a primary index might already exist, but ! * DefineIndex will check for it. ! */ } - index->isconstraint = true; ! if (constraint->name != NULL) ! index->idxname = pstrdup(constraint->name); ! else ! index->idxname = NULL; /* DefineIndex will choose name */ ! index->relation = cxt->relation; ! index->accessMethod = DEFAULT_INDEX_TYPE; ! index->options = constraint->options; ! index->tableSpace = constraint->indexspace; ! index->indexParams = NIL; ! index->whereClause = NULL; ! index->concurrent = false; ! /* ! * Make sure referenced keys exist. If we are making a PRIMARY KEY ! * index, also make sure they are NOT NULL, if possible. (Although we ! * could leave it to DefineIndex to mark the columns NOT NULL, it's ! * more efficient to get it right the first time.) ! */ ! foreach(keys, constraint->keys) { ! char *key = strVal(lfirst(keys)); ! bool found = false; ! 
ColumnDef *column = NULL; ! ListCell *columns; ! foreach(columns, cxt->columns) ! { ! column = (ColumnDef *) lfirst(columns); ! Assert(IsA(column, ColumnDef)); ! if (strcmp(column->colname, key) == 0) ! { ! found = true; ! break; ! } ! } ! if (found) ! { ! /* found column in the new table; force it to be NOT NULL */ ! if (constraint->contype == CONSTR_PRIMARY) ! column->is_not_null = TRUE; ! } ! else if (SystemAttributeByName(key, cxt->hasoids) != NULL) ! { ! /* ! * column will be a system column in the new table, so accept ! * it. System columns can't ever be null, so no need to worry ! * about PRIMARY/NOT NULL constraint. ! */ ! found = true; ! } ! else if (cxt->inhRelations) ! { ! /* try inherited tables */ ! ListCell *inher; ! foreach(inher, cxt->inhRelations) ! { ! RangeVar *inh = (RangeVar *) lfirst(inher); ! Relation rel; ! int count; ! ! Assert(IsA(inh, RangeVar)); ! rel = heap_openrv(inh, AccessShareLock); ! if (rel->rd_rel->relkind != RELKIND_RELATION) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("inherited relation \"%s\" is not a table", ! inh->relname))); ! for (count = 0; count < rel->rd_att->natts; count++) ! { ! Form_pg_attribute inhattr = rel->rd_att->attrs[count]; ! char *inhname = NameStr(inhattr->attname); ! if (inhattr->attisdropped) ! continue; ! if (strcmp(key, inhname) == 0) ! { ! found = true; ! ! /* ! * We currently have no easy way to force an ! * inherited column to be NOT NULL at creation, if ! * its parent wasn't so already. We leave it to ! * DefineIndex to fix things up in this case. ! */ ! break; ! } ! } ! heap_close(rel, NoLock); ! if (found) ! break; ! } ! } ! /* ! * In the ALTER TABLE case, don't complain about index keys not ! * created in the command; they may well exist already. ! * DefineIndex will complain about them if not, and will also take ! * care of marking them NOT NULL. ! */ ! if (!found && !cxt->isalter) ! ereport(ERROR, ! (errcode(ERRCODE_UNDEFINED_COLUMN), ! 
errmsg("column \"%s\" named in key does not exist", ! key))); ! /* Check for PRIMARY KEY(foo, foo) */ ! foreach(columns, index->indexParams) ! { ! iparam = (IndexElem *) lfirst(columns); ! if (iparam->name && strcmp(key, iparam->name) == 0) { ! if (index->primary) ereport(ERROR, ! (errcode(ERRCODE_DUPLICATE_COLUMN), ! errmsg("column \"%s\" appears twice in primary key constraint", ! key))); ! else ! ereport(ERROR, ! (errcode(ERRCODE_DUPLICATE_COLUMN), ! errmsg("column \"%s\" appears twice in unique constraint", ! key))); } ! } ! /* OK, add it to the index definition */ ! iparam = makeNode(IndexElem); ! iparam->name = pstrdup(key); ! iparam->expr = NULL; ! iparam->opclass = NIL; ! iparam->ordering = SORTBY_DEFAULT; ! iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; ! index->indexParams = lappend(index->indexParams, iparam); } indexlist = lappend(indexlist, index); --- 700,945 ---- } /* ! * Generate an IndexStmt entry using information from an already ! * existing index "source_idx". ! * ! * Note: Much of this functionality is cribbed from pg_get_indexdef. */ ! static IndexStmt * ! generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, ! AttrNumber *attmap) { ! HeapTuple ht_idx; ! HeapTuple ht_idxrel; ! HeapTuple ht_am; ! Form_pg_index idxrec; ! Form_pg_class idxrelrec; ! Form_pg_am amrec; ! List *indexprs = NIL; ! ListCell *indexpr_item; ! Oid indrelid; ! Oid source_relid; ! int keyno; ! Oid keycoltype; ! Datum indclassDatum; ! Datum indoptionDatum; ! bool isnull; ! oidvector *indclass; ! int2vector *indoption; ! IndexStmt *index; ! Datum reloptions; ! ! source_relid = RelationGetRelid(source_idx); ! ! /* Fetch pg_index tuple for source index */ ! ht_idx = SearchSysCache(INDEXRELID, ! ObjectIdGetDatum(source_relid), ! 0, 0, 0); ! if (!HeapTupleIsValid(ht_idx)) ! elog(ERROR, "cache lookup failed for index %u", source_relid); ! idxrec = (Form_pg_index) GETSTRUCT(ht_idx); ! ! Assert(source_relid == idxrec->indexrelid); ! indrelid = idxrec->indrelid; ! ! 
index = makeNode(IndexStmt); ! index->unique = idxrec->indisunique; ! index->concurrent = false; ! index->primary = idxrec->indisprimary; ! index->relation = cxt->relation; ! index->isconstraint = false; ! ! /* ! * We don't try to preserve the name of the source index; instead, just ! * let DefineIndex() choose a reasonable name. ! */ ! index->idxname = NULL; ! ! /* Must get indclass and indoption the hard way */ ! indclassDatum = SysCacheGetAttr(INDEXRELID, ht_idx, ! Anum_pg_index_indclass, &isnull); ! Assert(!isnull); ! indclass = (oidvector *) DatumGetPointer(indclassDatum); ! indoptionDatum = SysCacheGetAttr(INDEXRELID, ht_idx, ! Anum_pg_index_indoption, &isnull); ! Assert(!isnull); ! indoption = (int2vector *) DatumGetPointer(indoptionDatum); ! ! /* Fetch pg_class tuple of source index */ ! ht_idxrel = SearchSysCache(RELOID, ! ObjectIdGetDatum(source_relid), ! 0, 0, 0); ! if (!HeapTupleIsValid(ht_idxrel)) ! elog(ERROR, "cache lookup failed for relation %u", source_relid); ! ! /* ! * Store the reloptions for later use by this new index ! */ ! reloptions = SysCacheGetAttr(RELOID, ht_idxrel, ! Anum_pg_class_reloptions, &isnull); ! if (!isnull) ! index->inhreloptions = flatten_reloptions(source_relid); ! ! idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel); ! ! /* Fetch pg_am tuple for the index's access method */ ! ht_am = SearchSysCache(AMOID, ! ObjectIdGetDatum(idxrelrec->relam), ! 0, 0, 0); ! if (!HeapTupleIsValid(ht_am)) ! elog(ERROR, "cache lookup failed for access method %u", ! idxrelrec->relam); ! amrec = (Form_pg_am) GETSTRUCT(ht_am); ! index->accessMethod = pstrdup(NameStr(amrec->amname)); ! ! /* ! * Get the index expressions, if any. ! */ ! if (!heap_attisnull(ht_idx, Anum_pg_index_indexprs)) ! { ! Datum exprsDatum; ! bool isnull; ! char *exprsString; ! ! exprsDatum = SysCacheGetAttr(INDEXRELID, ht_idx, ! Anum_pg_index_indexprs, &isnull); ! exprsString = DatumGetCString(DirectFunctionCall1(textout, ! exprsDatum)); ! Assert(!isnull); ! 
indexprs = (List *) stringToNode(exprsString); ! } ! ! indexpr_item = list_head(indexprs); ! ! for (keyno = 0; keyno < idxrec->indnatts; keyno++) ! { ! IndexElem *iparam; ! AttrNumber attnum = idxrec->indkey.values[keyno]; ! int16 opt = indoption->values[keyno]; ! iparam = makeNode(IndexElem); ! if (attnum != 0) ! { ! /* Simple index column */ ! char *attname; ! attname = get_relid_attribute_name(indrelid, attnum); ! keycoltype = get_atttype(indrelid, attnum); ! iparam->name = attname; ! iparam->expr = NULL; ! } ! else { ! /* Expressional index */ ! Node *indexkey; ! if (indexpr_item == NULL) ! elog(ERROR, "too few entries in indexprs list"); ! indexkey = (Node *) lfirst(indexpr_item); ! change_varattnos_of_a_node(indexkey, attmap); ! iparam->name = NULL; ! iparam->expr = indexkey; ! ! indexpr_item = lnext(indexpr_item); ! keycoltype = exprType(indexkey); } ! /* Add the operator class name, if non-default */ ! iparam->opclass = get_opclass_name(indclass->values[keyno], ! keycoltype); ! iparam->ordering = SORTBY_DEFAULT; ! iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; ! /* Adjust options if necessary */ ! if (amrec->amcanorder) { ! /* If it supports sort ordering, report DESC and NULLS opts */ ! if (opt & INDOPTION_DESC) ! iparam->ordering = SORTBY_DESC; ! if (opt & INDOPTION_NULLS_FIRST) ! iparam->nulls_ordering = SORTBY_NULLS_FIRST; ! } ! index->indexParams = lappend(index->indexParams, iparam); ! } ! /* Use the same tablespace as the source index */ ! index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode); ! /* If it's a partial index, decompile and append the predicate */ ! if (!heap_attisnull(ht_idx, Anum_pg_index_indpred)) ! { ! Datum pred_datum; ! bool isnull; ! char *pred_str; ! /* Convert text string to node tree */ ! pred_datum = SysCacheGetAttr(INDEXRELID, ht_idx, ! Anum_pg_index_indpred, &isnull); ! Assert(!isnull); ! pred_str = DatumGetCString(DirectFunctionCall1(textout, ! pred_datum)); ! 
index->whereClause = (Node *) stringToNode(pred_str); ! change_varattnos_of_a_node(index->whereClause, attmap); ! } ! /* Clean up */ ! ReleaseSysCache(ht_idx); ! ReleaseSysCache(ht_idxrel); ! ReleaseSysCache(ht_am); ! ! return index; ! } ! ! /* ! * transformIndexConstraints ! * Handle UNIQUE and PRIMARY KEY constraints, which create indexes ! */ ! static void ! transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) ! { ! IndexStmt *index; ! List *indexlist = NIL; ! ListCell *lc; ! ! /* ! * Run through the constraints that need to generate an index. For PRIMARY ! * KEY, mark each column as NOT NULL and create an index. For UNIQUE, ! * create an index as for PRIMARY KEY, but do not insist on NOT NULL. ! * ! * If the table is being created using LIKE INCLUDING INDEXES, the ! * ixconstraints list will contain a mix of Constraint and IndexStmt ! * entries ! */ ! foreach(lc, cxt->ixconstraints) ! { ! Node *node = lfirst(lc); ! ! switch (nodeTag(node)) ! { ! case T_Constraint: ! index = transformIndexConstraint((Constraint *) node, cxt); ! break; ! ! case T_IndexStmt: ! index = (IndexStmt *) node; ! if (index->primary) { ! if (cxt->pkey != NULL) ereport(ERROR, ! (errcode(ERRCODE_INVALID_TABLE_DEFINITION), ! errmsg("multiple primary keys for table \"%s\" are not allowed", ! cxt->relation->relname))); ! cxt->pkey = index; } ! break; ! default: ! elog(ERROR, "unrecognized node type: %d", nodeTag(node)); ! index = NULL; /* keep the compiler quiet */ } indexlist = lappend(indexlist, index); *************** transformIndexConstraints(ParseState *ps *** 867,878 **** cxt->alist = list_make1(cxt->pkey); } ! foreach(l, indexlist) { bool keep = true; ListCell *k; ! index = lfirst(l); /* if it's pkey, it's already in cxt->alist */ if (index == cxt->pkey) --- 961,972 ---- cxt->alist = list_make1(cxt->pkey); } ! foreach(lc, indexlist) { bool keep = true; ListCell *k; ! 
index = lfirst(lc); /* if it's pkey, it's already in cxt->alist */ if (index == cxt->pkey) *************** transformIndexConstraints(ParseState *ps *** 902,907 **** --- 996,1170 ---- } } + static IndexStmt * + transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) + { + IndexStmt *index; + ListCell *keys; + IndexElem *iparam; + + Assert(constraint->contype == CONSTR_PRIMARY || + constraint->contype == CONSTR_UNIQUE); + + index = makeNode(IndexStmt); + index->unique = true; + index->primary = (constraint->contype == CONSTR_PRIMARY); + + if (index->primary) + { + if (cxt->pkey != NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("multiple primary keys for table \"%s\" are not allowed", + cxt->relation->relname))); + cxt->pkey = index; + + /* + * In ALTER TABLE case, a primary index might already exist, but + * DefineIndex will check for it. + */ + } + index->isconstraint = true; + + if (constraint->name != NULL) + index->idxname = pstrdup(constraint->name); + else + index->idxname = NULL; /* DefineIndex will choose name */ + + index->relation = cxt->relation; + index->accessMethod = DEFAULT_INDEX_TYPE; + index->options = constraint->options; + index->tableSpace = constraint->indexspace; + index->indexParams = NIL; + index->whereClause = NULL; + index->concurrent = false; + + /* + * Make sure referenced keys exist. If we are making a PRIMARY KEY + * index, also make sure they are NOT NULL, if possible. (Although we + * could leave it to DefineIndex to mark the columns NOT NULL, it's + * more efficient to get it right the first time.) 
+ */ + foreach(keys, constraint->keys) + { + char *key = strVal(lfirst(keys)); + bool found = false; + ColumnDef *column = NULL; + ListCell *columns; + + foreach(columns, cxt->columns) + { + column = (ColumnDef *) lfirst(columns); + Assert(IsA(column, ColumnDef)); + if (strcmp(column->colname, key) == 0) + { + found = true; + break; + } + } + if (found) + { + /* found column in the new table; force it to be NOT NULL */ + if (constraint->contype == CONSTR_PRIMARY) + column->is_not_null = TRUE; + } + else if (SystemAttributeByName(key, cxt->hasoids) != NULL) + { + /* + * column will be a system column in the new table, so accept + * it. System columns can't ever be null, so no need to worry + * about PRIMARY/NOT NULL constraint. + */ + found = true; + } + else if (cxt->inhRelations) + { + /* try inherited tables */ + ListCell *inher; + + foreach(inher, cxt->inhRelations) + { + RangeVar *inh = (RangeVar *) lfirst(inher); + Relation rel; + int count; + + Assert(IsA(inh, RangeVar)); + rel = heap_openrv(inh, AccessShareLock); + if (rel->rd_rel->relkind != RELKIND_RELATION) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("inherited relation \"%s\" is not a table", + inh->relname))); + for (count = 0; count < rel->rd_att->natts; count++) + { + Form_pg_attribute inhattr = rel->rd_att->attrs[count]; + char *inhname = NameStr(inhattr->attname); + + if (inhattr->attisdropped) + continue; + if (strcmp(key, inhname) == 0) + { + found = true; + + /* + * We currently have no easy way to force an + * inherited column to be NOT NULL at creation, if + * its parent wasn't so already. We leave it to + * DefineIndex to fix things up in this case. + */ + break; + } + } + heap_close(rel, NoLock); + if (found) + break; + } + } + + /* + * In the ALTER TABLE case, don't complain about index keys not + * created in the command; they may well exist already. + * DefineIndex will complain about them if not, and will also take + * care of marking them NOT NULL. 
+ */ + if (!found && !cxt->isalter) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" named in key does not exist", + key))); + + /* Check for PRIMARY KEY(foo, foo) */ + foreach(columns, index->indexParams) + { + iparam = (IndexElem *) lfirst(columns); + if (iparam->name && strcmp(key, iparam->name) == 0) + { + if (index->primary) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_COLUMN), + errmsg("column \"%s\" appears twice in primary key constraint", + key))); + else + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_COLUMN), + errmsg("column \"%s\" appears twice in unique constraint", + key))); + } + } + + /* OK, add it to the index definition */ + iparam = makeNode(IndexElem); + iparam->name = pstrdup(key); + iparam->expr = NULL; + iparam->opclass = NIL; + iparam->ordering = SORTBY_DEFAULT; + iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; + index->indexParams = lappend(index->indexParams, iparam); + } + + return index; + } + /* * transformFKConstraints * handle FOREIGN KEY constraints Index: src/backend/tcop/utility.c =================================================================== RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/tcop/utility.c,v retrieving revision 1.283 diff -p -c -r1.283 utility.c *** src/backend/tcop/utility.c 3 Jul 2007 01:30:37 -0000 1.283 --- src/backend/tcop/utility.c 8 Jul 2007 00:49:30 -0000 *************** ProcessUtility(Node *parsetree, *** 886,891 **** --- 886,892 ---- stmt->indexParams, /* parameters */ (Expr *) stmt->whereClause, stmt->options, + stmt->inhreloptions, stmt->unique, stmt->primary, stmt->isconstraint, Index: src/backend/utils/adt/ruleutils.c =================================================================== RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/utils/adt/ruleutils.c,v retrieving revision 1.262 diff -p -c -r1.262 ruleutils.c *** src/backend/utils/adt/ruleutils.c 18 Jun 2007 21:40:58 -0000 1.262 --- src/backend/utils/adt/ruleutils.c 10 Jul 2007 05:50:13 -0000 
*************** static void get_from_clause_alias(Alias *** 184,191 **** deparse_context *context); static void get_from_clause_coldeflist(List *names, List *types, List *typmods, deparse_context *context); ! static void get_opclass_name(Oid opclass, Oid actual_datatype, ! StringInfo buf); static Node *processIndirection(Node *node, deparse_context *context, bool printit); static void printSubscripts(ArrayRef *aref, deparse_context *context); --- 184,190 ---- deparse_context *context); static void get_from_clause_coldeflist(List *names, List *types, List *typmods, deparse_context *context); ! static void get_opclass(Oid opclass, Oid actual_datatype, StringInfo buf); static Node *processIndirection(Node *node, deparse_context *context, bool printit); static void printSubscripts(ArrayRef *aref, deparse_context *context); *************** static char *generate_relation_name(Oid *** 193,199 **** static char *generate_function_name(Oid funcid, int nargs, Oid *argtypes); static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2); static text *string_to_text(char *str); - static char *flatten_reloptions(Oid relid); #define only_marker(rte) ((rte)->inh ? "" : "ONLY ") --- 192,197 ---- *************** pg_get_indexdef_worker(Oid indexrelid, i *** 763,770 **** /* Add the operator class name */ if (!colno) ! get_opclass_name(indclass->values[keyno], keycoltype, ! &buf); /* Add options if relevant */ if (amrec->amcanorder) --- 761,767 ---- /* Add the operator class name */ if (!colno) ! get_opclass(indclass->values[keyno], keycoltype, &buf); /* Add options if relevant */ if (amrec->amcanorder) *************** get_from_clause_coldeflist(List *names, *** 4995,5001 **** } /* ! * get_opclass_name - fetch name of an index operator class * * The opclass name is appended (after a space) to buf. * --- 4992,4998 ---- } /* ! * get_opclass - fetch name of an index operator class * * The opclass name is appended (after a space) to buf. 
   *
*************** get_from_clause_coldeflist(List *names,
*** 5004,5040 ****
   * InvalidOid for actual_datatype.)
   */
  static void
! get_opclass_name(Oid opclass, Oid actual_datatype,
! 				 StringInfo buf)
  {
! 	HeapTuple ht_opc;
! 	Form_pg_opclass opcrec;
! 	char *opcname;
! 	char *nspname;
! 
! 	ht_opc = SearchSysCache(CLAOID,
! 							ObjectIdGetDatum(opclass),
! 							0, 0, 0);
! 	if (!HeapTupleIsValid(ht_opc))
! 		elog(ERROR, "cache lookup failed for opclass %u", opclass);
! 	opcrec = (Form_pg_opclass) GETSTRUCT(ht_opc);
  
! 	if (!OidIsValid(actual_datatype) ||
! 		GetDefaultOpClass(actual_datatype, opcrec->opcmethod) != opclass)
  	{
  		/* Okay, we need the opclass name. Do we need to qualify it? */
! 		opcname = NameStr(opcrec->opcname);
  		if (OpclassIsVisible(opclass))
  			appendStringInfo(buf, " %s", quote_identifier(opcname));
  		else
  		{
! 			nspname = get_namespace_name(opcrec->opcnamespace);
  			appendStringInfo(buf, " %s.%s",
  							 quote_identifier(nspname),
  							 quote_identifier(opcname));
  		}
  	}
- 	ReleaseSysCache(ht_opc);
  }
  
  /*
--- 5001,5024 ----
   * InvalidOid for actual_datatype.)
   */
  static void
! get_opclass(Oid opclass, Oid actual_datatype, StringInfo buf)
  {
! 	List *name_list = get_opclass_name(opclass, actual_datatype);
  
! 	if (name_list)
  	{
  		/* Okay, we need the opclass name. Do we need to qualify it? */
! 		char *opcname = strVal(linitial(name_list));
  		if (OpclassIsVisible(opclass))
  			appendStringInfo(buf, " %s", quote_identifier(opcname));
  		else
  		{
! 			char *nspname = strVal(lsecond(name_list));
  			appendStringInfo(buf, " %s.%s",
  							 quote_identifier(nspname),
  							 quote_identifier(opcname));
  		}
  	}
  }
  
  /*
*************** string_to_text(char *str)
*** 5417,5423 ****
  /*
   * Generate a C string representing a relation's reloptions, or NULL if none.
   */
! static char *
  flatten_reloptions(Oid relid)
  {
  	char *result = NULL;
--- 5401,5407 ----
  /*
   * Generate a C string representing a relation's reloptions, or NULL if none.
   */
! char *
  flatten_reloptions(Oid relid)
  {
  	char *result = NULL;
*************** flatten_reloptions(Oid relid)
*** 5453,5455 ****
--- 5437,5467 ----
  
  	return result;
  }
+ 
+ /*
+  * Generate an Array Datum representing a relation's reloptions using
+  * a C string
+  */
+ Datum
+ unflatten_reloptions(char *reloptstring)
+ {
+ 	Datum result = (Datum) 0;
+ 
+ 	if (reloptstring)
+ 	{
+ 		Datum sep, relopts;
+ 
+ 		/*
+ 		 * We want to use text_to_array(reloptstring, ', ') --- but
+ 		 * DirectFunctionCall2(text_to_array) does not work, because
+ 		 * text_to_array() relies on fcinfo to be valid. So use
+ 		 * OidFunctionCall2.
+ 		 */
+ 		sep = DirectFunctionCall1(textin, CStringGetDatum(", "));
+ 		relopts = DirectFunctionCall1(textin, CStringGetDatum(reloptstring));
+ 
+ 		result = OidFunctionCall2(F_TEXT_TO_ARRAY, relopts, sep);
+ 	}
+ 
+ 	return result;
+ }
Index: src/backend/utils/cache/lsyscache.c
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/backend/utils/cache/lsyscache.c,v
retrieving revision 1.152
diff -p -c -r1.152 lsyscache.c
*** src/backend/utils/cache/lsyscache.c	11 May 2007 17:57:12 -0000	1.152
--- src/backend/utils/cache/lsyscache.c	10 Jul 2007 05:35:21 -0000
***************
*** 27,32 ****
--- 27,33 ----
  #include "catalog/pg_proc.h"
  #include "catalog/pg_statistic.h"
  #include "catalog/pg_type.h"
+ #include "commands/defrem.h"
  #include "miscadmin.h"
  #include "nodes/makefuncs.h"
  #include "utils/array.h"
*************** get_opclass_input_type(Oid opclass)
*** 980,985 ****
--- 981,1024 ----
  	return result;
  }
  
+ /*
+  * get_opclass
+  *
+  * Returns the qualified name of the specified index opclass, as
+  * a pair of string Values.
+  *
+  * If the opclass is the default for the given actual_datatype, then
+  * this returns NIL. (If you don't want this behavior, just pass
+  * InvalidOid for actual_datatype.)
+  */
+ List *
+ get_opclass_name(Oid opclass, Oid actual_datatype)
+ {
+ 	HeapTuple ht_opc;
+ 	Form_pg_opclass opc_rec;
+ 	List *name_list = NIL;
+ 
+ 	ht_opc = SearchSysCache(CLAOID,
+ 							ObjectIdGetDatum(opclass),
+ 							0, 0, 0);
+ 	if (!HeapTupleIsValid(ht_opc))
+ 		elog(ERROR, "cache lookup failed for opclass %u", opclass);
+ 
+ 	opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc);
+ 
+ 	if (!OidIsValid(actual_datatype) ||
+ 		GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass)
+ 	{
+ 		char *nsp_name = get_namespace_name(opc_rec->opcnamespace);
+ 		char *opc_name = NameStr(opc_rec->opcname);
+ 
+ 		name_list = list_make2(makeString(nsp_name), makeString(opc_name));
+ 	}
+ 
+ 	ReleaseSysCache(ht_opc);
+ 	return name_list;
+ }
+ 
  /* ---------- OPERATOR CACHE ---------- */
  
  /*
Index: src/include/commands/defrem.h
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/include/commands/defrem.h,v
retrieving revision 1.81
diff -p -c -r1.81 defrem.h
*** src/include/commands/defrem.h	13 Mar 2007 00:33:43 -0000	1.81
--- src/include/commands/defrem.h	8 Jul 2007 00:49:30 -0000
*************** extern void DefineIndex(RangeVar *heapRe
*** 26,31 ****
--- 26,32 ----
  			List *attributeList,
  			Expr *predicate,
  			List *options,
+ 			char *inhreloptions,
  			bool unique,
  			bool primary,
  			bool isconstraint,
Index: src/include/nodes/parsenodes.h
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/include/nodes/parsenodes.h,v
retrieving revision 1.349
diff -p -c -r1.349 parsenodes.h
*** src/include/nodes/parsenodes.h	23 Jun 2007 22:12:52 -0000	1.349
--- src/include/nodes/parsenodes.h	8 Jul 2007 00:49:30 -0000
*************** typedef struct IndexStmt
*** 1501,1506 ****
--- 1501,1507 ----
  	char *tableSpace; /* tablespace, or NULL to use parent's */
  	List *indexParams; /* a list of IndexElem */
  	List *options; /* options from WITH clause */
+ 	char *inhreloptions; /* relopts inherited from parent index */
  	Node *whereClause; /* qualification (partial-index predicate) */
  	bool unique; /* is index unique? */
  	bool primary; /* is index on primary key? */
Index: src/include/utils/builtins.h
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/include/utils/builtins.h,v
retrieving revision 1.297
diff -p -c -r1.297 builtins.h
*** src/include/utils/builtins.h	26 Jun 2007 16:48:09 -0000	1.297
--- src/include/utils/builtins.h	10 Jul 2007 05:46:07 -0000
*************** extern List *deparse_context_for_plan(No
*** 560,565 ****
--- 560,567 ----
  extern const char *quote_identifier(const char *ident);
  extern char *quote_qualified_identifier(const char *namespace,
  						   const char *ident);
+ extern char *flatten_reloptions(Oid relid);
+ extern Datum unflatten_reloptions(char *reloptstring);
  
  /* tid.c */
  extern Datum tidin(PG_FUNCTION_ARGS);
Index: src/include/utils/lsyscache.h
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/include/utils/lsyscache.h,v
retrieving revision 1.119
diff -p -c -r1.119 lsyscache.h
*** src/include/utils/lsyscache.h	6 Jun 2007 23:00:47 -0000	1.119
--- src/include/utils/lsyscache.h	10 Jul 2007 05:14:32 -0000
*************** extern void get_atttypetypmod(Oid relid,
*** 61,66 ****
--- 61,67 ----
  extern char *get_constraint_name(Oid conoid);
  extern Oid get_opclass_family(Oid opclass);
  extern Oid get_opclass_input_type(Oid opclass);
+ extern List *get_opclass_name(Oid opclass, Oid actual_datatype);
  extern RegProcedure get_opcode(Oid opno);
  extern char *get_opname(Oid opno);
  extern void op_input_types(Oid opno, Oid *lefttype, Oid *righttype);
Index: src/test/regress/expected/inherit.out
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/test/regress/expected/inherit.out,v
retrieving revision 1.21
diff -p -c -r1.21 inherit.out
*** src/test/regress/expected/inherit.out	3 Jun 2007 22:16:03 -0000	1.21
--- src/test/regress/expected/inherit.out	8 Jul 2007 00:49:30 -0000
*************** SELECT * FROM inhg; /* Two records with
*** 633,638 ****
--- 633,650 ----
  (2 rows)
  
  DROP TABLE inhg;
+ CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
+ DROP TABLE inhg;
+ /* Multiple primary keys creation should fail */
+ CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, PRIMARY KEY(x)); /* fails */
+ ERROR: multiple primary keys for table "inhg" are not allowed
+ CREATE TABLE inhz (xx text DEFAULT 'text', yy int UNIQUE);
+ NOTICE: CREATE TABLE / UNIQUE will create implicit index "inhz_yy_key" for table "inhz"
+ /* Ok to create multiple unique indexes */
+ CREATE TABLE inhg (x text UNIQUE, LIKE inhz INCLUDING INDEXES);
+ NOTICE: CREATE TABLE / UNIQUE will create implicit index "inhg_x_key" for table "inhg"
+ DROP TABLE inhg;
+ DROP TABLE inhz;
  -- Test changing the type of inherited columns
  insert into d values('test','one','two','three');
  alter table a alter column aa type integer using bit_length(aa);
Index: src/test/regress/sql/inherit.sql
===================================================================
RCS file: /home/neilc/postgres/cvs_root/pgsql/src/test/regress/sql/inherit.sql,v
retrieving revision 1.10
diff -p -c -r1.10 inherit.sql
*** src/test/regress/sql/inherit.sql	27 Jun 2006 03:43:20 -0000	1.10
--- src/test/regress/sql/inherit.sql	8 Jul 2007 00:49:30 -0000
*************** INSERT INTO inhg VALUES ('x', 'foo', 'y
*** 156,161 ****
--- 156,170 ----
  SELECT * FROM inhg; /* Two records with three columns in order x=x, xx=text, y=y */
  DROP TABLE inhg;
  
+ CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
+ DROP TABLE inhg;
+ /* Multiple primary keys creation should fail */
+ CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, PRIMARY KEY(x)); /* fails */
+ CREATE TABLE inhz (xx text DEFAULT 'text', yy int UNIQUE);
+ /* Ok to create multiple unique indexes */
+ CREATE TABLE inhg (x text UNIQUE, LIKE inhz INCLUDING INDEXES);
+ DROP TABLE inhg;
+ DROP TABLE inhz;
  
  -- Test changing the type of inherited columns
  insert into d values('test','one','two','three');
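
For anyone reading the patch without a backend tree at hand: unflatten_reloptions() is meant to turn the ", "-separated string produced by flatten_reloptions() back into an array, which the patch does with text_to_array(reloptstring, ', '). The standalone sketch below only illustrates that splitting step in plain C. The helper name split_reloptions() and the sample option string are made up for illustration and are not part of the patch; the real code builds a text[] Datum through the fmgr interface rather than an array of C strings.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * split_reloptions (hypothetical helper, not in the patch)
 *
 * Split a flattened reloptions string on the ", " separator, roughly
 * what text_to_array(reloptstring, ', ') does inside the backend.
 * Stores malloc'd copies of each piece in out[] (at most max entries)
 * and returns the number of pieces stored. A NULL input yields 0,
 * mirroring the NULL check in unflatten_reloptions().
 */
static int
split_reloptions(const char *flat, char **out, int max)
{
	const char *sep = ", ";
	size_t		seplen = strlen(sep);
	int			n = 0;

	assert(out != NULL);

	if (flat == NULL)
		return 0;

	while (n < max)
	{
		/* find the next separator, or take the rest of the string */
		const char *next = strstr(flat, sep);
		size_t		len = next ? (size_t) (next - flat) : strlen(flat);
		char	   *copy = malloc(len + 1);

		if (copy == NULL)
			break;				/* out of memory: return what we have */
		memcpy(copy, flat, len);
		copy[len] = '\0';
		out[n++] = copy;

		if (next == NULL)
			break;				/* that was the last piece */
		flat = next + seplen;
	}

	return n;
}
```

Called on an illustrative string such as "fillfactor=70, other=val", this yields the two pieces "fillfactor=70" and "other=val"; the caller is responsible for freeing them. Note that this naive split, like the text_to_array() call, would misbehave if an option value itself contained ", ".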