[ 
https://issues.apache.org/jira/browse/SPARK-16517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lichenglin updated SPARK-16517:
-------------------------------
    Description: 
{code}
setName("abc");
HiveContext hive = getHiveContext();
DataFrame d = hive.createDataFrame(
                                getJavaSparkContext().parallelize(
                                                
Arrays.asList(RowFactory.create("abc", "abc", 5.0), RowFactory.create("abcd", 
"abcd", 5.0))),
                                DataTypes.createStructType(
                                                
Arrays.asList(DataTypes.createStructField("card_id", DataTypes.StringType, 
true),
                                                                
DataTypes.createStructField("tag_name", DataTypes.StringType, true),
                                                                
DataTypes.createStructField("v", DataTypes.DoubleType, true))));
                
d.write().partitionBy("v").mode(SaveMode.Overwrite).saveAsTable("abc");
                hive.sql("alter table abc add columns(v2 double)");
                hive.refreshTable("abc");
                hive.sql("describe abc").show();
                DataFrame d2 = hive.createDataFrame(
                                
getJavaSparkContext().parallelize(Arrays.asList(RowFactory.create("abc", "abc", 
3.0, 4.0),
                                                RowFactory.create("abcd", 
"abcd", 3.0, 1.0))),
                                new StructType(new StructField[] { 
DataTypes.createStructField("card_id", DataTypes.StringType, true),
                                                
DataTypes.createStructField("tag_name", DataTypes.StringType, true),
                                                
DataTypes.createStructField("v", DataTypes.DoubleType, true),
                                                
DataTypes.createStructField("v2", DataTypes.DoubleType, true) }));
                
d2.write().partitionBy("v").mode(SaveMode.Append).saveAsTable("abc");
                hive.table("abc").show();
{code}
spark.sql.parquet.mergeSchema has been set to  "true".

The code throws the following exception:
{code}

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
| card_id|   string|       |
|tag_name|   string|       |
|       v|   double|       |
+--------+---------+-------+

2016-07-13 13:40:43,637 INFO 
[org.apache.hadoop.hive.metastore.HiveMetaStore:746] - 0: get_table : 
db=default tbl=abc
2016-07-13 13:40:43,637 INFO 
[org.apache.hadoop.hive.metastore.HiveMetaStore.audit:371] - ugi=licl      
ip=unknown-ip-addr      cmd=get_table : db=default tbl=abc      
2016-07-13 13:40:43,693 INFO [org.apache.spark.storage.BlockManagerInfo:58] - 
Removed broadcast_2_piece0 on localhost:50647 in memory (size: 1176.0 B, free: 
1125.7 MB)
2016-07-13 13:40:43,700 INFO [org.apache.spark.ContextCleaner:58] - Cleaned 
accumulator 2
2016-07-13 13:40:43,702 INFO [org.apache.spark.storage.BlockManagerInfo:58] - 
Removed broadcast_1_piece0 on localhost:50647 in memory (size: 19.4 KB, free: 
1125.7 MB)
2016-07-13 13:40:43,702 INFO 
[org.apache.hadoop.hive.metastore.HiveMetaStore:746] - 0: get_table : 
db=default tbl=abc
2016-07-13 13:40:43,703 INFO 
[org.apache.hadoop.hive.metastore.HiveMetaStore.audit:371] - ugi=licl      
ip=unknown-ip-addr      cmd=get_table : db=default tbl=abc      
Exception in thread "main" java.lang.RuntimeException: 
Relation[card_id#26,tag_name#27,v#28] ParquetRelation
 requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE 
statement generates the same number of columns as its schema.
        at scala.sys.package$.error(package.scala:27)
        at 
org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$$anonfun$apply$2.applyOrElse(rules.scala:68)
        at 
org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$$anonfun$apply$2.applyOrElse(rules.scala:58)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
        at 
org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$.apply(rules.scala:58)
        at 
org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$.apply(rules.scala:57)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
        at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
        at 
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
        at 
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
        at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
        at 
org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:39)
        at 
org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:38)
        at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:43)
        at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:43)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at 
org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:189)
        at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:239)
        at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
        at CubeDemoTest.main(CubeDemoTest.java:52)
{code}

The metadata is stored in MySQL; here is the COLUMNS_V2 data for table abc:
{code}
447     from deserializer       col     array<string>   0
447             v2      double  1
{code}

