Raised https://github.com/apache/incubator-iceberg/issues/122 for the
filtering support.

On Wed, Mar 6, 2019 at 1:34 AM Anton Okolnychyi
<aokolnyc...@apple.com.invalid> wrote:

> Sounds good, Gautam.
>
> Our intention was to be able to filter out files using predicates on
> nested fields. For now, file skipping works only with predicates that
> involve top level attributes.
>
>
> On 5 Mar 2019, at 17:47, Gautam <gautamkows...@gmail.com> wrote:
>
> Hey Anton,
>        I'm curious how you are using the Struct metrics in your company,
> are you planning to use it for predicate pushdowns or something else
> entirely?
>
> Regarding timeline, that's fine, we can wait a week or two for your
> changes on collecting metrics. If I can assume that your changes will add
> the struct metrics, I could open a separate Iceberg issue about the struct
> expression handling. If Ryan and you agree on allowing struct based
> filtering in Iceberg as long as we avoid mixed filtering (map<struct<int>>
> , array<struct<Int>> , etc.) I can go ahead and work on it.
>
> Cheers,
> -Gautam.
>
>
>
> On Tue, Mar 5, 2019 at 10:30 PM Anton Okolnychyi <
> aokolnyc...@apple.com.invalid> wrote:
>
>> Sorry for my late reply and thanks for testing Gautam!
>>
>> I had a local prototype that only collected metrics for nested structs
>> and stored them. I haven’t checked if Iceberg can make use of that right
>> now. As I understand Ryan’s comment and Gautam’s observations, we will need
>> changes to make it work even if we have proper min/max statistics. So, we
>> have two independent issues then. I was planning to add tests and submit
>> the collection upstream. However, open source approval within my company
>> might easily take another week or more. So, if we need this change earlier,
>> someone can implement it. Just let me know, I can help to review then.
>>
>> Thanks,
>> Anton
>>
>>
>> On 5 Mar 2019, at 09:51, Gautam <gautamkows...@gmail.com> wrote:
>>
>> Thanks for the response Ryan, comments in line ...
>>
>> > Iceberg doesn't support binding expressions in sub-structs yet. So the
>> fix on the Iceberg side requires a few steps. First, collecting the metrics
>> from Parquet with Anton's PR, and second, updating expression binding to
>> work with structs.
>>
>> I don't think there is a PR up yet on collecting metrics on struct
>> fields, I could work on one if Anton isn't already on it (thanks for
>> calling it out in the issue Anton!).
>>
>> > The reason why binding doesn't work with structs yet it that we don't
>> want to bind structs that are within maps or arrays because those will
>> change the semantics of the expression. For example, a.b = 5 can be run on
>> a: struct<b: int> but can't be run on a: list<struct<b: int>>.
>>
>> From the discussion on said issue [1] seems like we are ok with structs
>> being filtered on. About structs inside maps or arrays, can we not reject
>> the invalid cases in the expression evaluation?  As in, detect of what
>> nested type field 'a' is and allow or disallow appropriately? Having
>> support for just structs is a good incremental feature methinks. Especially
>> coz, as Anton pointed out,  Spark has a PR up [2] on pushing down
>> struct-based filters which one can cherry pick locally.
>>
>> > Also, the Avro problem wasn't because the manifests are stored as Avro.
>> Avro doesn't collect metrics about the data that is stored, but the
>> manifests have the metrics that were added with each file, so the problem
>> is not adding the metrics when you added the files. I think you've solved
>> the problem and correctly built your table metadata using the metrics from
>> the Parquet footers, but I still want to note the distinction: Avro
>> manifests store metrics correctly. Avro data files don't generate metrics.
>>
>> Gotcha!
>>
>> Cheers,
>> -Gautam.
>>
>> [1] - https://github.com/apache/incubator-iceberg/issues/78
>> [2] - https://github.com/apache/spark/pull/22573
>>
>>
>> On Sat, Mar 2, 2019 at 6:47 AM Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>>> Iceberg doesn't support binding expressions in sub-structs yet. So the
>>> fix on the Iceberg side requires a few steps. First, collecting the metrics
>>> from Parquet with Anton's PR, and second, updating expression binding to
>>> work with structs.
>>>
>>> The reason why binding doesn't work with structs yet it that we don't
>>> want to bind structs that are within maps or arrays because those will
>>> change the semantics of the expression. For example, a.b = 5 can be run on
>>> a: struct<b: int> but can't be run on a: list<struct<b: int>>.
>>>
>>> Also, the Avro problem wasn't because the manifests are stored as Avro.
>>> Avro doesn't collect metrics about the data that is stored, but the
>>> manifests have the metrics that were added with each file, so the problem
>>> is not adding the metrics when you added the files. I think you've solved
>>> the problem and correctly built your table metadata using the metrics from
>>> the Parquet footers, but I still want to note the distinction: Avro
>>> manifests store metrics correctly. Avro data files don't generate metrics.
>>>
>>> On Thu, Feb 28, 2019 at 1:32 AM Gautam <gautamkows...@gmail.com> wrote:
>>>
>>>> Hey Anton,
>>>>       Wanted to circle back on the Spark PR [1] to add support for
>>>> nested fields .. I tried applying it, tested it. With this change Spark
>>>> pushes filters on structs down to Iceberg, but Iceberg expression handling
>>>> seems to fail in validation ..
>>>>
>>>>
>>>> Caused by: com.netflix.iceberg.exceptions.ValidationException: Cannot
>>>> find field 'location.lat' in struct: struct<1: age: optional int, 2: name:
>>>> optional string, 3: friends: optional map<string, int>, 4: location:
>>>> optional struct<7: lat: optional double, 8: lon: optional double>>
>>>>   at
>>>> com.netflix.iceberg.exceptions.ValidationException.check(ValidationException.java:42)
>>>>   at
>>>> com.netflix.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:76)
>>>>   at
>>>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:138)
>>>>   at
>>>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:94)
>>>>   at
>>>> com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:147)
>>>>   at
>>>> com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:160)
>>>>   at
>>>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.project(Projections.java:108)
>>>>   at
>>>> com.netflix.iceberg.expressions.InclusiveManifestEvaluator.<init>(InclusiveManifestEvaluator.java:57)
>>>>   at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:153)
>>>>   at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:149)
>>>>   at
>>>> com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>>>>   at
>>>> com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>>>>   at
>>>> com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>>>>   at
>>>> com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>>>
>>>>
>>>> I think this should be handled in Iceberg as  struct filters like a.b.c
>>>> = "blah" is a legit way to query in SQL. If you feel this is a valid
>>>> assumption I can work on a fix.   Thoughts?
>>>>
>>>>
>>>> *Test Table Schema:*
>>>> scala> iceDf.printSchema
>>>> root
>>>>  |-- age: integer (nullable = true)
>>>>  |-- name: string (nullable = true)
>>>>  |-- friends: map (nullable = true)
>>>>  |    |-- key: string
>>>>  |    |-- value: integer (valueContainsNull = true)
>>>>  |-- location: struct (nullable = true)
>>>>  |    |-- lat: double (nullable = true)
>>>>  |    |-- lon: double (nullable = true)
>>>>
>>>>
>>>> *Gist to recreate issue:*
>>>> https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac
>>>>
>>>>
>>>> Cheers,
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>> [1] -  https://github.com/apache/spark/pull/22573
>>>>
>>>> On Tue, Feb 26, 2019 at 10:35 PM Anton Okolnychyi <
>>>> aokolnyc...@apple.com> wrote:
>>>>
>>>>> Unfortunately, Spark doesn’t push down filters for nested columns. I
>>>>> remember an effort to implement it [1]. However, it is not merged.
>>>>> So, even if we have proper statistics in Iceberg, we cannot leverage
>>>>> it from Spark.
>>>>>
>>>>> [1] - https://github.com/apache/spark/pull/22573
>>>>>
>>>>>
>>>>> On 26 Feb 2019, at 16:52, Gautam <gautamkows...@gmail.com> wrote:
>>>>>
>>>>> Thanks Anton, this is very helpful!  I will apply the patch from
>>>>> pull#63 and give it a shot.
>>>>>
>>>>> Re: Collecting min/max stas on nested structures ( 
>>>>> *https://github.com/apache/incubator-iceberg/issues/78
>>>>> <https://github.com/apache/incubator-iceberg/issues/78>* ) ...
>>>>>
>>>>> We have the exact same use case for skipping files on nested field
>>>>> filters. I was intrigued by your comment on enabling stats on nested
>>>>> structures by replacing `fileSchema.asStruct().field(fieldId)` with `
>>>>> fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata` ..
>>>>> Have you had success with this? If so, I can try it out on our data as
>>>>> well.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi <
>>>>> aokolnyc...@apple.com> wrote:
>>>>>
>>>>>> Hi Gautam,
>>>>>>
>>>>>> I believe you see this behaviour because SparkAppenderFactory is
>>>>>> configured to use ParquetWriteAdapter. It only tracks the number of 
>>>>>> records
>>>>>> and uses ParquetWriteSupport from Spark. This means that the statistics 
>>>>>> is
>>>>>> not collected on writes and cannot be used on reads.
>>>>>>
>>>>>> Once [1] is merged, proper statistics will be fetched from the footer
>>>>>> and persisted in the manifests. The statistics is collected when writing
>>>>>> data files not manifests. See [2] for more info. Also, [3] contains an
>>>>>> example that filters out files (it requires [1] to be cherry-picked
>>>>>> locally).
>>>>>>
>>>>>> Hope that helps,
>>>>>> Anton
>>>>>>
>>>>>> [1] - https://github.com/apache/incubator-iceberg/pull/63
>>>>>> [2] -
>>>>>> https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
>>>>>> [3] - https://github.com/apache/incubator-iceberg/pull/105
>>>>>>
>>>>>>
>>>>>> On 26 Feb 2019, at 13:58, Gautam <gautamkows...@gmail.com> wrote:
>>>>>>
>>>>>> .. Just to be clear my concern is around Iceberg not skipping files.
>>>>>> Iceberg does skip rowGroups when scanning files as
>>>>>> *iceberg.parquet.ParquetReader* uses the parquet stats under it
>>>>>> while skipping, albeit none of these stats come from the manifests.
>>>>>>
>>>>>> On Tue, Feb 26, 2019 at 7:24 PM Gautam <gautamkows...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Devs,
>>>>>>>                    I am looking into leveraging Iceberg to speed up
>>>>>>> split generation and to minimize file scans. My understanding was that
>>>>>>> Iceberg keeps key statistics as listed under Metrics.java [1] viz. 
>>>>>>> column
>>>>>>> lower/upper bounds, nullValues, distinct value counts, etc. and that 
>>>>>>> table
>>>>>>> scanning leverages these to skip partitions, files & row-groups (in the
>>>>>>> Parquet context).
>>>>>>>
>>>>>>> What I found is files aren't skipped when a predicate applies only
>>>>>>> to a subset of the table's files. Within a partition it will scan all 
>>>>>>> files
>>>>>>> as manifests only keep record counts but the rest of the metrics (lower,
>>>>>>> upper, distinct value counts, null values) are null / empty. This is coz
>>>>>>> AvroFileAppender only keeps `recordCounts` as metrics [2].. And 
>>>>>>> currently
>>>>>>> that is the only appender supported for writing manifest files.
>>>>>>>
>>>>>>>
>>>>>>> *Example :*
>>>>>>>
>>>>>>> In following example iceTable was generated by iteratively adding
>>>>>>> two files so it has two separate parquet files under it ..
>>>>>>>
>>>>>>> scala> iceTable.newScan().planFiles.asScala.foreach(fl =>
>>>>>>> println(fl))
>>>>>>>
>>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>>>>>> partition_data=PartitionData{}, residual=true}
>>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>>>>>> partition_data=PartitionData{}, residual=true}
>>>>>>>
>>>>>>>
>>>>>>> *Only one file contains row with age = null .. *
>>>>>>>
>>>>>>> scala> iceDf.show()
>>>>>>> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a
>>>>>>> task of very large size (113 KB). The maximum recommended task size is 
>>>>>>> 100
>>>>>>> KB.
>>>>>>> +----+-------+--------------------+
>>>>>>> | age|   name|             friends|
>>>>>>> +----+-------+--------------------+
>>>>>>> |  60| Kannan|      [Justin -> 19]|
>>>>>>> |  75| Sharon|[Michael -> 30, J...|
>>>>>>> |null|Michael|                null|
>>>>>>> |  30|   Andy|[Josh -> 10, Bisw...|
>>>>>>> |  19| Justin|[Kannan -> 75, Sa...|
>>>>>>> +----+-------+--------------------+
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Running filter on isNull(age) scans both files .. *
>>>>>>>
>>>>>>> val isNullExp = Expressions.isNull("age")
>>>>>>> val isNullScan = iceTable.newScan().filter(isNullExp)
>>>>>>>
>>>>>>> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>>>>>>>
>>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>>>>>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>>>>>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I would expect only one file to be scanned as Iceberg should track
>>>>>>> nullValueCounts as per Metrics.java [1] .. The same issue holds for 
>>>>>>> integer
>>>>>>> comparison filters scanning too many files.
>>>>>>>
>>>>>>> When I looked through the code, there is provision for using Parquet
>>>>>>> file footer stats to populate Manifest Metrics [3] but this is never 
>>>>>>> used
>>>>>>> as Iceberg currently only allows AvroFileAppender for creating manifest
>>>>>>> files.
>>>>>>>
>>>>>>> What's the plan around using Parquet footer stats in manifests which
>>>>>>> can be very useful during split generation? I saw some discussions 
>>>>>>> around
>>>>>>> this in the Iceberg Spec document [4] but couldn't glean if any of those
>>>>>>> are actually implemented yet.
>>>>>>>
>>>>>>> I can work on a proposal PR for adding these in but wanted to  know
>>>>>>> the current thoughts around this.
>>>>>>>
>>>>>>>
>>>>>>> *Gist for above example *:
>>>>>>> https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7
>>>>>>>
>>>>>>>
>>>>>>> Looking forward to your feedback,
>>>>>>>
>>>>>>> Cheers,
>>>>>>> -Gautam.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> [1] -
>>>>>>> https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
>>>>>>> [2] -
>>>>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
>>>>>>> [3] -
>>>>>>> https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
>>>>>>> [4] -
>>>>>>> https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>

Reply via email to