[jira] [Comment Edited] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate

2015-11-06 Thread patcharee (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14993771#comment-14993771 ]

patcharee edited comment on SPARK-11087 at 11/6/15 2:56 PM:


Hi, I found a scenario where the predicate pushdown does not work again. [~zzhan], can you please have a look?

First, create a Hive table >>
hive> create table people(name string, address string, phone string) partitioned by(age int) stored as orc;

Then, from a spark-shell in local mode, insert data and query >>

import org.apache.spark.sql.Row
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType}
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
val records = (1 to 10).map(i => Row(s"name_$i", s"address_$i", s"phone_$i", i))
val schemaString = "name address phone age"
val schema = StructType(schemaString.split(" ").map(fieldName => if (fieldName.equals("age")) StructField(fieldName, IntegerType, true) else StructField(fieldName, StringType, true)))
val x = sc.parallelize(records)
val rDF = sqlContext.createDataFrame(x, schema)
rDF.write.format("org.apache.spark.sql.hive.orc.DefaultSource").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("age").saveAsTable("people")
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val people = sqlContext.read.format("orc").load("/user/hive/warehouse/people")
people.registerTempTable("people")
sqlContext.sql("SELECT * FROM people WHERE age = 3 and name = 'name_3'").count

Below is part of the log output from the last command >>
15/11/06 15:40:36 INFO HadoopRDD: Input split: hdfs://localhost:9000/user/hive/warehouse/people/age=3/part-0:0+453
15/11/06 15:40:36 DEBUG OrcInputFormat: No ORC pushdown predicate
15/11/06 15:40:36 DEBUG OrcInputFormat: No ORC pushdown predicate
15/11/06 15:40:36 INFO OrcRawRecordMerger: min key = null, max key = null
15/11/06 15:40:36 INFO OrcRawRecordMerger: min key = null, max key = null
15/11/06 15:40:36 INFO ReaderImpl: Reading ORC rows from hdfs://localhost:9000/user/hive/warehouse/people/age=3/part-0 with {include: [true, true, false, false], offset: 0, length: 9223372036854775807}
15/11/06 15:40:36 INFO ReaderImpl: Reading ORC rows from hdfs://localhost:9000/user/hive/warehouse/people/age=3/part-0 with {include: [true, true, false, false], offset: 0, length: 9223372036854775807}
15/11/06 15:40:36 INFO RecordReaderFactory: Schema is not specified on read. Using file schema.
15/11/06 15:40:36 INFO RecordReaderFactory: Schema is not specified on read. Using file schema.
15/11/06 15:40:36 DEBUG RecordReaderImpl: chunks = [range start: 111 end: 126]
15/11/06 15:40:36 DEBUG RecordReaderImpl: chunks = [range start: 111 end: 126]
15/11/06 15:40:36 DEBUG RecordReaderImpl: merge = [data range [111, 126), size: 15 type: array-backed]
15/11/06 15:40:36 DEBUG RecordReaderImpl: merge = [data range [111, 126), size: 15 type: array-backed]
15/11/06 15:40:36 INFO GeneratePredicate: Code generated in 5.063287 ms
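
For contrast with the "No ORC pushdown predicate" lines above, this is roughly the SearchArgument one would expect to be pushed down for the non-partition predicate name = 'name_3' (age is a partition column and is handled by partition pruning instead). A hedged sketch against the Hive 1.2.x sarg builder API; the shape matches the pushdown log lines later in this thread:

import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory

// Hedged sketch: the predicate that should reach the ORC reader when
// pushdown works; the log would then print it as
// "ORC pushdown predicate: leaf-0 = (EQUALS name name_3) ..."
val sarg = SearchArgumentFactory.newBuilder()
  .startAnd()
  .equals("name", "name_3")
  .end()
  .build()
println(sarg)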





[jira] [Comment Edited] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate

2015-10-18 Thread patcharee (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14960296#comment-14960296 ]

patcharee edited comment on SPARK-11087 at 10/19/15 3:34 AM:

[~zzhan]

Below is my test. Please check. I also tried changing "hive.exec.orc.split.strategy", but none of the settings produced an "OrcInputFormat [INFO] ORC pushdown predicate" line as in your result.

case class Contact(name: String, phone: String)
case class Person(name: String, age: Int, contacts: Seq[Contact])
val records = (1 to 100).map { i =>
  Person(s"name_$i", i, (0 to 1).map { m => Contact(s"contact_$m", s"phone_$m") })
}
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
sc.parallelize(records).toDF().write.format("orc").partitionBy("age").save("peoplePartitioned")
val peoplePartitioned = sqlContext.read.format("orc").load("peoplePartitioned")
peoplePartitioned.registerTempTable("peoplePartitioned")

scala> sqlContext.setConf("hive.exec.orc.split.strategy", "ETL")
15/10/16 09:10:49 DEBUG VariableSubstitution: Substitution is on: ETL
15/10/16 09:10:49 DEBUG VariableSubstitution: Substitution is on: ETL
15/10/16 09:10:49 DEBUG VariableSubstitution: Substitution is on: ETL
15/10/16 09:10:49 DEBUG VariableSubstitution: Substitution is on: ETL

scala> sqlContext.sql("SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'").count
15/10/16 09:10:52 DEBUG VariableSubstitution: Substitution is on: SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'
15/10/16 09:10:52 DEBUG VariableSubstitution: Substitution is on: SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'
15/10/16 09:10:53 INFO PerfLogger: 
15/10/16 09:10:53 INFO PerfLogger: 
15/10/16 09:10:53 DEBUG OrcInputFormat: Number of buckets specified by conf file is 0
15/10/16 09:10:53 DEBUG OrcInputFormat: Number of buckets specified by conf file is 0
15/10/16 09:10:53 DEBUG AcidUtils: in directory hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc base = null deltas = 0
15/10/16 09:10:53 DEBUG AcidUtils: in directory hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc base = null deltas = 0
15/10/16 09:10:53 DEBUG OrcInputFormat: BISplitStrategy strategy for hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc
15/10/16 09:10:53 DEBUG OrcInputFormat: BISplitStrategy strategy for hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc
15/10/16 09:10:53 INFO OrcInputFormat: FooterCacheHitRatio: 0/0
15/10/16 09:10:53 INFO OrcInputFormat: FooterCacheHitRatio: 0/0
15/10/16 09:10:53 DEBUG OrcInputFormat: hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc:0+551 projected_columns_uncompressed_size: -1
15/10/16 09:10:53 DEBUG OrcInputFormat: hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc:0+551 projected_columns_uncompressed_size: -1
15/10/16 09:10:53 INFO PerfLogger: 
15/10/16 09:10:53 INFO PerfLogger: 
res5: Long = 1
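
Note that the run above still reports BISplitStrategy even though ETL was requested. A quick hedged sanity check, assuming the Spark 1.5 SQLContext API, that the setting at least reached the session:

// getConf(key) returns the value previously stored with setConf, so this
// should print "ETL" before the query is re-run.
println(sqlContext.getConf("hive.exec.orc.split.strategy"))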

scala> sqlContext.setConf("hive.exec.orc.split.strategy", "BI")
15/10/16 09:11:13 DEBUG VariableSubstitution: Substitution is on: BI
15/10/16 09:11:13 DEBUG VariableSubstitution: Substitution is on: BI
15/10/16 09:11:13 DEBUG VariableSubstitution: Substitution is on: BI
15/10/16 09:11:13 DEBUG VariableSubstitution: Substitution is on: BI

scala> sqlContext.sql("SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'").count
15/10/16 09:11:19 DEBUG VariableSubstitution: Substitution is on: SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'
15/10/16 09:11:19 DEBUG VariableSubstitution: Substitution is on: SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'
15/10/16 09:11:19 INFO PerfLogger: 
15/10/16 09:11:19 INFO PerfLogger: 
15/10/16 09:11:19 DEBUG OrcInputFormat: Number of buckets specified by conf file is 0
15/10/16 09:11:19 DEBUG OrcInputFormat: Number of buckets specified by conf file is 0
15/10/16 09:11:19 DEBUG AcidUtils: in directory hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc base = null deltas = 0
15/10/16 09:11:19 DEBUG AcidUtils: in directory hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc base = null deltas = 0
15/10/16 09:11:19 DEBUG OrcInputFormat: BISplitStrategy strategy for hdfs://helmhdfs/user/patcharee/peoplePartitioned/age=20/part-r-00014-fb3d0874-db8b-40e7-9a4f-0e071c46f509.orc
15/10/16 09:11:19 DEBUG OrcInputFormat: BISplitStrategy strategy for 

[jira] [Comment Edited] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate

2015-10-15 Thread Zhan Zhang (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14959599#comment-14959599 ]

Zhan Zhang edited comment on SPARK-11087 at 10/15/15 8:58 PM:

[~patcharee] I tried to duplicate your table as closely as possible, but still didn't hit the problem. Note that the query has to match some valid record in the partition; otherwise, partition pruning trims all predicates before the ORC scan is reached (see the illustration after the log below). Please refer to the details below.

case class record(date: Int, hh: Int, x: Int, y: Int, height: Float, u: Float, w: Float, ph: Float, phb: Float, t: Float, p: Float, pb: Float, tke_pbl: Float, el_pbl: Float, qcloud: Float, zone: Int, z: Int, year: Int, month: Int)

val records = (1 to 100).map { i =>
  record(i.toInt, i.toInt, i.toInt, i.toInt, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toInt, i.toInt, i.toInt, i.toInt)
}

sc.parallelize(records).toDF().write.format("org.apache.spark.sql.hive.orc.DefaultSource").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone","z","year","month").saveAsTable("5D")
sc.parallelize(records).toDF().write.format("org.apache.spark.sql.hive.orc.DefaultSource").partitionBy("zone","z","year","month").save("4D")
val test = sqlContext.read.format("orc").load("4D")
test.registerTempTable("4D")
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
sqlContext.sql("select date, month, year, hh, u*0.9122461, u*-0.40964267, z from 4D where x = 320 and y = 117 and zone == 2 and year=2 and z >= 2 and z <= 8").show

2015-10-15 13:37:45 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS x 320)
leaf-1 = (EQUALS y 117)
expr = (and leaf-0 leaf-1)

sqlContext.sql("select date, month, year, hh, u*0.9122461, u*-0.40964267, z from 5D where x = 321 and y = 118 and zone == 2 and year=2 and z >= 2 and z <= 8").show
2015-10-15 13:40:06 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS x 321)
leaf-1 = (EQUALS y 118)
expr = (and leaf-0 leaf-1)
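
A hedged illustration of the pruning caveat above: in the 4D table, record i has zone = i for i in 1..100, so a zone value with no partition directory never reaches the ORC reader at all.

// zone = 999 matches no partition directory: the scan is pruned away
// before OrcInputFormat runs, so no "ORC pushdown predicate" line can
// appear no matter how spark.sql.orc.filterPushdown is set.
sqlContext.sql("select date from 4D where zone = 999 and x = 2").count
// zone = 2 exists (record 2 lives there), so ORC files are actually read
// and the x = 2 predicate can show up in the pushdown log.
sqlContext.sql("select date from 4D where zone = 2 and x = 2").count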





> spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
>
> Key: SPARK-11087
> URL: https://issues.apache.org/jira/browse/SPARK-11087
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.1
> Environment: orc file version 0.12 with HIVE_8732
> hive version 1.2.1.2.3.0.0-2557
>Reporter: patcharee
>Priority: Minor
>
> I have an external hive table stored as a partitioned orc file (see the 
> table schema below). I tried to query the table with a where clause >
> hiveContext.setConf("spark.sql.orc.filterPushdown", "true")
> hiveContext.sql("select u, v from 4D where zone = 2 and x = 320 and y = 117")
> But from the log file, with debug logging level on, the ORC pushdown 
> predicate was not generated. 
> Unfortunately my table was not sorted when I inserted the data, but I 
> expected the ORC pushdown predicate to be generated anyway.