The SQL statement "alter table abc add columns(v2 double)" has written the new 
column v2's metadata into MySQL,
but Spark SQL cannot read it correctly.

  was:
{code}
setName("abc");
                HiveContext hive = getHiveContext();
                DataFrame d = hive.createDataFrame(
                                getJavaSparkContext().parallelize(
                                                
Arrays.asList(RowFactory.create("abc", "abc", 5.0), RowFactory.create("abcd", 
"abcd", 5.0))),
                                DataTypes.createStructType(
                                                
Arrays.asList(DataTypes.createStructField("card_id", DataTypes.StringType, 
true),
                                                                
DataTypes.createStructField("tag_name", DataTypes.StringType, true),
                                                                
DataTypes.createStructField("v", DataTypes.DoubleType, true))));
                
d.write().partitionBy("v").mode(SaveMode.Overwrite).saveAsTable("abc");
                hive.sql("alter table abc add columns(v2 double)");
                hive.refreshTable("abc");
                hive.sql("describe abc").show();
                DataFrame d2 = hive.createDataFrame(
                                
getJavaSparkContext().parallelize(Arrays.asList(RowFactory.create("abc", "abc", 
3.0, 4.0),
                                                RowFactory.create("abcd", 
"abcd", 3.0, 1.0))),
                                new StructType(new StructField[] { 
DataTypes.createStructField("card_id", DataTypes.StringType, true),
                                                
DataTypes.createStructField("tag_name", DataTypes.StringType, true),
                                                
DataTypes.createStructField("v", DataTypes.DoubleType, true),
                                                
DataTypes.createStructField("v2", DataTypes.DoubleType, true) }));
                
d2.write().partitionBy("v").mode(SaveMode.Append).saveAsTable("abc");
                hive.table("abc").show();
{code}
spark.sql.parquet.mergeSchema has been set to  "true".

The code's exception is here 
{code}

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
| card_id|   string|       |
|tag_name|   string|       |
|       v|   double|       |
+--------+---------+-------+

2016-07-13 13:40:43,637 INFO 
[org.apache.hadoop.hive.metastore.HiveMetaStore:746] - 0: get_table : 
db=default tbl=abc
2016-07-13 13:40:43,637 INFO 
[org.apache.hadoop.hive.metastore.HiveMetaStore.audit:371] - ugi=licl      
ip=unknown-ip-addr      cmd=get_table : db=default tbl=abc      
2016-07-13 13:40:43,693 INFO [org.apache.spark.storage.BlockManagerInfo:58] - 
Removed broadcast_2_piece0 on localhost:50647 in memory (size: 1176.0 B, free: 
1125.7 MB)
2016-07-13 13:40:43,700 INFO [org.apache.spark.ContextCleaner:58] - Cleaned 
accumulator 2
2016-07-13 13:40:43,702 INFO [org.apache.spark.storage.BlockManagerInfo:58] - 
Removed broadcast_1_piece0 on localhost:50647 in memory (size: 19.4 KB, free: 
1125.7 MB)
2016-07-13 13:40:43,702 INFO 
[org.apache.hadoop.hive.metastore.HiveMetaStore:746] - 0: get_table : 
db=default tbl=abc
2016-07-13 13:40:43,703 INFO 
[org.apache.hadoop.hive.metastore.HiveMetaStore.audit:371] - ugi=licl      
ip=unknown-ip-addr      cmd=get_table : db=default tbl=abc      
Exception in thread "main" java.lang.RuntimeException: 
Relation[card_id#26,tag_name#27,v#28] ParquetRelation
 requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE 
statement generates the same number of columns as its schema.
        at scala.sys.package$.error(package.scala:27)
        at 
org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$$anonfun$apply$2.applyOrElse(rules.scala:68)
        at 
org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$$anonfun$apply$2.applyOrElse(rules.scala:58)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
        at 
org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$.apply(rules.scala:58)
        at 
org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$.apply(rules.scala:57)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
        at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
        at 
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
        at 
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
        at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
        at 
org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:39)
        at 
org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:38)
        at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:43)
        at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:43)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at 
org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:189)
        at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:239)
        at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
        at CubeDemoTest.main(CubeDemoTest.java:52)
{code}

the metadata is store in mysql,here is the columns_v2's data on table abc
{code}
447     from deserializer       col     array<string>   0
447             v2      double  1
{code}

The sql "alter table abc add columns(v2 double)" has write a new column v2's 
metadata into the mysql
But sparksql can't read correctly.


