[ 
https://issues.apache.org/jira/browse/SPARK-19471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li reassigned SPARK-19471:
-------------------------------

    Assignee: Feng Zhu

> [SQL]A confusing NullPointerException when creating table
> ---------------------------------------------------------
>
>                 Key: SPARK-19471
>                 URL: https://issues.apache.org/jira/browse/SPARK-19471
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: StanZhai
>            Assignee: Feng Zhu
>            Priority: Critical
>             Fix For: 2.3.0
>
>
> After upgrading our Spark from 1.6.2 to 2.1.0, I encounter a confusing 
> NullPointerException when creating a table under Spark 2.1.0, but the problem 
> does not exist in Spark 1.6.1. 
> Environment: Hive 1.2.1, Hadoop 2.6.4 
> {noformat}
> ==================== Code ==================== 
> // spark is an instance of HiveContext 
> // merge is a Hive UDF 
> val df = spark.sql("SELECT merge(field_a, null) AS new_a, field_b AS new_b 
> FROM tb_1 group by field_a, field_b") 
> df.createTempView("tb_temp") 
> spark.sql("create table tb_result stored as parquet as " + 
>   "SELECT new_a" + 
>   "FROM tb_temp" + 
>   "LEFT JOIN `tb_2` ON " + 
>   "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), 
> concat('GrLSRwZE_', cast((rand() * 200) AS int)), (`tb_temp`.`new_b`)) = 
> `tb_2`.`fka6862f17`") 
> ==================== Physical Plan ==================== 
> *Project [new_a] 
> +- *BroadcastHashJoin [if (((new_b = ) || isnull(new_b))) concat(GrLSRwZE_, 
> cast(cast((_nondeterministic * 200.0) as int) as string)) else new_b], 
> [fka6862f17], LeftOuter, BuildRight 
>    :- HashAggregate(keys=[field_a, field_b], functions=[], output=[new_a, 
> new_b, _nondeterministic]) 
>    :  +- Exchange(coordinator ) hashpartitioning(field_a, field_b, 180), 
> coordinator[target post-shuffle partition size: 1024880] 
>    :     +- *HashAggregate(keys=[field_a, field_b], functions=[], 
> output=[field_a, field_b]) 
>    :        +- *FileScan parquet bdp.tb_1[field_a,field_b] Batched: true, 
> Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_1, 
> PartitionFilters: [], PushedFilters: [], ReadSchema: struct 
>    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
> true])) 
>       +- *Project [fka6862f17] 
>          +- *FileScan parquet bdp.tb_2[fka6862f17] Batched: true, Format: 
> Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_2, 
> PartitionFilters: [], PushedFilters: [], ReadSchema: struct 
> What does '*' mean before HashAggregate? 
> ==================== Exception ==================== 
> org.apache.spark.SparkException: Task failed while writing rows 
> ... 
> java.lang.NullPointerException 
>         at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown
>  Source) 
>         at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source) 
>         at 
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$3.apply(AggregationIterator.scala:260)
>  
>         at 
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$3.apply(AggregationIterator.scala:259)
>  
>         at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:392)
>  
>         at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79)
>  
>         at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source) 
>         at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  
>         at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>  
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:252)
>  
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:199)
>  
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:197)
>  
>         at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
>  
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:202)
>  
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$4.apply(FileFormatWriter.scala:138)
>  
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$4.apply(FileFormatWriter.scala:137)
>  
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
>         at org.apache.spark.scheduler.Task.run(Task.scala:99) 
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  
>         at java.lang.Thread.run(Thread.java:745) 
> {noformat}
> I also found that when I changed my code as follows: 
> {noformat}
> spark.sql("create table tb_result stored as parquet as " + 
>   "SELECT new_b" + 
>   "FROM tb_temp" + 
>   "LEFT JOIN `tb_2` ON " + 
>   "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), 
> concat('GrLSRwZE_', cast((rand() * 200) AS int)), (`tb_temp`.`new_b`)) = 
> `tb_2`.`fka6862f17`") 
> or 
> spark.sql("create table tb_result stored as parquet as " + 
>   "SELECT new_a" + 
>   "FROM tb_temp" + 
>   "LEFT JOIN `tb_2` ON " + 
>   "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), 
> concat('GrLSRwZE_', cast((200) AS int)), (`tb_temp`.`new_b`)) = 
> `tb_2`.`fka6862f17`") 
> will not have this problem. 
> == Physical Plan of select new_b ... == 
> *Project [new_b] 
> +- *BroadcastHashJoin [if (((new_b = ) || isnull(new_b))) concat(GrLSRwZE_, 
> cast(cast((_nondeterministic * 200.0) as int) as string)) else new_b], 
> [fka6862f17], LeftOuter, BuildRight 
>    :- *HashAggregate(keys=[field_a, field_b], functions=[], output=[new_b, 
> _nondeterministic]) 
>    :  +- Exchange(coordinator ) hashpartitioning(field_a, field_b, 180), 
> coordinator[target post-shuffle partition size: 1024880] 
>    :     +- *HashAggregate(keys=[field_a, field_b], functions=[], 
> output=[field_a, field_b]) 
>    :        +- *FileScan parquet bdp.tb_1[field_a,field_b] Batched: true, 
> Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_1, 
> PartitionFilters: [], PushedFilters: [], ReadSchema: struct 
>    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
> true])) 
>       +- *Project [fka6862f17] 
>          +- *FileScan parquet bdp.tb_2[fka6862f17] Batched: true, Format: 
> Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_2, 
> PartitionFilters: [], PushedFilters: [], ReadSchema: struct 
> {noformat}
> The difference is that `HashAggregate(keys=[field_a, field_b], functions=[], 
> output=[new_b, _nondeterministic])` has a '*' char before it. 
> It looks like something is wrong with WholeStageCodegen when combining HiveUDF + 
> rand() + group by + join. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to