Tom Lane wrote:
> Alvaro Herrera <alvhe...@2ndquadrant.com> writes:
> > Tom Lane wrote:
> >> Hmm.  The bespoke code for constructing the attno map bothers me;
> >> surely there is existing code that does that?  If not, it'd still
> >> make more sense to factor it out, I think, because there will be
> >> other needs for it in future.
> 
> > There isn't any that I could find -- all the existing callers of
> > map_variable_attnos build their map in other ways (while walking an
> > attribute array at construction time).
> 
> [ pokes around... ]  The code I was thinking of is convert_tuples_by_name
> in access/common/tupconvert.c.  There's a bit of an API mismatch in that
> it wants to wrap the mapping array in a TupleConversionMap struct; but
> maybe we could refactor tupconvert.c to offer a way to get just the map
> array.

Ah, nice gadget.  I think the attached patch should do.

> > I also modified the algorithm to use the relcache instead of walking the
> > child's attribute list for each parent attribute (that was silly).
> 
> Hmm.  That might be better in a big-O sense but I doubt it's faster for
> reasonable numbers of columns.

Hm, I was thinking in unreasonable numbers of columns, keeping in mind
that they can appear in arbitrary order in child tables.  Then again,
that probably seldom occurs in real databases.  I suppose this could
become an issue with table partitioning becoming more common, but I'm
okay with deferring the optimization work.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/access/common/tupconvert.c 
b/src/backend/access/common/tupconvert.c
index 945a6a2..bdd3aa4 100644
--- a/src/backend/access/common/tupconvert.c
+++ b/src/backend/access/common/tupconvert.c
@@ -206,55 +206,12 @@ convert_tuples_by_name(TupleDesc indesc,
 {
        TupleConversionMap *map;
        AttrNumber *attrMap;
-       int                     n;
+       int                     n = outdesc->natts;
        int                     i;
        bool            same;
 
        /* Verify compatibility and prepare attribute-number map */
-       n = outdesc->natts;
-       attrMap = (AttrNumber *) palloc0(n * sizeof(AttrNumber));
-       for (i = 0; i < n; i++)
-       {
-               Form_pg_attribute att = outdesc->attrs[i];
-               char       *attname;
-               Oid                     atttypid;
-               int32           atttypmod;
-               int                     j;
-
-               if (att->attisdropped)
-                       continue;                       /* attrMap[i] is 
already 0 */
-               attname = NameStr(att->attname);
-               atttypid = att->atttypid;
-               atttypmod = att->atttypmod;
-               for (j = 0; j < indesc->natts; j++)
-               {
-                       att = indesc->attrs[j];
-                       if (att->attisdropped)
-                               continue;
-                       if (strcmp(attname, NameStr(att->attname)) == 0)
-                       {
-                               /* Found it, check type */
-                               if (atttypid != att->atttypid || atttypmod != 
att->atttypmod)
-                                       ereport(ERROR,
-                                                       
(errcode(ERRCODE_DATATYPE_MISMATCH),
-                                                        errmsg_internal("%s", 
_(msg)),
-                                                        errdetail("Attribute 
\"%s\" of type %s does not match corresponding attribute of type %s.",
-                                                                          
attname,
-                                                                          
format_type_be(outdesc->tdtypeid),
-                                                                          
format_type_be(indesc->tdtypeid))));
-                               attrMap[i] = (AttrNumber) (j + 1);
-                               break;
-                       }
-               }
-               if (attrMap[i] == 0)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
-                                        errmsg_internal("%s", _(msg)),
-                                        errdetail("Attribute \"%s\" of type %s 
does not exist in type %s.",
-                                                          attname,
-                                                          
format_type_be(outdesc->tdtypeid),
-                                                          
format_type_be(indesc->tdtypeid))));
-       }
+       attrMap = convert_tuples_by_name_map(indesc, outdesc, msg);
 
        /*
         * Check to see if the map is one-to-one and the tuple types are the 
same.
@@ -313,6 +270,69 @@ convert_tuples_by_name(TupleDesc indesc,
 }
 
 /*
+ * Return a palloc'd bare attribute map for tuple conversion, matching input
+ * and output columns by name.  (Dropped columns are ignored in both input and
+ * output.)  This is normally a subroutine for convert_tuples_by_name, but can
+ * be used standalone.
+ */
+AttrNumber *
+convert_tuples_by_name_map(TupleDesc indesc,
+                                                  TupleDesc outdesc,
+                                                  const char *msg)
+{
+       AttrNumber *attrMap;
+       int                     n;
+       int                     i;
+
+       n = outdesc->natts;
+       attrMap = (AttrNumber *) palloc0(n * sizeof(AttrNumber));
+       for (i = 0; i < n; i++)
+       {
+               Form_pg_attribute att = outdesc->attrs[i];
+               char       *attname;
+               Oid                     atttypid;
+               int32           atttypmod;
+               int                     j;
+
+               if (att->attisdropped)
+                       continue;                       /* attrMap[i] is 
already 0 */
+               attname = NameStr(att->attname);
+               atttypid = att->atttypid;
+               atttypmod = att->atttypmod;
+               for (j = 0; j < indesc->natts; j++)
+               {
+                       att = indesc->attrs[j];
+                       if (att->attisdropped)
+                               continue;
+                       if (strcmp(attname, NameStr(att->attname)) == 0)
+                       {
+                               /* Found it, check type */
+                               if (atttypid != att->atttypid || atttypmod != 
att->atttypmod)
+                                       ereport(ERROR,
+                                                       
(errcode(ERRCODE_DATATYPE_MISMATCH),
+                                                        errmsg_internal("%s", 
_(msg)),
+                                                        errdetail("Attribute 
\"%s\" of type %s does not match corresponding attribute of type %s.",
+                                                                          
attname,
+                                                                          
format_type_be(outdesc->tdtypeid),
+                                                                          
format_type_be(indesc->tdtypeid))));
+                               attrMap[i] = (AttrNumber) (j + 1);
+                               break;
+                       }
+               }
+               if (attrMap[i] == 0)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg_internal("%s", _(msg)),
+                                        errdetail("Attribute \"%s\" of type %s 
does not exist in type %s.",
+                                                          attname,
+                                                          
format_type_be(outdesc->tdtypeid),
+                                                          
format_type_be(indesc->tdtypeid))));
+       }
+
+       return attrMap;
+}
+
+/*
  * Perform conversion of a tuple according to the map.
  */
 HeapTuple
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index e217f57..85839f8 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -20,6 +20,7 @@
 #include "access/reloptions.h"
 #include "access/relscan.h"
 #include "access/sysattr.h"
