Your statistics seem corrupted. The creator field doesn't match the version spec, and as such Parquet is not using it to filter. I would check whether you have references to PARQUET-251 or PARQUET-297 in your executor logs. This bug existed between Parquet 1.5.0 and 1.8.0; see https://issues.apache.org/jira/browse/PARQUET-251. Only Spark master has Parquet >= 1.8.0.
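If you want to see exactly what the readers are being handed, something along these lines (a rough sketch, run from spark-shell with the parquet-hadoop classes Spark already ships; the part-file path is only an example) prints the created_by string and the per-column statistics from one footer:

    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import scala.collection.JavaConverters._

    // Example path only -- point this at one of your own part files.
    val footer = ParquetFileReader.readFooter(
      spark.sparkContext.hadoopConfiguration,
      new Path("hdfs://nameservice1/user/me/parquet_test/blob/part-00000.snappy.parquet"))

    // The created_by string is what VersionParser has to parse; if it cannot,
    // the min/max statistics below are ignored as potentially corrupt.
    println("created_by: " + footer.getFileMetaData.getCreatedBy)

    for (block <- footer.getBlocks.asScala; col <- block.getColumns.asScala) {
      println(s"${col.getPath}: ${col.getStatistics}")
    }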
Also check out VersionParser in Parquet: since your createdBy is invalid, the statistics will be deemed corrupted even if you are on a fixed Parquet.

- Robert

On 8/31/16, 10:29 PM, "cde...@apple.com on behalf of Christon DeWan" <cde...@apple.com> wrote:

I have a data set stored in Parquet with several short key fields and one relatively large (several kB) blob field. The data set is sorted by key1, key2.

    message spark_schema {
        optional binary key1 (UTF8);
        optional binary key2;
        optional binary blob;
    }

One use case of this dataset is to fetch all the blobs for a given predicate on key1, key2. I would expect Parquet predicate pushdown to help greatly by not reading blobs from row groups where the predicate on the keys matches zero records. That does not appear to be the case, however.

For a predicate that returns only 2 rows (out of 6 million), this query:

    select sum(length(blob)) from t2 where key1 = 'rare value'

takes 5x longer and reads 50x more data (according to the web UI) than this query:

    select sum(length(key2)) from t2 where key1 = 'rare value'

The Parquet scan does appear to be getting the predicate (says explain(), see below), and those columns even appear to be dictionary encoded (see further below). So does filter pushdown not actually allow us to read less data, or is there something wrong with my setup?

Thanks,
Xton

--------

scala> spark.sql("select sum(length(blob)) from t2 where key1 = 'rare value'").explain()
== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(cast(length(blob#48) as bigint))])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_sum(cast(length(blob#48) as bigint))])
      +- *Project [blob#48]
         +- *Filter (isnotnull(key1#46) && (key1#46 = rare value))
            +- *BatchedScan parquet [key1#46,blob#48] Format: ParquetFormat, InputPaths: hdfs://nameservice1/user/me/parquet_test/blob, PushedFilters: [IsNotNull(key1), EqualTo(key1,rare value)], ReadSchema: struct<key1:string,blob:binary>

$ parquet-tools meta example.snappy.parquet
creator:      parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d)
extra:        org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"key1","type":"string","nullable":true,"metadata":{}},{"name":"key2","type":"binary","nullable":true,"metadata":{}},{" [more]...

file schema:  spark_schema
--------------------------------------------------------------------------------
key1:         OPTIONAL BINARY O:UTF8 R:0 D:1
key2:         OPTIONAL BINARY R:0 D:1
blob:         OPTIONAL BINARY R:0 D:1

row group 1:  RC:3971 TS:320593029
--------------------------------------------------------------------------------
key1:         BINARY SNAPPY DO:0 FPO:4 SZ:84/80/0.95 VC:3971 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
key2:         BINARY SNAPPY DO:0 FPO:88 SZ:49582/53233/1.07 VC:3971 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
blob:         BINARY SNAPPY DO:0 FPO:49670 SZ:134006918/320539716/2.39 VC:3971 ENC:BIT_PACKED,RLE,PLAIN
...
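One possible way out, once you are on a build whose Parquet writer stamps a valid version into created_by (Parquet >= 1.8.0), is to rewrite the dataset so the new footers carry statistics the reader will trust. Filter pushdown itself is on by default (spark.sql.parquet.filterPushdown), so with trusted min/max values on key1 the row groups that cannot match should be skipped. A rough sketch only; the output path below is made up:

    // Rewrite with a writer that records a parseable created_by, keeping the
    // data clustered on the keys so per-row-group min/max stays selective.
    val df = spark.read.parquet("hdfs://nameservice1/user/me/parquet_test/blob")

    df.sort("key1", "key2")
      .write
      .option("compression", "snappy")
      .parquet("hdfs://nameservice1/user/me/parquet_test/blob_rewritten")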