Raised https://github.com/apache/incubator-iceberg/issues/122 for the filtering support.
On Wed, Mar 6, 2019 at 1:34 AM Anton Okolnychyi <aokolnyc...@apple.com.invalid> wrote:

> Sounds good, Gautam.
>
> Our intention was to be able to filter out files using predicates on
> nested fields. For now, file skipping works only with predicates that
> involve top-level attributes.
>
> On 5 Mar 2019, at 17:47, Gautam <gautamkows...@gmail.com> wrote:
>
> Hey Anton,
>        I'm curious how you are using the struct metrics in your company.
> Are you planning to use them for predicate pushdown or something else
> entirely?
>
> Regarding the timeline, that's fine; we can wait a week or two for your
> changes on collecting metrics. If I can assume that your changes will add
> the struct metrics, I could open a separate Iceberg issue about the struct
> expression handling. If Ryan and you agree on allowing struct-based
> filtering in Iceberg, as long as we avoid mixed filtering (map<struct<int>>,
> array<struct<int>>, etc.), I can go ahead and work on it.
>
> Cheers,
> -Gautam.
>
> On Tue, Mar 5, 2019 at 10:30 PM Anton Okolnychyi <aokolnyc...@apple.com.invalid> wrote:
>
>> Sorry for my late reply, and thanks for testing, Gautam!
>>
>> I had a local prototype that only collected metrics for nested structs
>> and stored them. I haven't checked whether Iceberg can make use of that
>> right now. As I understand Ryan's comment and Gautam's observations, we
>> will need changes to make it work even if we have proper min/max
>> statistics. So we have two independent issues. I was planning to add
>> tests and submit the collection upstream. However, open-source approval
>> within my company might easily take another week or more. So, if we need
>> this change earlier, someone else can implement it. Just let me know; I
>> can help review it then.
>>
>> Thanks,
>> Anton
>>
>> On 5 Mar 2019, at 09:51, Gautam <gautamkows...@gmail.com> wrote:
>>
>> Thanks for the response, Ryan. Comments inline ...
>>
>> > Iceberg doesn't support binding expressions in sub-structs yet.
>> > So the fix on the Iceberg side requires a few steps. First, collecting
>> > the metrics from Parquet with Anton's PR, and second, updating
>> > expression binding to work with structs.
>>
>> I don't think there is a PR up yet on collecting metrics on struct
>> fields; I could work on one if Anton isn't already on it (thanks for
>> calling it out in the issue, Anton!).
>>
>> > The reason why binding doesn't work with structs yet is that we don't
>> > want to bind structs that are within maps or arrays, because those
>> > would change the semantics of the expression. For example, a.b = 5 can
>> > be run on a: struct<b: int> but can't be run on a: list<struct<b: int>>.
>>
>> From the discussion on said issue [1], it seems we are OK with structs
>> being filtered on. As for structs inside maps or arrays, can we not
>> reject the invalid cases in expression evaluation? That is, detect what
>> nested type field 'a' is, and allow or disallow the predicate
>> accordingly? Support for plain structs alone is a good incremental
>> feature, I think, especially since, as Anton pointed out, Spark has a PR
>> up [2] on pushing down struct-based filters which one can cherry-pick
>> locally.
>>
>> > Also, the Avro problem wasn't because the manifests are stored as
>> > Avro. Avro doesn't collect metrics about the data that is stored, but
>> > the manifests have the metrics that were added with each file, so the
>> > problem is not adding the metrics when you added the files. I think
>> > you've solved the problem and correctly built your table metadata
>> > using the metrics from the Parquet footers, but I still want to note
>> > the distinction: Avro manifests store metrics correctly; Avro data
>> > files don't generate metrics.
>>
>> Gotcha!
>>
>> Cheers,
>> -Gautam.
>>
>> [1] - https://github.com/apache/incubator-iceberg/issues/78
>> [2] - https://github.com/apache/spark/pull/22573
>>
>> On Sat, Mar 2, 2019 at 6:47 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>
>>> Iceberg doesn't support binding expressions in sub-structs yet. So the
>>> fix on the Iceberg side requires a few steps. First, collecting the
>>> metrics from Parquet with Anton's PR, and second, updating expression
>>> binding to work with structs.
>>>
>>> The reason why binding doesn't work with structs yet is that we don't
>>> want to bind structs that are within maps or arrays, because those
>>> would change the semantics of the expression. For example, a.b = 5 can
>>> be run on a: struct<b: int> but can't be run on a: list<struct<b: int>>.
>>>
>>> Also, the Avro problem wasn't because the manifests are stored as Avro.
>>> Avro doesn't collect metrics about the data that is stored, but the
>>> manifests have the metrics that were added with each file, so the
>>> problem is not adding the metrics when you added the files. I think
>>> you've solved the problem and correctly built your table metadata using
>>> the metrics from the Parquet footers, but I still want to note the
>>> distinction: Avro manifests store metrics correctly; Avro data files
>>> don't generate metrics.
>>>
>>> On Thu, Feb 28, 2019 at 1:32 AM Gautam <gautamkows...@gmail.com> wrote:
>>>
>>>> Hey Anton,
>>>>        Wanted to circle back on the Spark PR [1] to add support for
>>>> nested fields. I tried applying it and tested it. With this change,
>>>> Spark pushes filters on structs down to Iceberg, but Iceberg's
>>>> expression handling seems to fail in validation ..
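The semantic constraint Ryan describes above (a.b = 5 binds on a: struct<b: int> but not on a: list<struct<b: int>>), together with Gautam's suggestion to detect the container kind and reject the invalid cases, can be sketched as a schema walk. This is a toy model under assumed names, not Iceberg's actual schema API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: resolve a dotted field path such as "a.b", descending
// only through structs, and reject the predicate as soon as the path would
// cross a list or map (which would change the expression's semantics).
class NestedBindingCheck {

  enum Kind { PRIMITIVE, STRUCT, LIST, MAP }

  // A toy schema node: a kind plus named children.
  static class TypeNode {
    final Kind kind;
    final Map<String, TypeNode> fields = new LinkedHashMap<>();
    TypeNode(Kind kind) { this.kind = kind; }
    TypeNode field(String name, TypeNode child) { fields.put(name, child); return this; }
  }

  // Returns true iff every step of the dotted path resolves through structs.
  static boolean canBind(TypeNode root, String dottedPath) {
    TypeNode current = root;
    for (String part : dottedPath.split("\\.")) {
      if (current.kind != Kind.STRUCT) {
        return false;  // path crosses a list/map/primitive: reject
      }
      current = current.fields.get(part);
      if (current == null) {
        return false;  // unknown field
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // a: struct<b: int>  -> a.b is bindable
    TypeNode structSchema = new TypeNode(Kind.STRUCT)
        .field("a", new TypeNode(Kind.STRUCT).field("b", new TypeNode(Kind.PRIMITIVE)));
    // a: list<struct<b: int>>  -> a.b must be rejected
    TypeNode listSchema = new TypeNode(Kind.STRUCT)
        .field("a", new TypeNode(Kind.LIST).field("b", new TypeNode(Kind.PRIMITIVE)));

    System.out.println(canBind(structSchema, "a.b"));  // true
    System.out.println(canBind(listSchema, "a.b"));    // false
  }
}
```

The point of the sketch is that allowing struct-only paths while rejecting map/array crossings is a local, mechanical check, which supports Gautam's argument that struct filtering is a safe incremental step.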
>>>>
>>>> Caused by: com.netflix.iceberg.exceptions.ValidationException: Cannot
>>>> find field 'location.lat' in struct: struct<1: age: optional int, 2:
>>>> name: optional string, 3: friends: optional map<string, int>, 4:
>>>> location: optional struct<7: lat: optional double, 8: lon: optional double>>
>>>>   at com.netflix.iceberg.exceptions.ValidationException.check(ValidationException.java:42)
>>>>   at com.netflix.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:76)
>>>>   at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:138)
>>>>   at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:94)
>>>>   at com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:147)
>>>>   at com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:160)
>>>>   at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.project(Projections.java:108)
>>>>   at com.netflix.iceberg.expressions.InclusiveManifestEvaluator.<init>(InclusiveManifestEvaluator.java:57)
>>>>   at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:153)
>>>>   at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:149)
>>>>   at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>>>>   at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>>>>   at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>>>>   at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>>>
>>>> I think this should be handled in Iceberg, as filtering on struct
>>>> fields like a.b.c = "blah" is a legitimate way to query in SQL. If
>>>> you feel this is a valid assumption, I can work on a fix. Thoughts?
>>>>
>>>> *Test Table Schema:*
>>>> scala> iceDf.printSchema
>>>> root
>>>>  |-- age: integer (nullable = true)
>>>>  |-- name: string (nullable = true)
>>>>  |-- friends: map (nullable = true)
>>>>  |    |-- key: string
>>>>  |    |-- value: integer (valueContainsNull = true)
>>>>  |-- location: struct (nullable = true)
>>>>  |    |-- lat: double (nullable = true)
>>>>  |    |-- lon: double (nullable = true)
>>>>
>>>> *Gist to recreate the issue:*
>>>> https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac
>>>>
>>>> Cheers,
>>>> -Gautam.
>>>>
>>>> [1] - https://github.com/apache/spark/pull/22573
>>>>
>>>> On Tue, Feb 26, 2019 at 10:35 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:
>>>>
>>>>> Unfortunately, Spark doesn't push down filters for nested columns. I
>>>>> remember an effort to implement it [1]; however, it has not been
>>>>> merged. So, even if we have proper statistics in Iceberg, we cannot
>>>>> leverage them from Spark.
>>>>>
>>>>> [1] - https://github.com/apache/spark/pull/22573
>>>>>
>>>>> On 26 Feb 2019, at 16:52, Gautam <gautamkows...@gmail.com> wrote:
>>>>>
>>>>> Thanks Anton, this is very helpful! I will apply the patch from pull
>>>>> #63 and give it a shot.
>>>>>
>>>>> Re: collecting min/max stats on nested structures
>>>>> (https://github.com/apache/incubator-iceberg/issues/78) ...
>>>>>
>>>>> We have the exact same use case for skipping files on nested-field
>>>>> filters. I was intrigued by your comment on enabling stats on nested
>>>>> structures by replacing `fileSchema.asStruct().field(fieldId)` with
>>>>> `fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata`.
>>>>> Have you had success with this? If so, I can try it out on our data
>>>>> as well.
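The one-line change Gautam asks about hinges on lookup depth: a call like `fileSchema.asStruct().field(fieldId)` only inspects top-level fields, whereas `fileSchema.findField(fieldId)` searches the whole schema tree, so nested field IDs (like `lat` = 7 in the error above) resolve too. A rough self-contained sketch of that difference, using toy types rather than Iceberg's real Schema classes:

```java
import java.util.List;

// Hypothetical sketch of top-level-only vs recursive field lookup by ID.
class FieldLookup {
  static class Field {
    final int id;
    final String name;
    final List<Field> children;
    Field(int id, String name, Field... kids) {
      this.id = id;
      this.name = name;
      this.children = List.of(kids);
    }
  }

  // Top-level only, like asStruct().field(id): misses nested IDs, so
  // metrics keyed by nested field IDs get dropped.
  static Field topLevelField(List<Field> schema, int id) {
    for (Field f : schema) {
      if (f.id == id) return f;
    }
    return null;
  }

  // Recursive, like findField(fieldId): finds IDs anywhere in the tree.
  static Field findField(List<Field> schema, int id) {
    for (Field f : schema) {
      if (f.id == id) return f;
      Field nested = findField(f.children, id);
      if (nested != null) return nested;
    }
    return null;
  }

  public static void main(String[] args) {
    // Field IDs from the error message: location (4) contains lat (7), lon (8).
    List<Field> schema = List.of(
        new Field(1, "age"), new Field(2, "name"), new Field(3, "friends"),
        new Field(4, "location", new Field(7, "lat"), new Field(8, "lon")));
    System.out.println(topLevelField(schema, 7));      // null: lat is nested
    System.out.println(findField(schema, 7).name);     // lat
  }
}
```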
>>>>>
>>>>> On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:
>>>>>
>>>>>> Hi Gautam,
>>>>>>
>>>>>> I believe you see this behaviour because SparkAppenderFactory is
>>>>>> configured to use ParquetWriteAdapter. It only tracks the number of
>>>>>> records and uses ParquetWriteSupport from Spark. This means that the
>>>>>> statistics are not collected on writes and cannot be used on reads.
>>>>>>
>>>>>> Once [1] is merged, proper statistics will be fetched from the
>>>>>> footer and persisted in the manifests. The statistics are collected
>>>>>> when writing data files, not manifests. See [2] for more info.
>>>>>> Also, [3] contains an example that filters out files (it requires
>>>>>> [1] to be cherry-picked locally).
>>>>>>
>>>>>> Hope that helps,
>>>>>> Anton
>>>>>>
>>>>>> [1] - https://github.com/apache/incubator-iceberg/pull/63
>>>>>> [2] - https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
>>>>>> [3] - https://github.com/apache/incubator-iceberg/pull/105
>>>>>>
>>>>>> On 26 Feb 2019, at 13:58, Gautam <gautamkows...@gmail.com> wrote:
>>>>>>
>>>>>> .. Just to be clear, my concern is about Iceberg not skipping files.
>>>>>> Iceberg does skip row groups when scanning files, as
>>>>>> *iceberg.parquet.ParquetReader* uses the Parquet stats underneath
>>>>>> while skipping, albeit none of those stats come from the manifests.
>>>>>>
>>>>>> On Tue, Feb 26, 2019 at 7:24 PM Gautam <gautamkows...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Devs,
>>>>>>>        I am looking into leveraging Iceberg to speed up split
>>>>>>> generation and to minimize file scans. My understanding was that
>>>>>>> Iceberg keeps key statistics as listed under Metrics.java [1], viz.
>>>>>>> column lower/upper bounds, null value counts, distinct value
>>>>>>> counts, etc.,
>>>>>>> and that table scanning leverages these to skip partitions, files,
>>>>>>> and row groups (in the Parquet context).
>>>>>>>
>>>>>>> What I found is that files aren't skipped when a predicate applies
>>>>>>> only to a subset of the table's files. Within a partition, it will
>>>>>>> scan all files, as manifests only keep record counts while the rest
>>>>>>> of the metrics (lower/upper bounds, distinct value counts, null
>>>>>>> value counts) are null/empty. This is because AvroFileAppender
>>>>>>> only keeps `recordCounts` as metrics [2], and currently that is the
>>>>>>> only appender supported for writing manifest files.
>>>>>>>
>>>>>>> *Example:*
>>>>>>>
>>>>>>> In the following example, iceTable was generated by iteratively
>>>>>>> adding two files, so it has two separate Parquet files under it ..
>>>>>>>
>>>>>>> scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))
>>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet, partition_data=PartitionData{}, residual=true}
>>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet, partition_data=PartitionData{}, residual=true}
>>>>>>>
>>>>>>> *Only one file contains a row with age = null ..*
>>>>>>>
>>>>>>> scala> iceDf.show()
>>>>>>> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a
>>>>>>> task of very large size (113 KB). The maximum recommended task size
>>>>>>> is 100 KB.
>>>>>>> +----+-------+--------------------+
>>>>>>> | age|   name|             friends|
>>>>>>> +----+-------+--------------------+
>>>>>>> |  60| Kannan|      [Justin -> 19]|
>>>>>>> |  75| Sharon|[Michael -> 30, J...|
>>>>>>> |null|Michael|                null|
>>>>>>> |  30|   Andy|[Josh -> 10, Bisw...|
>>>>>>> |  19| Justin|[Kannan -> 75, Sa...|
>>>>>>> +----+-------+--------------------+
>>>>>>>
>>>>>>> *Running a filter on isNull(age) scans both files ..*
>>>>>>>
>>>>>>> val isNullExp = Expressions.isNull("age")
>>>>>>> val isNullScan = iceTable.newScan().filter(isNullExp)
>>>>>>>
>>>>>>> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet, partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet, partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>>>>>
>>>>>>> I would expect only one file to be scanned, as Iceberg should track
>>>>>>> nullValueCounts as per Metrics.java [1]. The same issue holds for
>>>>>>> integer comparison filters scanning too many files.
>>>>>>>
>>>>>>> When I looked through the code, there is provision for using
>>>>>>> Parquet file footer stats to populate manifest metrics [3], but
>>>>>>> this is never used, as Iceberg currently only allows
>>>>>>> AvroFileAppender for creating manifest files.
>>>>>>>
>>>>>>> What's the plan around using Parquet footer stats in manifests,
>>>>>>> which could be very useful during split generation? I saw some
>>>>>>> discussions around this in the Iceberg spec document [4] but
>>>>>>> couldn't glean whether any of those are actually implemented yet.
>>>>>>>
>>>>>>> I can work on a proposal PR for adding these, but wanted to know
>>>>>>> the current thoughts around this.
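For reference, the kind of pruning Gautam expects here is straightforward once manifests carry per-column metrics. A minimal sketch, assuming nullValueCounts and lower/upper bounds are available per file; this is toy code, not Iceberg's actual InclusiveManifestEvaluator:

```java
import java.util.Map;

// Hypothetical sketch of metrics-based file skipping: a file can be pruned
// when its recorded stats prove no row can match the predicate.
class FileSkipping {

  // Returns true if the file might contain rows matching isNull(column).
  static boolean mayContainNulls(Map<String, Long> nullValueCounts, String column) {
    Long count = nullValueCounts.get(column);
    // No stats recorded -> must scan; zero recorded nulls -> safe to skip.
    return count == null || count > 0;
  }

  // Returns true if the file might contain rows with column == value,
  // based on the column's lower/upper bounds ([min, max]).
  static boolean mayContainEquals(Map<String, int[]> bounds, String column, int value) {
    int[] b = bounds.get(column);
    return b == null || (value >= b[0] && value <= b[1]);
  }

  public static void main(String[] args) {
    // Mirroring the example table: one file has a null age, the other
    // has ages bounded by [19, 75] and no nulls.
    System.out.println(mayContainNulls(Map.of("age", 1L), "age"));   // true: scan
    System.out.println(mayContainNulls(Map.of("age", 0L), "age"));   // false: skip
    System.out.println(mayContainEquals(Map.of("age", new int[] {19, 75}), "age", 90)); // false: skip
  }
}
```

With metrics like these in the manifests, the isNull(age) scan above would plan only the file whose null count is non-zero, which is exactly the behaviour being requested.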
>>>>>>>
>>>>>>> *Gist for the above example:*
>>>>>>> https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7
>>>>>>>
>>>>>>> Looking forward to your feedback,
>>>>>>>
>>>>>>> Cheers,
>>>>>>> -Gautam.
>>>>>>>
>>>>>>> [1] - https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
>>>>>>> [2] - https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
>>>>>>> [3] - https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
>>>>>>> [4] - https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix