Parquet filter pushdown not working and statistics not generated for any column with Spark 1.6 (CDH 5.7)

2017-11-21 Thread Rabin Banerjee
Hi All,


 I am using CDH 5.7, which ships with Spark 1.6.0. I am saving my data set as
Parquet and then querying it. The query runs fine, but when I checked the files
generated by Spark, I found that the statistics (min/max) are missing for all
columns, so filters are not pushed down and the entire file is scanned.


(1 to 3).map(i => (i, i.toString)).toDF("a", "b").sort("a").write.parquet("/hdfs/path/to/store")


parquet-tools meta part-r-00186-03addad8-c19d-4812-b83b-a8708606183b.gz.parquet

creator: parquet-mr version 1.5.0-cdh5.7.1 (build ${buildNumber})

extra:   org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}},{"name":"b","type":"string","nullable":true,"metadata":{}}]}



file schema: spark_schema

--

a:   OPTIONAL INT32 R:0 D:1

b:   OPTIONAL BINARY O:UTF8 R:0 D:1


row group 1: RC:148 TS:2012

--

a:INT32 GZIP DO:0 FPO:4 SZ:297/635/2.14 VC:148
ENC:BIT_PACKED,PLAIN,RLE

b:BINARY GZIP DO:0 FPO:301 SZ:301/1377/4.57 VC:148
ENC:BIT_PACKED,PLAIN,RLE


As you can see from the parquet-tools metadata, the statistics (ST) field is
missing for every column, and Spark is scanning all the data in all the files.

Any suggestions?


Thanks //

RB


Parquet Filter PushDown

2017-03-30 Thread Rahul Nandi
Hi,
I have around 2 million records stored as Parquet files in S3. The data looks
something like this:

id  data
1   abc
2   cdf
3   fas

Now I want to filter out just the records whose id matches one of my required
ids.

val requiredDataId = Array(1, 2) // might go up to 100s of ids

df.filter(row => requiredDataId.contains(row.getAs[Int]("id")))

This is my use case.

What would be the best way to do this in Spark 2.0.1 so that the filter is also
pushed down to Parquet?
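
One Column-based alternative (just a sketch; column name id as above) would be
isin, which Spark translates into an In filter that shows up under PushedFilters
in explain(), though whether the Parquet reader can actually evaluate it depends
on the Spark/Parquet version:

// Sketch only: express the membership test as a Column predicate instead of a
// Scala closure, so Spark can hand it to the data source as an In(...) filter.
import org.apache.spark.sql.functions.col

val filtered = df.filter(col("id").isin(requiredDataId: _*))
filtered.explain()   // look for In(id, [1,2,...]) under PushedFilters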



Thanks and Regards,
Rahul


Re: Expected benefit of parquet filter pushdown?

2016-09-01 Thread Christon DeWan
Thanks for the references; that explains a great deal. I can verify that using 
integer keys in this use case does work as expected w/r/t run time and bytes 
read. Hopefully this all works in the next Spark release!
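
For the archives, a rough sketch of what the integer-key workaround looks like
(hypothetical path and column name key1_id, not the schema from my original
mail):

// Hypothetical sketch: with an integer key column, parquet-mr writes usable
// min/max statistics, so row groups whose range excludes the value can be
// skipped instead of having their blob pages read.
import org.apache.spark.sql.functions.{length, sum}

val t2i = spark.read.parquet("hdfs://nameservice1/user/me/parquet_test/blob_intkey")
t2i.filter(t2i("key1_id") === 12345)
   .agg(sum(length(t2i("blob"))))
   .explain()   // expect something like PushedFilters: [IsNotNull(key1_id), EqualTo(key1_id,12345)]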

Thanks,
Xton

> On Aug 31, 2016, at 3:41 PM, Robert Kruszewski  wrote:
> 
> Your statistics seem corrupted. The creator field doesn't match the version 
> spec, and as such Parquet is not using it to filter. I would check whether you 
> have references to PARQUET-251 or PARQUET-297 in your executor logs. This bug 
> existed between parquet 1.5.0 and 1.8.0. Check out 
> https://issues.apache.org/jira/browse/PARQUET-251. Only master of Spark has 
> parquet >= 1.8.0.
> 
> Also check out VersionParser in parquet, since your createdBy is invalid and 
> even if you have fixed parquet it will be deemed corrupted.
> 
> -  Robert
> 
> On 8/31/16, 10:29 PM, "cde...@apple.com on behalf of Christon DeWan" 
>  wrote:
> 
> [quoted original message trimmed; it appears in full below under "Expected 
> benefit of parquet filter pushdown?"]





Re: Expected benefit of parquet filter pushdown?

2016-08-31 Thread Robert Kruszewski
Your statistics seem corrupted. The creator field doesn't match the version 
spec, and as such Parquet is not using it to filter. I would check whether you 
have references to PARQUET-251 or PARQUET-297 in your executor logs. This bug 
existed between parquet 1.5.0 and 1.8.0. Check out 
https://issues.apache.org/jira/browse/PARQUET-251. Only master of Spark has 
parquet >= 1.8.0.

Also check out VersionParser in parquet, since your createdBy is invalid and even 
if you have fixed parquet it will be deemed corrupted.

-  Robert

On 8/31/16, 10:29 PM, "cde...@apple.com on behalf of Christon DeWan" 
 wrote:

[quoted original message trimmed; it appears in full below under "Expected 
benefit of parquet filter pushdown?"]





Expected benefit of parquet filter pushdown?

2016-08-31 Thread Christon DeWan
I have a data set stored in parquet with several short key fields and one 
relatively large (several kb) blob field. The data set is sorted by key1, key2.

message spark_schema {
  optional binary key1 (UTF8);
  optional binary key2;
  optional binary blob;
}

One use case of this dataset is to fetch all the blobs for a given predicate on 
key1, key2. I would expect Parquet predicate pushdown to help greatly by not 
reading blobs from row groups where the predicate on the keys matches zero 
records. That does not appear to be the case, however.
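
(Rough expectation, using the row group shown further below and assuming it is
typical:

    rows per row group            ~ 3,971
    row groups                    ~ 6,000,000 / 3,971 ~ 1,500
    compressed blob per row group ~ 134 MB (SZ:134006918)

so a predicate matching 2 rows should touch the blob pages of only one or two
row groups, i.e. a few hundred MB instead of the ~200 GB blob column.)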

For a predicate that only returns 2 rows (out of 6 million), this query:

select sum(length(key2)) from t2 where key1 = 'rare value'

takes 5x longer and reads 50x more data (according to the web UI) than this 
query:

select sum(length(blob)) from t2 where key1 = 'rare value'

The Parquet scan does appear to receive the predicate (according to explain(), see 
below), and those columns even appear to be dictionary encoded (see further 
below).

So does filter pushdown not actually let us read less data, or is there 
something wrong with my setup?

Thanks,
Xton



scala> spark.sql("select sum(length(blob)) from t2 where key1 = 'rare 
value'").explain()
== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(cast(length(blob#48) as bigint))])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_sum(cast(length(blob#48) as 
bigint))])
  +- *Project [blob#48]
 +- *Filter (isnotnull(key1#46) && (key1#46 = rare value))
+- *BatchedScan parquet [key1#46,blob#48] Format: ParquetFormat, 
InputPaths: hdfs://nameservice1/user/me/parquet_test/blob, PushedFilters: 
[IsNotNull(key1), EqualTo(key1,rare value)], ReadSchema: 
struct



$ parquet-tools meta example.snappy.parquet 
creator: parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d) 
extra:   org.apache.spark.sql.parquet.row.metadata = 
{"type":"struct","fields":[{"name":"key1","type":"string","nullable":true,"metadata":{}},{"name":"key2","type":"binary","nullable":true,"metadata":{}},{"
 [more]...

file schema: spark_schema 

key1:OPTIONAL BINARY O:UTF8 R:0 D:1
key2:OPTIONAL BINARY R:0 D:1
blob:OPTIONAL BINARY R:0 D:1

row group 1: RC:3971 TS:320593029 

key1: BINARY SNAPPY DO:0 FPO:4 SZ:84/80/0.95 VC:3971 
ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
key2: BINARY SNAPPY DO:0 FPO:88 SZ:49582/53233/1.07 VC:3971 
ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
blob: BINARY SNAPPY DO:0 FPO:49670 SZ:134006918/320539716/2.39 VC:3971 
ENC:BIT_PACKED,RLE,PLAIN
...

