[jira] [Comment Edited] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14993771#comment-14993771 ]

patcharee edited comment on SPARK-11087 at 11/6/15 2:56 PM:

Hi, I found a scenario where the predicate does not work again. [~zzhan] Can you please have a look?

First create a Hive table:

hive> create table people(name string, address string, phone string) partitioned by(age int) stored as orc;

Then use the spark shell in local mode to insert data and query:

import org.apache.spark.sql.Row
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType}

sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
val records = (1 to 10).map(i => Row(s"name_$i", s"address_$i", s"phone_$i", i))
val schemaString = "name address phone age"
val schema = StructType(schemaString.split(" ").map(fieldName =>
  if (fieldName.equals("age")) StructField(fieldName, IntegerType, true)
  else StructField(fieldName, StringType, true)))
val x = sc.parallelize(records)
val rDF = sqlContext.createDataFrame(x, schema)
rDF.write.format("org.apache.spark.sql.hive.orc.DefaultSource")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .partitionBy("age")
  .saveAsTable("people")
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val people = sqlContext.read.format("orc").load("/user/hive/warehouse/people")
people.registerTempTable("people")
sqlContext.sql("SELECT * FROM people WHERE age = 3 and name = 'name_3'").count

Below is part of the log from the last command:

15/11/06 15:40:36 INFO HadoopRDD: Input split: hdfs://localhost:9000/user/hive/warehouse/people/age=3/part-0:0+453
15/11/06 15:40:36 DEBUG OrcInputFormat: No ORC pushdown predicate
15/11/06 15:40:36 INFO OrcRawRecordMerger: min key = null, max key = null
15/11/06 15:40:36 INFO ReaderImpl: Reading ORC rows from hdfs://localhost:9000/user/hive/warehouse/people/age=3/part-0 with {include: [true, true, false, false], offset: 0, length: 9223372036854775807}
15/11/06 15:40:36 INFO RecordReaderFactory: Schema is not specified on read. Using file schema.
15/11/06 15:40:36 DEBUG RecordReaderImpl: chunks = [range start: 111 end: 126]
15/11/06 15:40:36 DEBUG RecordReaderImpl: merge = [data range [111, 126), size: 15 type: array-backed]
15/11/06 15:40:36 INFO GeneratePredicate: Code generated in 5.063287 ms
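A way to see why the "age = 3" part never reaches the ORC reader: predicates on partition columns are consumed by partition pruning (they select directories like age=3), and only predicates on data columns are candidates for the ORC pushdown predicate. The sketch below models that split with toy case classes; these are illustrative types, not Spark's actual Catalyst or data-source filter API.

```scala
// Toy model of how a scan splits a conjunctive filter:
// predicates on partition columns prune directories, the rest
// are candidates for the ORC pushdown predicate.
// Illustrative only; not Spark's real classes.
case class EqualTo(column: String, value: Any)

def splitFilters(filters: Seq[EqualTo],
                 partitionCols: Set[String]): (Seq[EqualTo], Seq[EqualTo]) =
  filters.partition(f => partitionCols.contains(f.column))

val (pruning, pushdown) =
  splitFilters(Seq(EqualTo("age", 3), EqualTo("name", "name_3")), Set("age"))

println(pruning)   // used to select the age=3 directory
println(pushdown)  // the part that should become an ORC SearchArgument
```

So in the repro above, a working setup should still push "name = 'name_3'" down to ORC even though "age = 3" is handled by pruning; the "No ORC pushdown predicate" log line suggests even that part is being dropped.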
[jira] [Comment Edited] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960296#comment-14960296 ]

patcharee edited comment on SPARK-11087 at 10/19/15 3:34 AM:

[~zzhan] Below is my test. Please check. I also tried changing "hive.exec.orc.split.strategy", but none of the settings produced an "OrcInputFormat [INFO] ORC pushdown predicate" line like the one in your result.

case class Contact(name: String, phone: String)
case class Person(name: String, age: Int, contacts: Seq[Contact])
val records = (1 to 100).map { i =>
  Person(s"name_$i", i, (0 to 1).map { m => Contact(s"contact_$m", s"phone_$m") })
}
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
sc.parallelize(records).toDF().write.format("orc").partitionBy("age").save("peoplePartitioned")
val peoplePartitioned = sqlContext.read.format("orc").load("peoplePartitioned")
peoplePartitioned.registerTempTable("peoplePartitioned")

scala> sqlContext.setConf("hive.exec.orc.split.strategy", "ETL")
15/10/16 09:10:49 DEBUG VariableSubstitution: Substitution is on: ETL

scala> sqlContext.sql("SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'").count
15/10/16 09:10:52 DEBUG VariableSubstitution: Substitution is on: SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'
15/10/16 09:10:53 INFO PerfLogger:
15/10/16 09:10:53 DEBUG OrcInputFormat: Number of buckets specified by conf file is 0
15/10/16 09:10:53 DEBUG AcidUtils: in directory hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc base = null deltas = 0
15/10/16 09:10:53 DEBUG OrcInputFormat: BISplitStrategy strategy for hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc
15/10/16 09:10:53 INFO OrcInputFormat: FooterCacheHitRatio: 0/0
15/10/16 09:10:53 DEBUG OrcInputFormat: hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc:0+551 projected_columns_uncompressed_size: -1
res5: Long = 1

scala> sqlContext.setConf("hive.exec.orc.split.strategy", "BI")
15/10/16 09:11:13 DEBUG VariableSubstitution: Substitution is on: BI

scala> sqlContext.sql("SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'").count
15/10/16 09:11:19 DEBUG VariableSubstitution: Substitution is on: SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'
15/10/16 09:11:19 INFO PerfLogger:
15/10/16 09:11:19 DEBUG OrcInputFormat: Number of buckets specified by conf file is 0
15/10/16 09:11:19 DEBUG AcidUtils: in directory hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc base = null deltas = 0
15/10/16 09:11:19 DEBUG OrcInputFormat: BISplitStrategy strategy for hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc
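Note that both runs above log "BISplitStrategy", including the one where the strategy was set to "ETL"; one possible (unconfirmed) reading is that the SQLContext setting is not reaching the Hadoop conf that OrcInputFormat actually consults. For orientation, here is a rough sketch of how the HYBRID default is commonly described as picking a per-directory strategy. The names and the size heuristic are simplified assumptions, not Hive's exact code (the real logic lives inside Hive's OrcInputFormat).

```scala
// Simplified sketch of ORC split-strategy selection.
// BI:  split by file size only; footers are not read at split time.
// ETL: read file footers; stripe-level information is available.
sealed trait SplitStrategy
case object BI extends SplitStrategy
case object ETL extends SplitStrategy

def chooseStrategy(configured: String,
                   avgFileSizeBytes: Long,
                   blockSizeBytes: Long): SplitStrategy =
  configured match {
    case "BI"  => BI
    case "ETL" => ETL
    // HYBRID-style heuristic (assumed): small files favor BI,
    // since reading many tiny footers is not worth it.
    case _     => if (avgFileSizeBytes <= blockSizeBytes) BI else ETL
  }

// The test files above are ~551 bytes, far below an HDFS block,
// so a HYBRID-style chooser would pick BI for them.
println(chooseStrategy("HYBRID", 551, 128L * 1024 * 1024))
```

Under this model, an explicit "ETL" setting should have produced an ETL-style split, which is why the "BISplitStrategy" lines in the ETL run look suspicious.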
[jira] [Comment Edited] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959599#comment-14959599 ]

Zhan Zhang edited comment on SPARK-11087 at 10/15/15 8:58 PM:

[~patcharee] I tried to duplicate your table as closely as possible, but still didn't hit the problem. Note that the query has to match some valid record in the partition; otherwise, partition pruning will trim the whole predicate before it reaches the ORC scan. Please refer to the details below.

case class record(date: Int, hh: Int, x: Int, y: Int, height: Float, u: Float, w: Float, ph: Float, phb: Float, t: Float, p: Float, pb: Float, tke_pbl: Float, el_pbl: Float, qcloud: Float, zone: Int, z: Int, year: Int, month: Int)
val records = (1 to 100).map { i =>
  record(i.toInt, i.toInt, i.toInt, i.toInt, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toInt, i.toInt, i.toInt, i.toInt)
}
sc.parallelize(records).toDF().write.format("org.apache.spark.sql.hive.orc.DefaultSource").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone","z","year","month").saveAsTable("5D")
sc.parallelize(records).toDF().write.format("org.apache.spark.sql.hive.orc.DefaultSource").partitionBy("zone","z","year","month").save("4D")
val test = sqlContext.read.format("orc").load("4D")
test.registerTempTable("4D")
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

sqlContext.sql("select date, month, year, hh, u*0.9122461, u*-0.40964267, z from 4D where x = 320 and y = 117 and zone == 2 and year=2 and z >= 2 and z <= 8").show
2015-10-15 13:37:45 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS x 320) leaf-1 = (EQUALS y 117) expr = (and leaf-0 leaf-1)

sqlContext.sql("select date, month, year, hh, u*0.9122461, u*-0.40964267, z from 5D where x = 321 and y = 118 and zone == 2 and year=2 and z >= 2 and z <= 8").show
2015-10-15 13:40:06 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS x 321) leaf-1 = (EQUALS y 118) expr = (and leaf-0 leaf-1)

> spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
> ---------------------------------------------------------------------
>
>                 Key: SPARK-11087
>                 URL: https://issues.apache.org/jira/browse/SPARK-11087
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1
>        Environment: orc file version 0.12 with HIVE_8732
>                     hive version 1.2.1.2.3.0.0-2557
>            Reporter: patcharee
>            Priority: Minor
>
> I have an external hive table stored as a partitioned ORC file (see the table schema below). I tried to query the table with a where clause:
>
> hiveContext.setConf("spark.sql.orc.filterPushdown", "true")
> hiveContext.sql("select u, v from 4D where zone = 2 and x = 320 and y = 117")
>
> But from the log file, with debug logging level on, the ORC pushdown predicate was not generated.
> Unfortunately my table was not sorted when I inserted the data, but I expected the ORC pushdown
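When checking whether pushdown happened, the thing to grep for is the "ORC pushdown predicate" line whose shape appears in the working runs above. The toy formatter below reproduces that shape for a conjunction of equality leaves, so it is clear what a successful run should print; it is illustrative only, and the real string is produced by Hive's SearchArgument machinery, not by this code.

```scala
// Toy formatter mimicking the shape of the "ORC pushdown predicate"
// log line for an AND of equality predicates. Illustrative only.
case class Leaf(op: String, column: String, value: Any)

def render(leaves: Seq[Leaf]): String = {
  val leafLines = leaves.zipWithIndex.map { case (l, i) =>
    s"leaf-$i = (${l.op} ${l.column} ${l.value})"
  }
  val expr = "expr = (and " + leaves.indices.map(i => s"leaf-$i").mkString(" ") + ")"
  (leafLines :+ expr).mkString(" ")
}

println(render(Seq(Leaf("EQUALS", "x", 320), Leaf("EQUALS", "y", 117))))
// → leaf-0 = (EQUALS x 320) leaf-1 = (EQUALS y 117) expr = (and leaf-0 leaf-1)
```

If a query on data columns runs at DEBUG level and no line of this shape appears, only "No ORC pushdown predicate", then the filter never made it into the ORC reader, which is exactly the symptom this issue reports.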