+#include "access/tupconvert.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
@@ -7999,12 +8000,69 @@ ATPrepAlterColumnType(List **wqueue,
        ReleaseSysCache(tuple);
 
        /*
-        * The recursion case is handled by ATSimpleRecursion.  However, if we 
are
-        * told not to recurse, there had better not be any child tables; else 
the
-        * alter would put them out of step.
+        * Recurse manually by queueing a new command for each child, if
+        * necessary. We cannot apply ATSimpleRecursion here because we need to
+        * remap attribute numbers in the USING expression, if any.
+        *
+        * If we are told not to recurse, there had better not be any child
+        * tables; else the alter would put them out of step.
         */
        if (recurse)
-               ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
+       {
+               Oid                     relid = RelationGetRelid(rel);
+               ListCell   *child;
+               List       *children;
+
+               children = find_all_inheritors(relid, lockmode, NULL);
+
+               /*
+                * find_all_inheritors does the recursive search of the 
inheritance
+                * hierarchy, so all we have to do is process all of the relids 
in the
+                * list that it returns.
+                */
+               foreach(child, children)
+               {
+                       Oid                     childrelid = lfirst_oid(child);
+                       Relation        childrel;
+
+                       if (childrelid == relid)
+                               continue;
+
+                       /* find_all_inheritors already got lock */
+                       childrel = relation_open(childrelid, NoLock);
+                       CheckTableNotInUse(childrel, "ALTER TABLE");
+
+                       /*
+                        * Remap the attribute numbers.  If no USING expression 
was
+                        * specified, there is no need for this step.
+                        */
+                       if (def->cooked_default)
+                       {
+                               AttrNumber *attmap;
+                               bool            found_whole_row;
+
+                               /* create a copy to scribble on */
+                               cmd = copyObject(cmd);
+
+                               attmap = 
convert_tuples_by_name_map(RelationGetDescr(childrel),
+                                                                               
                        RelationGetDescr(rel),
+                                                                
gettext_noop("could not convert row type"));
+                               ((ColumnDef *) cmd->def)->cooked_default =
+                                       map_variable_attnos(def->cooked_default,
+                                                                               
1, 0,
+                                                                               
attmap, RelationGetDescr(rel)->natts,
+                                                                               
&found_whole_row);
+                               if (found_whole_row)
+                                       ereport(ERROR,
+                                                       
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                 errmsg("cannot convert 
whole-row table reference"),
+                                                        errdetail("USING 
expression contains a whole-row table reference.")));
+                               pfree(attmap);
+                       }
+                       ATPrepCmd(wqueue, childrel, cmd, false, true, lockmode);
+                       relation_close(childrel, NoLock);
+               }
+       }
        else if (!recursing &&
                         find_inheritance_children(RelationGetRelid(rel), 
NoLock) != NIL)
                ereport(ERROR,
diff --git a/src/include/access/tupconvert.h b/src/include/access/tupconvert.h
index 31559e2..f36c3fe 100644
--- a/src/include/access/tupconvert.h
+++ b/src/include/access/tupconvert.h
@@ -38,6 +38,10 @@ extern TupleConversionMap *convert_tuples_by_name(TupleDesc 
indesc,
                                           TupleDesc outdesc,
                                           const char *msg);
 
+extern AttrNumber *convert_tuples_by_name_map(TupleDesc indesc,
+                                                  TupleDesc outdesc,
+                                                  const char *msg);
+
 extern HeapTuple do_convert_tuple(HeapTuple tuple, TupleConversionMap *map);
 
 extern void free_conversion_map(TupleConversionMap *map);
diff --git a/src/test/regress/expected/alter_table.out 
b/src/test/regress/expected/alter_table.out
index de3c69a..1125517 100644
--- a/src/test/regress/expected/alter_table.out
+++ b/src/test/regress/expected/alter_table.out
@@ -2018,6 +2018,28 @@ select relname, conname, coninhcount, conislocal, 
connoinherit
  test_inh_check_child | test_inh_check_a_check |           1 | f          | f
 (6 rows)
 
+-- ALTER COLUMN TYPE with different schema in children
+-- Bug at https://postgr.es/m/20170102225618.ga10...@telsasoft.com
+CREATE TABLE test_type_diff (f1 int);
+CREATE TABLE test_type_diff_c (extra smallint) INHERITS (test_type_diff);
+ALTER TABLE test_type_diff ADD COLUMN f2 int;
+INSERT INTO test_type_diff_c VALUES (1, 2, 3);
+ALTER TABLE test_type_diff ALTER COLUMN f2 TYPE bigint USING f2::bigint;
+CREATE TABLE test_type_diff2 (int_two int2, int_four int4, int_eight int8);
+CREATE TABLE test_type_diff2_c1 (int_four int4, int_eight int8, int_two int2);
+CREATE TABLE test_type_diff2_c2 (int_eight int8, int_two int2, int_four int4);
+CREATE TABLE test_type_diff2_c3 (int_two int2, int_four int4, int_eight int8);
+ALTER TABLE test_type_diff2_c1 INHERIT test_type_diff2;
+ALTER TABLE test_type_diff2_c2 INHERIT test_type_diff2;
+ALTER TABLE test_type_diff2_c3 INHERIT test_type_diff2;
+INSERT INTO test_type_diff2_c1 VALUES (1, 2, 3);
+INSERT INTO test_type_diff2_c2 VALUES (4, 5, 6);
+INSERT INTO test_type_diff2_c3 VALUES (7, 8, 9);
+ALTER TABLE test_type_diff2 ALTER COLUMN int_four TYPE int8 USING 
int_four::int8;
+-- whole-row references are disallowed
+ALTER TABLE test_type_diff2 ALTER COLUMN int_four TYPE int4 USING 
(pg_column_size(test_type_diff2));
+ERROR:  cannot convert whole-row table reference
+DETAIL:  USING expression contains a whole-row table reference.
 -- check for rollback of ANALYZE corrupting table property flags (bug #11638)
 CREATE TABLE check_fk_presence_1 (id int PRIMARY KEY, t text);
 CREATE TABLE check_fk_presence_2 (id int REFERENCES check_fk_presence_1, t 
text);
diff --git a/src/test/regress/sql/alter_table.sql 
b/src/test/regress/sql/alter_table.sql
index 1459311..b593163 100644
--- a/src/test/regress/sql/alter_table.sql
+++ b/src/test/regress/sql/alter_table.sql
@@ -1321,6 +1321,28 @@ select relname, conname, coninhcount, conislocal, 
connoinherit
   where relname like 'test_inh_check%' and c.conrelid = r.oid
   order by 1, 2;
 
+-- ALTER COLUMN TYPE with different schema in children
+-- Bug at https://postgr.es/m/20170102225618.ga10...@telsasoft.com
+CREATE TABLE test_type_diff (f1 int);
+CREATE TABLE test_type_diff_c (extra smallint) INHERITS (test_type_diff);
+ALTER TABLE test_type_diff ADD COLUMN f2 int;
+INSERT INTO test_type_diff_c VALUES (1, 2, 3);
+ALTER TABLE test_type_diff ALTER COLUMN f2 TYPE bigint USING f2::bigint;
+
+CREATE TABLE test_type_diff2 (int_two int2, int_four int4, int_eight int8);
+CREATE TABLE test_type_diff2_c1 (int_four int4, int_eight int8, int_two int2);
+CREATE TABLE test_type_diff2_c2 (int_eight int8, int_two int2, int_four int4);
+CREATE TABLE test_type_diff2_c3 (int_two int2, int_four int4, int_eight int8);
+ALTER TABLE test_type_diff2_c1 INHERIT test_type_diff2;
+ALTER TABLE test_type_diff2_c2 INHERIT test_type_diff2;
+ALTER TABLE test_type_diff2_c3 INHERIT test_type_diff2;
+INSERT INTO test_type_diff2_c1 VALUES (1, 2, 3);
+INSERT INTO test_type_diff2_c2 VALUES (4, 5, 6);
+INSERT INTO test_type_diff2_c3 VALUES (7, 8, 9);
+ALTER TABLE test_type_diff2 ALTER COLUMN int_four TYPE int8 USING 
int_four::int8;
+-- whole-row references are disallowed
+ALTER TABLE test_type_diff2 ALTER COLUMN int_four TYPE int4 USING 
(pg_column_size(test_type_diff2));
+
 -- check for rollback of ANALYZE corrupting table property flags (bug #11638)
 CREATE TABLE check_fk_presence_1 (id int PRIMARY KEY, t text);
 CREATE TABLE check_fk_presence_2 (id int REFERENCES check_fk_presence_1, t 
text);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to