Unfortunately, Spark doesn’t push down filters on nested columns. There was an effort to implement this [1], but it was never merged. So even if we collect proper statistics in Iceberg, we cannot leverage them from Spark.
[1] - https://github.com/apache/spark/pull/22573

> On 26 Feb 2019, at 16:52, Gautam <gautamkows...@gmail.com> wrote:
>
> Thanks Anton, this is very helpful! I will apply the patch from pull#63 and give it a shot.
>
> Re: Collecting min/max stats on nested structures (https://github.com/apache/incubator-iceberg/issues/78) ...
>
> We have the exact same use case for skipping files on nested field filters. I was intrigued by your comment on enabling stats on nested structures by replacing `fileSchema.asStruct().field(fieldId)` with `fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata`. Have you had success with this? If so, I can try it out on our data as well.
>
> On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:
> Hi Gautam,
>
> I believe you see this behaviour because SparkAppenderFactory is configured to use ParquetWriteAdapter. It only tracks the number of records and uses ParquetWriteSupport from Spark. This means that statistics are not collected on writes and cannot be used on reads.
>
> Once [1] is merged, proper statistics will be fetched from the footer and persisted in the manifests. The statistics are collected when writing data files, not manifests. See [2] for more info. Also, [3] contains an example that filters out files (it requires [1] to be cherry-picked locally).
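The `fileSchema.findField(fieldId)` vs `fileSchema.asStruct().field(fieldId)` distinction mentioned above can be sketched with a toy schema model. This is a hypothetical illustration (the `Toy*` types are not Iceberg's actual classes): the point is that a top-level-only lookup returns nothing for a field id that lives inside a nested struct, while a recursive lookup finds it.

```scala
// Toy schema model (hypothetical; not Iceberg's real API) showing why a
// top-level-only field lookup misses fields nested inside structs.
sealed trait ToyType
case object ToyInt extends ToyType
case class ToyStruct(fields: List[ToyField]) extends ToyType
case class ToyField(id: Int, name: String, tpe: ToyType)

// Like asStruct().field(fieldId): inspects top-level fields only.
def topLevelField(schema: ToyStruct, id: Int): Option[ToyField] =
  schema.fields.find(_.id == id)

// Like findField(fieldId): recurses into nested structs.
def findField(schema: ToyStruct, id: Int): Option[ToyField] =
  schema.fields.find(_.id == id).orElse(
    schema.fields.flatMap {
      case ToyField(_, _, inner: ToyStruct) => findField(inner, id)
      case _                                => None
    }.headOption)

// "age" (id 1) is top-level; "friends" contains a nested "age" (id 3).
val schema = ToyStruct(List(
  ToyField(1, "age", ToyInt),
  ToyField(2, "friends", ToyStruct(List(ToyField(3, "age", ToyInt))))))

topLevelField(schema, 3)  // None: nested field is invisible at the top level
findField(schema, 3)      // Some(ToyField(3, "age", ToyInt))
```

If metrics collection resolves field ids with the first form, stats for nested columns are silently dropped, which matches the behaviour described in the thread.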
>
> Hope that helps,
> Anton
>
> [1] - https://github.com/apache/incubator-iceberg/pull/63
> [2] - https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
> [3] - https://github.com/apache/incubator-iceberg/pull/105
>
>> On 26 Feb 2019, at 13:58, Gautam <gautamkows...@gmail.com> wrote:
>>
>> .. Just to be clear, my concern is around Iceberg not skipping files. Iceberg does skip row groups when scanning files, as iceberg.parquet.ParquetReader uses the Parquet stats underneath while skipping, albeit none of these stats come from the manifests.
>>
>> On Tue, Feb 26, 2019 at 7:24 PM Gautam <gautamkows...@gmail.com> wrote:
>> Hello Devs,
>> I am looking into leveraging Iceberg to speed up split generation and to minimize file scans. My understanding was that Iceberg keeps key statistics as listed under Metrics.java [1], viz. column lower/upper bounds, null value counts, distinct value counts, etc., and that table scanning leverages these to skip partitions, files & row groups (in the Parquet context).
>>
>> What I found is that files aren't skipped when a predicate applies only to a subset of the table's files. Within a partition it will scan all files, as manifests only keep record counts while the rest of the metrics (lower/upper bounds, distinct value counts, null value counts) are null/empty. This is because AvroFileAppender only keeps `recordCounts` as metrics [2], and currently that is the only appender supported for writing manifest files.
>>
>> Example:
>>
>> In the following example, iceTable was generated by iteratively adding two files, so it has two separate Parquet files under it ..
>>
>> scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))
>>
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet, partition_data=PartitionData{}, residual=true}
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet, partition_data=PartitionData{}, residual=true}
>>
>> Only one file contains a row with age = null ..
>>
>> scala> iceDf.show()
>> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a task of very large size (113 KB). The maximum recommended task size is 100 KB.
>> +----+-------+--------------------+
>> | age|   name|             friends|
>> +----+-------+--------------------+
>> |  60| Kannan|      [Justin -> 19]|
>> |  75| Sharon|[Michael -> 30, J...|
>> |null|Michael|                null|
>> |  30|   Andy|[Josh -> 10, Bisw...|
>> |  19| Justin|[Kannan -> 75, Sa...|
>> +----+-------+--------------------+
>>
>> Running a filter on isNull(age) scans both files ..
>>
>> val isNullExp = Expressions.isNull("age")
>> val isNullScan = iceTable.newScan().filter(isNullExp)
>>
>> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>>
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet, partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet, partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>
>> I would expect only one file to be scanned, as Iceberg should track nullValueCounts as per Metrics.java [1]. The same issue holds for integer comparison filters scanning too many files.
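For illustration, the skipping Gautam expects from the `isNull(age)` scan can be sketched as a toy model. All names here are hypothetical (this is not Iceberg's planner, and `FileStats`/`planIsNull` are invented for the sketch): the idea is that a per-file null-value count of zero proves a file cannot satisfy an is_null predicate, while a file with no stats at all must be kept, which is exactly why empty manifest metrics lead to scanning every file.

```scala
// Toy model (hypothetical; not Iceberg's actual planning code) of
// null-count-based file skipping for an is_null predicate.
case class FileStats(path: String, nullValueCounts: Map[String, Long])

// Keep a file unless its stats prove the column has zero nulls.
// Missing stats prove nothing, so the file must be kept -- the
// behaviour observed when manifests carry only record counts.
def planIsNull(files: Seq[FileStats], column: String): Seq[FileStats] =
  files.filter(f => f.nullValueCounts.get(column).forall(_ > 0L))

val files = Seq(
  FileStats("file-a.parquet", Map("age" -> 0L)),  // no null ages: skippable
  FileStats("file-b.parquet", Map("age" -> 1L)))  // contains the null row

planIsNull(files, "age").map(_.path)  // List(file-b.parquet)
```

With populated metrics, only one of the two files above would be planned; with empty metrics (no entry for "age"), both survive, matching the `planFiles` output in the example.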
>>
>> When I looked through the code, there is provision for using Parquet file footer stats to populate manifest Metrics [3], but this is never used, as Iceberg currently only allows AvroFileAppender for creating manifest files.
>>
>> What's the plan around using Parquet footer stats in manifests, which can be very useful during split generation? I saw some discussions around this in the Iceberg spec document [4] but couldn't glean whether any of those are actually implemented yet.
>>
>> I can work on a proposal PR for adding these but wanted to know the current thoughts around this.
>>
>> Gist for the above example: https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7
>>
>> Looking forward to your feedback,
>>
>> Cheers,
>> -Gautam.
>>
>> [1] - https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
>> [2] - https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
>> [3] - https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
>> [4] - https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
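The integer-comparison case mentioned in the thread works analogously to null counts. Here is a minimal sketch, again with hypothetical names (`ColumnBounds`, `mayMatchGreaterThan` are invented for illustration, not Iceberg's evaluator API), of how per-file lower/upper bounds from footer stats would prune files for an `age > x` predicate:

```scala
// Toy sketch (hypothetical; not Iceberg's actual metrics evaluator) of
// pruning files for "age > literal" using per-file lower/upper bounds.
case class ColumnBounds(lower: Int, upper: Int)
case class ToyDataFile(path: String, bounds: Map[String, ColumnBounds])

// A file can only contain matching rows if its upper bound exceeds the
// literal; without bounds we must conservatively read the file.
def mayMatchGreaterThan(file: ToyDataFile, column: String, lit: Int): Boolean =
  file.bounds.get(column).forall(_.upper > lit)

val dataFiles = Seq(
  ToyDataFile("f1.parquet", Map("age" -> ColumnBounds(19, 60))),
  ToyDataFile("f2.parquet", Map("age" -> ColumnBounds(30, 75))))

dataFiles.filter(mayMatchGreaterThan(_, "age", 65)).map(_.path)  // List(f2.parquet)
```

Note this is an inclusive (may-match) check: bounds can only prove a file irrelevant, never that every row matches, so the residual predicate still has to be applied when the file is read.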