[ https://issues.apache.org/jira/browse/SPARK-19471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-19471:
------------------------------------

    Assignee: Apache Spark

[SQL] A confusing NullPointerException when creating table
-----------------------------------------------------------

                 Key: SPARK-19471
                 URL: https://issues.apache.org/jira/browse/SPARK-19471
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.0
            Reporter: StanZhai
            Assignee: Apache Spark
            Priority: Critical

After upgrading our Spark from 1.6.2 to 2.1.0, I encountered a confusing NullPointerException when creating a table under Spark 2.1.0; the problem does not exist in Spark 1.6.1.

Environment: Hive 1.2.1, Hadoop 2.6.4

==================== Code ====================

// spark is an instance of HiveContext
// merge is a Hive UDF
// Note: each SQL fragment below ends with a trailing space so the
// concatenated statement stays well-formed.
val df = spark.sql("SELECT merge(field_a, null) AS new_a, field_b AS new_b " +
  "FROM tb_1 GROUP BY field_a, field_b")
df.createTempView("tb_temp")
spark.sql("CREATE TABLE tb_result STORED AS PARQUET AS " +
  "SELECT new_a " +
  "FROM tb_temp " +
  "LEFT JOIN `tb_2` ON " +
  "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), " +
  "concat('GrLSRwZE_', cast((rand() * 200) AS int)), (`tb_temp`.`new_b`)) = " +
  "`tb_2`.`fka6862f17`")

==================== Physical Plan ====================

*Project [new_a]
+- *BroadcastHashJoin [if (((new_b = ) || isnull(new_b))) concat(GrLSRwZE_, cast(cast((_nondeterministic * 200.0) as int) as string)) else new_b], [fka6862f17], LeftOuter, BuildRight
   :- HashAggregate(keys=[field_a, field_b], functions=[], output=[new_a, new_b, _nondeterministic])
   :  +- Exchange(coordinator ) hashpartitioning(field_a, field_b, 180), coordinator[target post-shuffle partition size: 1024880]
   :     +- *HashAggregate(keys=[field_a, field_b], functions=[], output=[field_a, field_b])
   :        +- *FileScan parquet bdp.tb_1[field_a,field_b] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [fka6862f17]
         +- *FileScan parquet bdp.tb_2[fka6862f17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct

What does '*' mean before HashAggregate? (In Spark's explain output, a leading '*' marks an operator that runs under whole-stage code generation; unstarred operators take the interpreted path.)
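The starred/unstarred distinction can be inspected directly. A small sketch, reusing the `spark` and `df` handles from the code above; whether turning codegen off changes the failure is untested:

// '*' in the physical plan marks operators compiled by whole-stage code
// generation; unstarred operators run on the interpreted iterator path.
df.explain() // prints the physical plan shown above

// Experiment only (untested against this NPE): disable whole-stage codegen
// and compare which operators lose the '*' prefix.
spark.sql("SET spark.sql.codegen.wholeStage=false")
df.explain()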
==================== Exception ====================

org.apache.spark.SparkException: Task failed while writing rows
...
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$3.apply(AggregationIterator.scala:260)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$3.apply(AggregationIterator.scala:259)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:392)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:79)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:252)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:199)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:197)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:202)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$4.apply(FileFormatWriter.scala:138)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$4.apply(FileFormatWriter.scala:137)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I also found that when I changed my code as follows (selecting new_b instead of new_a):

spark.sql("CREATE TABLE tb_result STORED AS PARQUET AS " +
  "SELECT new_b " +
  "FROM tb_temp " +
  "LEFT JOIN `tb_2` ON " +
  "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), " +
  "concat('GrLSRwZE_', cast((rand() * 200) AS int)), (`tb_temp`.`new_b`)) = " +
  "`tb_2`.`fka6862f17`")

or removing rand() from the join condition:

spark.sql("CREATE TABLE tb_result STORED AS PARQUET AS " +
  "SELECT new_a " +
  "FROM tb_temp " +
  "LEFT JOIN `tb_2` ON " +
  "if(((`tb_temp`.`new_b`) = '' OR (`tb_temp`.`new_b`) IS NULL), " +
  "concat('GrLSRwZE_', cast((200) AS int)), (`tb_temp`.`new_b`)) = " +
  "`tb_2`.`fka6862f17`")

the problem does not occur.
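Along the same lines, an untested workaround sketch: compute the rand()-based salt in a plain projection first, so the join condition itself stays deterministic. The view name `tb_salted` and column `join_key` are made up for this sketch:

// Untested workaround sketch: materialize the nondeterministic salt into a
// column before joining, then join on the materialized column.
spark.sql("SELECT new_a, " +
  "if(new_b = '' OR new_b IS NULL, " +
  "concat('GrLSRwZE_', cast(rand() * 200 AS int)), new_b) AS join_key " +
  "FROM tb_temp").createTempView("tb_salted")
spark.sql("CREATE TABLE tb_result STORED AS PARQUET AS " +
  "SELECT new_a " +
  "FROM tb_salted " +
  "LEFT JOIN `tb_2` ON tb_salted.join_key = `tb_2`.`fka6862f17`")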
==================== Physical Plan of select new_b ... ====================

*Project [new_b]
+- *BroadcastHashJoin [if (((new_b = ) || isnull(new_b))) concat(GrLSRwZE_, cast(cast((_nondeterministic * 200.0) as int) as string)) else new_b], [fka6862f17], LeftOuter, BuildRight
   :- *HashAggregate(keys=[field_a, field_b], functions=[], output=[new_b, _nondeterministic])
   :  +- Exchange(coordinator ) hashpartitioning(field_a, field_b, 180), coordinator[target post-shuffle partition size: 1024880]
   :     +- *HashAggregate(keys=[field_a, field_b], functions=[], output=[field_a, field_b])
   :        +- *FileScan parquet bdp.tb_1[field_a,field_b] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [fka6862f17]
         +- *FileScan parquet bdp.tb_2[fka6862f17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hdcluster/data/tb_2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct

The difference is that here `HashAggregate(keys=[field_a, field_b], functions=[], output=[new_b, _nondeterministic])` carries the '*' prefix (whole-stage codegen), whereas the corresponding HashAggregate in the failing plan does not.

It looks like something is wrong with WholeStageCodegen when combining a Hive UDF, rand(), GROUP BY, and a join.
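One more observation, offered as a guess: the stack trace points into AggregationIterator.generateResultProjection, i.e. the aggregate's result-projection path, which matches the unstarred HashAggregate in the failing plan; a plausible reading is that the projected rand() (`_nondeterministic`) is evaluated there without being initialized, but that is unverified. A hypothetical narrowing test (untested; `tb_min` and `tb_min_result` are names made up for this sketch) that keeps the rand() + GROUP BY + join shape but drops the Hive UDF:

// Hypothetical narrowing test (untested): same shape as the failing query,
// but with plain columns in place of the Hive UDF `merge`. If this also
// throws the NPE, the UDF is not part of the trigger; if it does not, the
// UDF changes the aggregate's output and with it the codegen decision.
spark.sql("SELECT field_a AS new_a, field_b AS new_b " +
  "FROM tb_1 GROUP BY field_a, field_b").createTempView("tb_min")
spark.sql("CREATE TABLE tb_min_result STORED AS PARQUET AS " +
  "SELECT new_a " +
  "FROM tb_min " +
  "LEFT JOIN `tb_2` ON " +
  "if(new_b = '' OR new_b IS NULL, " +
  "concat('GrLSRwZE_', cast(rand() * 200 AS int)), new_b) = `tb_2`.`fka6862f17`")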