Your statistics seem corrupted. The creator field doesn't match the version 
spec, so parquet is not using the statistics to filter. I would check whether 
you have references to PARQUET-251 or PARQUET-297 in your executor logs. This 
bug existed between parquet 1.5.0 and 1.8.0. Check out 
https://issues.apache.org/jira/browse/PARQUET-251. Only master of spark has 
parquet >=1.8.0.
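
To make the consequence concrete, here is a minimal sketch of min/max row group pruning. It is not parquet's actual code; the names ColumnStats and can_skip_row_group are hypothetical. When the statistics are ignored (modelled here as None), no row group can be skipped, so every blob has to be read.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ColumnStats:
    """Hypothetical stand-in for one column chunk's min/max statistics."""
    min_value: bytes
    max_value: bytes


def can_skip_row_group(stats: Optional[ColumnStats], wanted: bytes) -> bool:
    """Return True only when the statistics prove no row can equal `wanted`."""
    if stats is None:
        # Statistics are missing or distrusted: the row group must be read.
        return False
    return wanted < stats.min_value or wanted > stats.max_value
```

With trusted statistics, an equality predicate on a sorted key column would let most row groups be skipped. With distrusted statistics the function always answers "read it".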

Also check out VersionParser in parquet: your createdBy is invalid, so even 
if you have a fixed parquet the statistics will still be deemed corrupted.
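
As a rough illustration of why the createdBy string fails to parse, here is a sketch that assumes the expected shape is "<application> version <version> (build <hash>)". The pattern is my approximation of that shape, not a copy of what VersionParser uses, and the function name parses is hypothetical.

```python
import re

# Assumed approximation of the expected created_by shape:
#   "<application> version <version> (build <hash>)"
CREATED_BY = re.compile(r"(.+) version ((.*) )?\(build ?(.*)\)")


def parses(created_by: str) -> bool:
    """Return True when the whole string fits the assumed shape."""
    return CREATED_BY.fullmatch(created_by) is not None


# The creator string reported by parquet-tools in the quoted message below.
yours = "parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d)"
print(parses(yours))
```

Under that assumption your creator string is rejected because it has no "version" token, while something like "parquet-mr version 1.8.0 (build abc123)" (an invented example) would be accepted.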

-          Robert

On 8/31/16, 10:29 PM, "cde...@apple.com on behalf of Christon DeWan" 
<cde...@apple.com> wrote:

    I have a data set stored in parquet with several short key fields and one 
relatively large (several kb) blob field. The data set is sorted by key1, key2.
    
        message spark_schema {
          optional binary key1 (UTF8);
          optional binary key2;
          optional binary blob;
        }
    
    One use case of this dataset is to fetch all the blobs for a given 
predicate of key1, key2. I would expect parquet predicate pushdown to help 
greatly by not reading blobs from rowgroups where the predicate on the keys 
matched zero records. That does not appear to be the case, however.
    
    For a predicate that only returns 2 rows (out of 6 million), this query:
    
        select sum(length(key2)) from t2 where key1 = 'rare value'
    
    takes 5x longer and reads 50x more data (according to the web UI) than this 
query:
    
        select sum(length(blob)) from t2 where key1 = 'rare value'
    
    The parquet scan does appear to be getting the predicate (says explain(), 
see below), and those columns do even appear to be dictionary encoded (see 
further below).
    
    So does filter pushdown not actually allow us to read less data or is there 
something wrong with my setup?
    
    Thanks,
    Xton
    
    --------
    
    scala> spark.sql("select sum(length(blob)) from t2 where key1 = 'rare value'").explain()
    == Physical Plan ==
    *HashAggregate(keys=[], functions=[sum(cast(length(blob#48) as bigint))])
    +- Exchange SinglePartition
       +- *HashAggregate(keys=[], functions=[partial_sum(cast(length(blob#48) as bigint))])
          +- *Project [blob#48]
             +- *Filter (isnotnull(key1#46) && (key1#46 = rare value))
                +- *BatchedScan parquet [key1#46,blob#48] Format: ParquetFormat, InputPaths: hdfs://nameservice1/user/me/parquet_test/blob, PushedFilters: [IsNotNull(key1), EqualTo(key1,rare value)], ReadSchema: struct<key1:string,blob:binary>
    
    
    
    $ parquet-tools meta example.snappy.parquet 
    creator:     parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d) 
    extra:       org.apache.spark.sql.parquet.row.metadata = 
{"type":"struct","fields":[{"name":"key1","type":"string","nullable":true,"metadata":{}},{"name":"key2","type":"binary","nullable":true,"metadata":{}},{"
 [more]...
    
    file schema: spark_schema 
    
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    key1:        OPTIONAL BINARY O:UTF8 R:0 D:1
    key2:        OPTIONAL BINARY R:0 D:1
    blob:        OPTIONAL BINARY R:0 D:1
    
    row group 1: RC:3971 TS:320593029 
    
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    key1:         BINARY SNAPPY DO:0 FPO:4 SZ:84/80/0.95 VC:3971 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
    key2:         BINARY SNAPPY DO:0 FPO:88 SZ:49582/53233/1.07 VC:3971 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
    blob:         BINARY SNAPPY DO:0 FPO:49670 SZ:134006918/320539716/2.39 VC:3971 ENC:BIT_PACKED,RLE,PLAIN
    ...
    
    
    
    
