Unfortunately, Spark doesn’t push down filters on nested columns. I remember 
an effort to implement this [1], but it was never merged.
So even if we have proper statistics in Iceberg, we cannot leverage them from 
Spark.

[1] - https://github.com/apache/spark/pull/22573
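
For illustration, here is roughly how this shows up in spark-shell (schema and
path are hypothetical): a top-level predicate appears under PushedFilters in
the physical plan, while a predicate on a nested field stays as a plain Spark
filter, so the data source never sees it and cannot prune with it.

// Hypothetical table with a top-level "age" column and a struct column
// "person" that also contains an "age" field.
val df = spark.read.parquet("/path/to/table")
df.filter(df("age") > 21).explain(true)         // GreaterThan(age,21) is pushed
df.filter(df("person.age") > 21).explain(true)  // PushedFilters: [] (nothing pushed)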


> On 26 Feb 2019, at 16:52, Gautam <gautamkows...@gmail.com> wrote:
> 
> Thanks Anton, this is very helpful! I will apply the patch from pull #63 and 
> give it a shot.
> 
> Re: Collecting min/max stats on nested structures 
> (https://github.com/apache/incubator-iceberg/issues/78) ...
> 
> We have the exact same use case for skipping files on nested field filters. I 
> was intrigued by your comment on enabling stats on nested structures by 
> replacing `fileSchema.asStruct().field(fieldId)` with 
> `fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata` .. Have you 
> had success with this? If so, I can try it out on our data as well. 
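> 
> If I am reading the lookup right, the difference is roughly this (a sketch 
> against the Schema API, untested on real data):
> 
> // asStruct().field(id) resolves only top-level fields, so for a nested
> // field id it returns null and the min/max stats for that column are
> // dropped; findField(id) searches nested structs as well.
> val topLevelOnly = fileSchema.asStruct().field(fieldId)  // null for nested ids
> val anyDepth     = fileSchema.findField(fieldId)         // resolves nested fields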
> 
> On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:
> Hi Gautam,
> 
> I believe you see this behaviour because SparkAppenderFactory is configured 
> to use ParquetWriteAdapter. It only tracks the number of records and uses 
> ParquetWriteSupport from Spark. This means that statistics are not collected 
> on writes and cannot be used on reads.
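> 
> Roughly, its metrics() boils down to the following shape (a sketch from 
> memory, not the exact source): only the record count is populated, so the 
> manifests end up with no column-level stats at all.
> 
> // Sketch: everything except the record count is left null.
> def metrics(): Metrics = new Metrics(recordCount, null, null, null)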
> 
> Once [1] is merged, proper statistics will be fetched from the footer and 
> persisted in the manifests. The statistics are collected when writing data 
> files, not manifests. See [2] for more info. Also, [3] contains an example 
> that filters out files (it requires [1] to be cherry-picked locally).
> 
> Hope that helps,
> Anton
> 
> [1] - https://github.com/apache/incubator-iceberg/pull/63
> [2] - https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
> [3] - https://github.com/apache/incubator-iceberg/pull/105
> 
> 
>> On 26 Feb 2019, at 13:58, Gautam <gautamkows...@gmail.com> wrote:
>> 
>> .. Just to be clear, my concern is around Iceberg not skipping files. Iceberg 
>> does skip row groups when scanning files, since iceberg.parquet.ParquetReader 
>> uses the Parquet footer stats underneath, although none of those stats come 
>> from the manifests.
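>> 
>> (If you want to see the per-row-group stats it is using there, you can read 
>> them straight out of the footer with parquet-mr; a sketch, with a 
>> hypothetical file path:)
>> 
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.Path
>> import org.apache.parquet.format.converter.ParquetMetadataConverter
>> import org.apache.parquet.hadoop.ParquetFileReader
>> import scala.collection.JavaConverters._
>> 
>> val footer = ParquetFileReader.readFooter(
>>   new Configuration(), new Path("/path/to/data-file.parquet"),
>>   ParquetMetadataConverter.NO_FILTER)
>> // One Statistics object (min/max/null count) per column chunk per row group.
>> footer.getBlocks.asScala.foreach { rowGroup =>
>>   rowGroup.getColumns.asScala.foreach { col =>
>>     println(s"${col.getPath} -> ${col.getStatistics}")
>>   }
>> }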
>> 
>> On Tue, Feb 26, 2019 at 7:24 PM Gautam <gautamkows...@gmail.com> wrote:
>> Hello Devs,
>>                    I am looking into leveraging Iceberg to speed up split 
>> generation and to minimize file scans. My understanding was that Iceberg 
>> keeps key statistics as listed under Metrics.java [1] viz. column 
>> lower/upper bounds, nullValues, distinct value counts, etc. and that table 
>> scanning leverages these to skip partitions, files & row-groups (in the 
>> Parquet context). 
>> 
>> What I found is that files aren't skipped when a predicate applies only to a 
>> subset of the table's files. Within a partition, it will scan all files, as 
>> manifests only keep record counts while the rest of the metrics (lower/upper 
>> bounds, distinct value counts, null value counts) are null / empty. This is 
>> because AvroFileAppender only keeps `recordCounts` as metrics [2], and 
>> currently that is the only appender supported for writing manifest files.
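>> 
>> (One way to confirm this from the API, using the DataFile accessors on the 
>> planned tasks; a sketch along the lines of the scan below:)
>> 
>> scala> import scala.collection.JavaConverters._
>> scala> iceTable.newScan().planFiles.asScala.foreach { task =>
>>      |   val f = task.file()
>>      |   println(s"${f.path()} records=${f.recordCount()} " +
>>      |     s"nullValueCounts=${f.nullValueCounts()} lowerBounds=${f.lowerBounds()}")
>>      | }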
>> 
>> 
>> Example :
>> 
>> In the following example, iceTable was generated by iteratively adding two 
>> files, so it has two separate Parquet files under it ..
>> 
>> scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))
>> 
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>  partition_data=PartitionData{}, residual=true}
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>  partition_data=PartitionData{}, residual=true}
>> 
>> 
>> Only one file contains a row with age = null .. 
>> 
>> scala> iceDf.show()
>> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a task of 
>> very large size (113 KB). The maximum recommended task size is 100 KB.
>> +----+-------+--------------------+
>> | age|   name|             friends|
>> +----+-------+--------------------+
>> |  60| Kannan|      [Justin -> 19]|
>> |  75| Sharon|[Michael -> 30, J...|
>> |null|Michael|                null|
>> |  30|   Andy|[Josh -> 10, Bisw...|
>> |  19| Justin|[Kannan -> 75, Sa...|
>> +----+-------+--------------------+
>> 
>> 
>> 
>> Running a filter on isNull(age) scans both files .. 
>> 
>> val isNullExp = Expressions.isNull("age")
>> val isNullScan = iceTable.newScan().filter(isNullExp)
>> 
>> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>> 
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>  partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>  partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>> 
>> 
>> 
>> I would expect only one file to be scanned, as Iceberg should track 
>> nullValueCounts as per Metrics.java [1]. The same issue holds for integer 
>> comparison filters, which scan too many files. 
>> 
>> When I looked through the code, there is a provision for using Parquet file 
>> footer stats to populate manifest Metrics [3], but this is never used, as 
>> Iceberg currently only allows AvroFileAppender for creating manifest files. 
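>> 
>> (Presumably, once manifests carry these metrics, file skipping would go 
>> through something like InclusiveMetricsEvaluator; a sketch, untested, with 
>> `dataFile` standing in for a DataFile read from a manifest:)
>> 
>> import com.netflix.iceberg.expressions.{Expressions, InclusiveMetricsEvaluator}
>> 
>> // With nullValueCounts populated, a file whose null count for "age" is 0
>> // would return false here and could be skipped instead of scanned.
>> val evaluator = new InclusiveMetricsEvaluator(iceTable.schema(), Expressions.isNull("age"))
>> val mightMatch = evaluator.eval(dataFile)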
>> 
>> What's the plan around using Parquet footer stats in manifests? They could 
>> be very useful during split generation. I saw some discussions around this 
>> in the Iceberg spec document [4] but couldn't glean whether any of those 
>> ideas are actually implemented yet. 
>> 
>> I can work on a proposal PR for adding these in, but I wanted to know the 
>> current thoughts around this. 
>> 
>> 
>> Gist for the above example: 
>> https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7
>> 
>> 
>> Looking forward to your feedback,
>> 
>> Cheers,
>> -Gautam.
>> 
>> [1] - https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
>> [2] - https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
>> [3] - https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
>> [4] - https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
>> 
> 
