Thanks for the references, that explains a great deal. I can verify that using 
integer keys in this use case does work as expected w/r/t run time and bytes 
read. Hopefully this all works in the next spark release!
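
For anyone following along, here is a minimal sketch of why the creator string in my parquet-tools output (quoted below) gets rejected. It assumes the expected format is roughly "<application> version <x.y.z> (build <hash>)". The regex and the CreatedByCheck name are my own simplified stand-ins, not parquet's actual VersionParser code, and the well-formed string is made up for comparison.

```scala
import scala.util.matching.Regex

// Simplified stand-in for the created_by format parquet-mr appears to expect:
// "<application> version <x.y.z> (build <hash>)".
// ASSUMPTION: this pattern is illustrative only, not the library's exact regex.
object CreatedByCheck {
  private val CreatedBy: Regex = """(.+?) version (\S+) \(build (\S*)\)""".r

  // Returns (application, version, build) when the whole string fits the format.
  def parse(createdBy: String): Option[(String, String, String)] =
    createdBy.trim match {
      case CreatedBy(app, version, build) => Some((app, version, build))
      case _                              => None
    }
}

// The creator string from my file: there is no "version x.y.z" part.
val fromMyFile = "parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d)"
// A hypothetical well-formed string, for comparison only.
val wellFormed = "parquet-mr version 1.8.0 (build 0123abc)"

println(CreatedByCheck.parse(fromMyFile))
println(CreatedByCheck.parse(wellFormed))
```

This can be pasted into spark-shell as is. If the real parser behaves like this sketch, my files would be treated as having an unknown writer version, which would explain why the statistics are ignored.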

Thanks,
Xton

> On Aug 31, 2016, at 3:41 PM, Robert Kruszewski <robe...@palantir.com> wrote:
> 
> Your statistics seem corrupted. The creator field doesn’t match the version 
> spec and as such parquet is not using it to filter. I would check whether you 
> have references to PARQUET-251 or PARQUET-297 in your executor logs. This bug 
> existed between parquet 1.5.0 and 1.8.0. Check out 
> https://issues.apache.org/jira/browse/PARQUET-251. Only master of spark has 
> parquet >=1.8.0.
> 
> Also check out VersionParser in parquet: your createdBy is invalid, so even 
> with a fixed parquet the statistics will still be deemed corrupted.
> 
> -          Robert
> 
> On 8/31/16, 10:29 PM, "cde...@apple.com on behalf of Christon DeWan" 
> <cde...@apple.com> wrote:
> 
>    I have a data set stored in parquet with several short key fields and one 
> relatively large (several kb) blob field. The data set is sorted by key1, 
> key2.
> 
>        message spark_schema {
>          optional binary key1 (UTF8);
>          optional binary key2;
>          optional binary blob;
>        }
> 
>    One use case of this dataset is to fetch all the blobs for a given 
> predicate of key1, key2. I would expect parquet predicate pushdown to help 
> greatly by not reading blobs from rowgroups where the predicate on the keys 
> matched zero records. That does not appear to be the case, however.
> 
>    For a predicate that only returns 2 rows (out of 6 million), this query:
> 
>       select sum(length(key2)) from t2 where key1 = 'rare value'
> 
>    takes 5x longer and reads 50x more data (according to the web UI) than 
> this query:
> 
>       select sum(length(blob)) from t2 where key1 = 'rare value'
> 
>    The parquet scan does appear to be getting the predicate (says explain(), 
> see below), and those columns do even appear to be dictionary encoded (see 
> further below).
> 
>    So does filter pushdown not actually allow us to read less data or is 
> there something wrong with my setup?
> 
>    Thanks,
>    Xton
> 
>    --------
> 
>    scala> spark.sql("select sum(length(blob)) from t2 where key1 = 'rare value'").explain()
>    == Physical Plan ==
>    *HashAggregate(keys=[], functions=[sum(cast(length(blob#48) as bigint))])
>    +- Exchange SinglePartition
>       +- *HashAggregate(keys=[], functions=[partial_sum(cast(length(blob#48) as bigint))])
>          +- *Project [blob#48]
>             +- *Filter (isnotnull(key1#46) && (key1#46 = rare value))
>                +- *BatchedScan parquet [key1#46,blob#48] Format: ParquetFormat, InputPaths: hdfs://nameservice1/user/me/parquet_test/blob, PushedFilters: [IsNotNull(key1), EqualTo(key1,rare value)], ReadSchema: struct<key1:string,blob:binary>
> 
> 
> 
>    $ parquet-tools meta example.snappy.parquet
>    creator:     parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d)
>    extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"key1","type":"string","nullable":true,"metadata":{}},{"name":"key2","type":"binary","nullable":true,"metadata":{}},{" [more]...
>
>    file schema: spark_schema
>    --------------------------------------------------------------------------------
>    key1:        OPTIONAL BINARY O:UTF8 R:0 D:1
>    key2:        OPTIONAL BINARY R:0 D:1
>    blob:        OPTIONAL BINARY R:0 D:1
>
>    row group 1: RC:3971 TS:320593029
>    --------------------------------------------------------------------------------
>    key1:         BINARY SNAPPY DO:0 FPO:4 SZ:84/80/0.95 VC:3971 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
>    key2:         BINARY SNAPPY DO:0 FPO:88 SZ:49582/53233/1.07 VC:3971 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
>    blob:         BINARY SNAPPY DO:0 FPO:49670 SZ:134006918/320539716/2.39 VC:3971 ENC:BIT_PACKED,RLE,PLAIN
>    ...
> 
> 
>    ---------------------------------------------------------------------
>    To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 