> can't add columns on the table whose column metadata is serializer
> ------------------------------------------------------------------
>
>                 Key: SPARK-16517
>                 URL: https://issues.apache.org/jira/browse/SPARK-16517
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2
>            Reporter: lichenglin
>
> {code}
> setName("abc");
> HiveContext hive = getHiveContext();
> DataFrame d = hive.createDataFrame(
>                               getJavaSparkContext().parallelize(
>                                               
> Arrays.asList(RowFactory.create("abc", "abc", 5.0), RowFactory.create("abcd", 
> "abcd", 5.0))),
>                               DataTypes.createStructType(
>                                               
> Arrays.asList(DataTypes.createStructField("card_id", DataTypes.StringType, 
> true),
>                                                               
> DataTypes.createStructField("tag_name", DataTypes.StringType, true),
>                                                               
> DataTypes.createStructField("v", DataTypes.DoubleType, true))));
>               
> d.write().partitionBy("v").mode(SaveMode.Overwrite).saveAsTable("abc");
>               hive.sql("alter table abc add columns(v2 double)");
>               hive.refreshTable("abc");
>               hive.sql("describe abc").show();
>               DataFrame d2 = hive.createDataFrame(
>                               
> getJavaSparkContext().parallelize(Arrays.asList(RowFactory.create("abc", 
> "abc", 3.0, 4.0),
>                                               RowFactory.create("abcd", 
> "abcd", 3.0, 1.0))),
>                               new StructType(new StructField[] { 
> DataTypes.createStructField("card_id", DataTypes.StringType, true),
>                                               
> DataTypes.createStructField("tag_name", DataTypes.StringType, true),
>                                               
> DataTypes.createStructField("v", DataTypes.DoubleType, true),
>                                               
> DataTypes.createStructField("v2", DataTypes.DoubleType, true) }));
>               
> d2.write().partitionBy("v").mode(SaveMode.Append).saveAsTable("abc");
>               hive.table("abc").show();
> {code}
> spark.sql.parquet.mergeSchema has been set to  "true".
> The code throws the following exception:
> {code}
> +--------+---------+-------+
> |col_name|data_type|comment|
> +--------+---------+-------+
> | card_id|   string|       |
> |tag_name|   string|       |
> |       v|   double|       |
> +--------+---------+-------+
> 2016-07-13 13:40:43,637 INFO 
> [org.apache.hadoop.hive.metastore.HiveMetaStore:746] - 0: get_table : 
> db=default tbl=abc
> 2016-07-13 13:40:43,637 INFO 
> [org.apache.hadoop.hive.metastore.HiveMetaStore.audit:371] - ugi=licl    
> ip=unknown-ip-addr      cmd=get_table : db=default tbl=abc      
> 2016-07-13 13:40:43,693 INFO [org.apache.spark.storage.BlockManagerInfo:58] - 
> Removed broadcast_2_piece0 on localhost:50647 in memory (size: 1176.0 B, 
> free: 1125.7 MB)
> 2016-07-13 13:40:43,700 INFO [org.apache.spark.ContextCleaner:58] - Cleaned 
> accumulator 2
> 2016-07-13 13:40:43,702 INFO [org.apache.spark.storage.BlockManagerInfo:58] - 
> Removed broadcast_1_piece0 on localhost:50647 in memory (size: 19.4 KB, free: 
> 1125.7 MB)
> 2016-07-13 13:40:43,702 INFO 
> [org.apache.hadoop.hive.metastore.HiveMetaStore:746] - 0: get_table : 
> db=default tbl=abc
> 2016-07-13 13:40:43,703 INFO 
> [org.apache.hadoop.hive.metastore.HiveMetaStore.audit:371] - ugi=licl    
> ip=unknown-ip-addr      cmd=get_table : db=default tbl=abc      
> Exception in thread "main" java.lang.RuntimeException: 
> Relation[card_id#26,tag_name#27,v#28] ParquetRelation
>  requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE 
> statement generates the same number of columns as its schema.
>       at scala.sys.package$.error(package.scala:27)
>       at 
> org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$$anonfun$apply$2.applyOrElse(rules.scala:68)
>       at 
> org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$$anonfun$apply$2.applyOrElse(rules.scala:58)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
>       at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
>       at 
> org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$.apply(rules.scala:58)
>       at 
> org.apache.spark.sql.execution.datasources.PreInsertCastAndRename$.apply(rules.scala:57)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
>       at 
> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
>       at scala.collection.immutable.List.foldLeft(List.scala:84)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
>       at 
> org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
>       at 
> org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>       at 
> org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:39)
>       at 
> org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:38)
>       at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:43)
>       at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:43)
>       at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47)
>       at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
>       at 
> org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:189)
>       at 
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:239)
>       at 
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
>       at CubeDemoTest.main(CubeDemoTest.java:52)
> {code}
> The metadata is stored in MySQL; here is the COLUMNS_V2 data for table abc:
> {code}
> 447   from deserializer       col     array<string>   0
> 447           v2      double  1
> {code}
> The SQL statement "alter table abc add columns(v2 double)" has written the new 
> column v2's metadata into MySQL,
> but Spark SQL cannot read it correctly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to