Wang, Gang created SPARK-23373:
----------------------------------

             Summary: Cannot execute "count distinct" queries on a Parquet-formatted table
                 Key: SPARK-23373
                 URL: https://issues.apache.org/jira/browse/SPARK-23373
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 2.2.0
            Reporter: Wang, Gang
I failed to run the SQL query "select count(distinct n_name) from nation" against a table stored in Parquet format; the error trace is as follows.

_spark-sql> select count(distinct n_name) from nation;_
_18/02/09 03:55:28 INFO main SparkSqlParser:54 Parsing command: select count(distinct n_name) from nation_
_Error in query: Table or view not found: nation; line 1 pos 35_
_spark-sql> select count(distinct n_name) from nation_parquet;_
_18/02/09 03:55:36 INFO main SparkSqlParser:54 Parsing command: select count(distinct n_name) from nation_parquet_
_18/02/09 03:55:36 INFO main CatalystSqlParser:54 Parsing command: int_
_18/02/09 03:55:36 INFO main CatalystSqlParser:54 Parsing command: string_
_18/02/09 03:55:36 INFO main CatalystSqlParser:54 Parsing command: int_
_18/02/09 03:55:36 INFO main CatalystSqlParser:54 Parsing command: string_
_18/02/09 03:55:36 INFO main CatalystSqlParser:54 Parsing command: array<string>_
_18/02/09 03:55:38 INFO main FileSourceStrategy:54 Pruning directories with:_
_18/02/09 03:55:38 INFO main FileSourceStrategy:54 Data Filters:_
_18/02/09 03:55:38 INFO main FileSourceStrategy:54 Post-Scan Filters:_
_18/02/09 03:55:38 INFO main FileSourceStrategy:54 Output Data Schema: struct<n_name: string>_
_18/02/09 03:55:38 INFO main FileSourceScanExec:54 Pushed Filters:_
_18/02/09 03:55:39 INFO main CodeGenerator:54 Code generated in 295.88685 ms_
_18/02/09 03:55:39 INFO main HashAggregateExec:54 spark.sql.codegen.aggregate.map.twolevel.enable is set to true, but current version of codegened fast hashmap does not support this aggregate._
_18/02/09 03:55:39 INFO main CodeGenerator:54 Code generated in 51.075394 ms_
_18/02/09 03:55:39 INFO main HashAggregateExec:54 spark.sql.codegen.aggregate.map.twolevel.enable is set to true, but current version of codegened fast hashmap does not support this aggregate._
_18/02/09 03:55:39 INFO main CodeGenerator:54 Code generated in 42.819226 ms_
_18/02/09 03:55:39 INFO main ParquetFileFormat:54 parquetFilterPushDown is true_
_18/02/09 03:55:39 INFO main ParquetFileFormat:54 start filter class_
_18/02/09 03:55:39 INFO main ParquetFileFormat:54 Pushed not defined_
_18/02/09 03:55:39 INFO main ParquetFileFormat:54 end filter class_
_18/02/09 03:55:39 INFO main MemoryStore:54 Block broadcast_0 stored as values in memory (estimated size 305.0 KB, free 366.0 MB)_
_18/02/09 03:55:39 INFO main MemoryStore:54 Block broadcast_0_piece0 stored as bytes in memory (estimated size 27.6 KB, free 366.0 MB)_
_18/02/09 03:55:39 INFO dispatcher-event-loop-7 BlockManagerInfo:54 Added broadcast_0_piece0 in memory on 10.64.205.170:45616 (size: 27.6 KB, free: 366.3 MB)_
_18/02/09 03:55:39 INFO main SparkContext:54 Created broadcast 0 from processCmd at CliDriver.java:376_
_18/02/09 03:55:39 INFO main InMemoryFileIndex:54 Selected files after partition pruning:_
_PartitionDirectory([empty row],ArrayBuffer(LocatedFileStatus\{path=hdfs://btd-dev-2425209.lvs01.dev.ebayc3.com:8020/apps/hive/warehouse/nation_parquet/000000_0; isDirectory=false; length=3216; replication=3; blocksize=134217728; modification_time=1516619879024; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}))_
_18/02/09 03:55:39 INFO main FileSourceScanExec:54 Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes._
_18/02/09 03:55:39 ERROR main SparkSQLDriver:91 Failed in [select count(distinct n_name) from nation_parquet]_
{color:#FF0000}*_org.apache.spark.SparkException: Task not serializable_*{color}
_at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)_
_at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)_
_at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)_
_at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)_
_at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)_
_at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)_
_at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)_
_at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)_
_at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)_
_at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)_
_at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)_
_at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)_
_at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)_
_at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)_
_at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:88)_
_at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:124)_
_at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:115)_
_at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)_
_at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)_
_at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)_
_at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)_
_at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)_
_at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:252)_
_at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)_
_at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)_
_at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:386)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)_
_at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)_
_at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)_
_at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)_
_at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:88)_
_at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:124)_
_at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:115)_
_at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)_
_at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)_
_at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)_
_at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)_
_at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)_
_at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:252)_
_at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)_
_at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:386)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)_
_at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)_
_at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)_
_at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)_
_at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)_
_at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)_
_at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)_
_at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:298)_
_at org.apache.spark.sql.execution.QueryExecution.hiveResultString(QueryExecution.scala:133)_
_at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)_
_at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:340)_
_at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)_
_at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:248)_
_at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)_
_at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)_
_at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)_
_at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)_
_at java.lang.reflect.Method.invoke(Method.java:498)_
_at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)_
_at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)_
_at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)_
_at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)_
_at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)_
_Caused by: java.io.NotSerializableException: scala.concurrent.impl.ExecutionContextImpl$$anon$1_
_Serialization stack:_
_- object not serializable (class: scala.concurrent.impl.ExecutionContextImpl$$anon$1, value: scala.concurrent.impl.ExecutionContextImpl$$anon$1@149e457)_
_- field (class: org.apache.spark.sql.execution.FileSourceScanExec, name: org$apache$spark$sql$execution$FileSourceScanExec$$executionContext, type: interface scala.concurrent.ExecutionContextExecutorService)_
_- object (class org.apache.spark.sql.execution.FileSourceScanExec, FileScan parquet default.nation_parquet[n_name#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://btd-dev-2425209.lvs01.dev.ebayc3.com:8020/apps/hive/warehouse/nation_par..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<n_name:string>, UsedIndexes: []_
_)_
_- field (class: org.apache.spark.sql.execution.aggregate.HashAggregateExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)_
_- object (class org.apache.spark.sql.execution.aggregate.HashAggregateExec, HashAggregate(keys=[n_name#1], functions=[], output=[n_name#1])_
_+- FileScan parquet default.nation_parquet[n_name#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://btd-dev-2425209.lvs01.dev.ebayc3.com:8020/apps/hive/warehouse/nation_par..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<n_name:string>, UsedIndexes: []_
_)_
_- element of array (index: 0)_
_- array (class [Ljava.lang.Object;, size 7)_
_- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)_
_- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)_
_at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)_
_at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)_
_at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)_
_at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)_
_... 75 more_
_org.apache.spark.SparkException: Task not serializable_
_at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)_
_at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)_
_at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)_
_at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)_
_at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)_
_at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)_
_at org.apache.spark.rdd.RDDOperationScope$.withS..._

I tried the same query on a table stored in text format, and it worked fine.
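For reference, the serialization stack points at the executionContext field of FileSourceScanExec being dragged into the task closure. Below is a minimal, self-contained Scala sketch of that general failure mode; the class ScanLike and its members are hypothetical stand-ins, not Spark internals. A serializable class holds a non-serializable ExecutionContext, and any closure that touches one of its members captures the whole instance.

{code:scala}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Hypothetical stand-in for a plan node: the class itself is Serializable,
// but it carries a non-serializable ExecutionContext field.
class ScanLike(val name: String) extends Serializable {
  // scala.concurrent ExecutionContext implementations are not Serializable;
  // if Java serialization walks into this field, serialization fails.
  val executionContext: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  // Referencing a member of `this` forces the returned closure to capture
  // the enclosing instance, ExecutionContext field included.
  def makeClosure(): Int => Int = x => x + name.length
}

// With a live SparkContext `sc`, the following throws
// org.apache.spark.SparkException: Task not serializable, caused by a
// java.io.NotSerializableException on the ExecutionContext, matching the
// serialization stack above:
//
//   sc.parallelize(1 to 10).map(new ScanLike("nation").makeClosure()).count()
{code}

The common remedy for this pattern in Spark code generally is to mark such a field @transient (recreating it lazily where it is used) or to copy the needed members into local vals before constructing the closure, so the operator instance itself is never captured.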