[ 
https://issues.apache.org/jira/browse/SPARK-15516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349520#comment-15349520
 ] 

MIN-FU YANG commented on SPARK-15516:
-------------------------------------

[~holdenk] I am wondering the banning on schema merge on different type key is 
a designated feature. 
I added following code
{code:title=StructType.scala#456-462}
    case (leftNumeric: NumericType, rightNumeric: NumericType) =>
        (leftNumeric, rightNumeric) match {
          case (leftIntegral: IntegralType, rightIntegral: IntegralType) =>
            Seq(leftIntegral, rightIntegral).maxBy(_.defaultSize)
          case (leftFractional: FractionalType, rightFractional: 
FractionalType) =>
            Seq(leftFractional, rightFractional).maxBy(_.defaultSize)
     }
{code}
to StructType.merge function and the tempView can be created successfully.
Then I query records in the tempView, it throws exceptions
{code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 
3, localhost): org.apache.spark.SparkException: Task failed while writing rows
        at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:260)
        at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:273)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:252)
        at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:251)
        at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:251)
        at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
        at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:257)
{code}

Although modify function getLong in OnHeapColumnVector to 
{code}
  @Override
  public long getLong(int rowId) {
    if (dictionary == null) {
      if(longData != null)
        return longData[rowId];
      else
        return intData[rowId];
    } else {
      return dictionary.decodeToLong(dictionaryIds.getInt(rowId));
    }
  }
{code}
can solve the problem. 

But I am afraid it's not the designated behaviour.

> Schema merging in driver fails for parquet when merging LongType and 
> IntegerType
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-15516
>                 URL: https://issues.apache.org/jira/browse/SPARK-15516
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>         Environment: Databricks
>            Reporter: Hossein Falaki
>
> I tried to create a table from partitioned parquet directories that requires 
> schema merging. I get following error:
> {code}
> at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$24$$anonfun$apply$9.apply(ParquetRelation.scala:831)
>     at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$24$$anonfun$apply$9.apply(ParquetRelation.scala:826)
>     at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$24.apply(ParquetRelation.scala:826)
>     at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$24.apply(ParquetRelation.scala:801)
>     at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:756)
>     at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:756)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Failed to merge incompatible data 
> types LongType and IntegerType
>     at org.apache.spark.sql.types.StructType$.merge(StructType.scala:462)
>     at 
> org.apache.spark.sql.types.StructType$$anonfun$merge$1$$anonfun$apply$3.apply(StructType.scala:420)
>     at 
> org.apache.spark.sql.types.StructType$$anonfun$merge$1$$anonfun$apply$3.apply(StructType.scala:418)
>     at scala.Option.map(Option.scala:145)
>     at 
> org.apache.spark.sql.types.StructType$$anonfun$merge$1.apply(StructType.scala:418)
>     at 
> org.apache.spark.sql.types.StructType$$anonfun$merge$1.apply(StructType.scala:415)
>     at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at org.apache.spark.sql.types.StructType$.merge(StructType.scala:415)
>     at org.apache.spark.sql.types.StructType.merge(StructType.scala:333)
>     at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$24$$anonfun$apply$9.apply(ParquetRelation.scala:829)
> {code}
> cc @rxin and [~mengxr]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to