Iceberg doesn't support binding expressions in sub-structs yet, so the fix
on the Iceberg side requires two steps: first, collecting the metrics from
Parquet with Anton's PR, and second, updating expression binding to work
with structs.

The reason binding doesn't work with structs yet is that we don't want to
bind fields of structs that are within maps or arrays, because that would
change the semantics of the expression. For example, a.b = 5 can be run on
a: struct<b: int> but can't be run on a: list<struct<b: int>>.
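
To make that distinction concrete, here is a minimal sketch in Java. It is
not Iceberg's binding code: ToyType, NestedFieldResolver and BindingDemo are
hypothetical names made up for illustration, and the toy model only covers
primitives, structs and lists. It resolves a dotted name by stepping through
struct fields only and gives up as soon as the path would cross a list.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Toy type model for illustration only; these are NOT Iceberg's classes.
class ToyType {
    enum Kind { PRIMITIVE, STRUCT, LIST }

    final Kind kind;
    final Map<String, ToyType> fields; // struct fields by name; empty for other kinds
    final ToyType element;             // list element type; null for other kinds

    private ToyType(Kind kind, Map<String, ToyType> fields, ToyType element) {
        this.kind = kind;
        this.fields = fields;
        this.element = element;
    }

    static ToyType primitive() {
        return new ToyType(Kind.PRIMITIVE, Collections.<String, ToyType>emptyMap(), null);
    }

    // Single-field struct, which is all this example needs.
    static ToyType struct(String fieldName, ToyType fieldType) {
        return new ToyType(Kind.STRUCT, Collections.singletonMap(fieldName, fieldType), null);
    }

    static ToyType list(ToyType element) {
        return new ToyType(Kind.LIST, Collections.<String, ToyType>emptyMap(), element);
    }
}

class NestedFieldResolver {
    // Resolve a dotted name by stepping through struct fields only.
    // Returns empty if a field is missing or the path would cross a list.
    static Optional<ToyType> resolve(ToyType root, String dottedName) {
        ToyType current = root;
        for (String part : dottedName.split("\\.")) {
            if (current.kind != ToyType.Kind.STRUCT) {
                return Optional.empty(); // cannot step into a list or a primitive
            }
            current = current.fields.get(part);
            if (current == null) {
                return Optional.empty(); // no such field
            }
        }
        return Optional.of(current);
    }
}

class BindingDemo {
    // Table schema with a: struct<b: int>
    static ToyType plainStruct() {
        return ToyType.struct("a", ToyType.struct("b", ToyType.primitive()));
    }

    // Table schema with a: list<struct<b: int>>
    static ToyType listOfStructs() {
        return ToyType.struct("a", ToyType.list(ToyType.struct("b", ToyType.primitive())));
    }

    public static void main(String[] args) {
        System.out.println(NestedFieldResolver.resolve(plainStruct(), "a.b").isPresent());
        System.out.println(NestedFieldResolver.resolve(listOfStructs(), "a.b").isPresent());
    }
}
```

With the first schema the name a.b resolves to a single value per row, so
a.b = 5 has a clear meaning. With the second schema the resolver returns
empty, because a.b would refer to many values per row and the predicate
would need "any" or "all" semantics that the expression doesn't state.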

Also, the Avro problem wasn't because the manifests are stored as Avro.
Avro doesn't collect metrics about the data that is stored, but the
manifests hold the metrics that were added with each file, so the problem
was that the metrics weren't supplied when you added the files. I think
you've solved the problem and correctly built your table metadata using the
metrics from the Parquet footers, but I still want to note the distinction:
Avro manifests store metrics correctly. Avro data files don't generate
metrics.
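
As a rough illustration of why missing metrics mean no file skipping, here
is a small Java sketch. ToyManifestEntry and NullFilterPlanner are
hypothetical names, not Iceberg classes, and the check is a simplified
stand-in for what a planner could do with an isNull filter. Field id 1 is
the age column from the schema quoted below.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for a manifest entry; NOT Iceberg's DataFile class.
class ToyManifestEntry {
    final String path;
    final long recordCount;
    // Field id -> number of nulls; null when no metrics were supplied with the file.
    final Map<Integer, Long> nullValueCounts;

    ToyManifestEntry(String path, long recordCount, Map<Integer, Long> nullValueCounts) {
        this.path = path;
        this.recordCount = recordCount;
        this.nullValueCounts = nullValueCounts;
    }
}

class NullFilterPlanner {
    // True when the file might contain a null for the field and so must be scanned.
    // Without metrics the planner has to be conservative and keep the file.
    static boolean mightContainNull(ToyManifestEntry entry, int fieldId) {
        if (entry.nullValueCounts == null) {
            return true; // only a record count was stored
        }
        Long count = entry.nullValueCounts.get(fieldId);
        return count == null || count > 0;
    }

    public static void main(String[] args) {
        ToyManifestEntry withoutMetrics = new ToyManifestEntry("f0.parquet", 3L, null);
        ToyManifestEntry withMetrics =
            new ToyManifestEntry("f1.parquet", 3L, Collections.singletonMap(1, 0L));
        System.out.println(mightContainNull(withoutMetrics, 1));
        System.out.println(mightContainNull(withMetrics, 1));
    }
}
```

The manifest format carries the counts either way. Whether a file can be
skipped depends only on whether the counts were supplied when the file was
added.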

On Thu, Feb 28, 2019 at 1:32 AM Gautam <gautamkows...@gmail.com> wrote:

> Hey Anton,
>       Wanted to circle back on the Spark PR [1] to add support for nested
> fields .. I tried applying it, tested it. With this change Spark pushes
> filters on structs down to Iceberg, but Iceberg expression handling seems
> to fail in validation ..
>
>
> Caused by: com.netflix.iceberg.exceptions.ValidationException: Cannot find
> field 'location.lat' in struct: struct<1: age: optional int, 2: name:
> optional string, 3: friends: optional map<string, int>, 4: location:
> optional struct<7: lat: optional double, 8: lon: optional double>>
>   at
> com.netflix.iceberg.exceptions.ValidationException.check(ValidationException.java:42)
>   at
> com.netflix.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:76)
>   at
> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:138)
>   at
> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:94)
>   at
> com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:147)
>   at
> com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:160)
>   at
> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.project(Projections.java:108)
>   at
> com.netflix.iceberg.expressions.InclusiveManifestEvaluator.<init>(InclusiveManifestEvaluator.java:57)
>   at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:153)
>   at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:149)
>   at
> com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>   at
> com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>   at
> com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>   at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
>
>
> I think this should be handled in Iceberg, as struct filters like a.b.c =
> "blah" are a legitimate way to query in SQL. If you feel this is a valid
> assumption, I can work on a fix. Thoughts?
>
>
> *Test Table Schema:*
> scala> iceDf.printSchema
> root
>  |-- age: integer (nullable = true)
>  |-- name: string (nullable = true)
>  |-- friends: map (nullable = true)
>  |    |-- key: string
>  |    |-- value: integer (valueContainsNull = true)
>  |-- location: struct (nullable = true)
>  |    |-- lat: double (nullable = true)
>  |    |-- lon: double (nullable = true)
>
>
> *Gist to recreate issue:*
> https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac
>
>
> Cheers,
> -Gautam.
>
>
>
> [1] -  https://github.com/apache/spark/pull/22573
>
> On Tue, Feb 26, 2019 at 10:35 PM Anton Okolnychyi <aokolnyc...@apple.com>
> wrote:
>
>> Unfortunately, Spark doesn’t push down filters for nested columns. I
>> remember an effort to implement it [1]. However, it is not merged.
>> So, even if we have proper statistics in Iceberg, we cannot leverage it
>> from Spark.
>>
>> [1] - https://github.com/apache/spark/pull/22573
>>
>>
>> On 26 Feb 2019, at 16:52, Gautam <gautamkows...@gmail.com> wrote:
>>
>> Thanks Anton, this is very helpful!  I will apply the patch from pull#63
>> and give it a shot.
>>
>> Re: Collecting min/max stats on nested structures
>> (https://github.com/apache/incubator-iceberg/issues/78) ...
>>
>> We have the exact same use case for skipping files on nested field
>> filters. I was intrigued by your comment on enabling stats on nested
>> structures by replacing `fileSchema.asStruct().field(fieldId)` with `
>> fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata` .. Have
>> you had success with this? If so, I can try it out on our data as well.
>>
>>
>>
>>
>>
>> On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi <aokolnyc...@apple.com>
>> wrote:
>>
>>> Hi Gautam,
>>>
>>> I believe you see this behaviour because SparkAppenderFactory is
>>> configured to use ParquetWriteAdapter. It only tracks the number of records
>>> and uses ParquetWriteSupport from Spark. This means that statistics are
>>> not collected on writes and cannot be used on reads.
>>>
>>> Once [1] is merged, proper statistics will be fetched from the footer
>>> and persisted in the manifests. The statistics are collected when writing
>>> data files, not manifests. See [2] for more info. Also, [3] contains an
>>> example that filters out files (it requires [1] to be cherry-picked
>>> locally).
>>>
>>> Hope that helps,
>>> Anton
>>>
>>> [1] - https://github.com/apache/incubator-iceberg/pull/63
>>> [2] -
>>> https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
>>> [3] - https://github.com/apache/incubator-iceberg/pull/105
>>>
>>>
>>> On 26 Feb 2019, at 13:58, Gautam <gautamkows...@gmail.com> wrote:
>>>
>>> .. Just to be clear my concern is around Iceberg not skipping files.
>>> Iceberg does skip rowGroups when scanning files as
>>> *iceberg.parquet.ParquetReader* uses the parquet stats under it while
>>> skipping, albeit none of these stats come from the manifests.
>>>
>>> On Tue, Feb 26, 2019 at 7:24 PM Gautam <gautamkows...@gmail.com> wrote:
>>>
>>>> Hello Devs,
>>>>                    I am looking into leveraging Iceberg to speed up
>>>> split generation and to minimize file scans. My understanding was that
>>>> Iceberg keeps key statistics as listed under Metrics.java [1] viz. column
>>>> lower/upper bounds, nullValues, distinct value counts, etc. and that table
>>>> scanning leverages these to skip partitions, files & row-groups (in the
>>>> Parquet context).
>>>>
>>>> What I found is files aren't skipped when a predicate applies only to a
>>>> subset of the table's files. Within a partition it will scan all files as
>>>> manifests only keep record counts but the rest of the metrics (lower,
>>>> upper, distinct value counts, null values) are null / empty. This is
>>>> because AvroFileAppender only keeps `recordCounts` as metrics [2], and
>>>> currently that is the only appender supported for writing manifest files.
>>>>
>>>>
>>>> *Example :*
>>>>
>>>> In following example iceTable was generated by iteratively adding two
>>>> files so it has two separate parquet files under it ..
>>>>
>>>> scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))
>>>>
>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>>> partition_data=PartitionData{}, residual=true}
>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>>> partition_data=PartitionData{}, residual=true}
>>>>
>>>>
>>>> *Only one file contains row with age = null .. *
>>>>
>>>> scala> iceDf.show()
>>>> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a
>>>> task of very large size (113 KB). The maximum recommended task size is 100
>>>> KB.
>>>> +----+-------+--------------------+
>>>> | age|   name|             friends|
>>>> +----+-------+--------------------+
>>>> |  60| Kannan|      [Justin -> 19]|
>>>> |  75| Sharon|[Michael -> 30, J...|
>>>> |null|Michael|                null|
>>>> |  30|   Andy|[Josh -> 10, Bisw...|
>>>> |  19| Justin|[Kannan -> 75, Sa...|
>>>> +----+-------+--------------------+
>>>>
>>>>
>>>>
>>>> *Running filter on isNull(age) scans both files .. *
>>>>
>>>> val isNullExp = Expressions.isNull("age")
>>>> val isNullScan = iceTable.newScan().filter(isNullExp)
>>>>
>>>> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>>>>
>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>>>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/00000-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>>>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>>>>
>>>>
>>>>
>>>> I would expect only one file to be scanned as Iceberg should track
>>>> nullValueCounts as per Metrics.java [1] .. The same issue holds for integer
>>>> comparison filters scanning too many files.
>>>>
>>>> When I looked through the code, there is provision for using Parquet
>>>> file footer stats to populate Manifest Metrics [3] but this is never used
>>>> as Iceberg currently only allows AvroFileAppender for creating manifest
>>>> files.
>>>>
>>>> What's the plan around using Parquet footer stats in manifests which
>>>> can be very useful during split generation? I saw some discussions around
>>>> this in the Iceberg Spec document [4] but couldn't glean if any of those
>>>> are actually implemented yet.
>>>>
>>>> I can work on a proposal PR for adding these in but wanted to  know the
>>>> current thoughts around this.
>>>>
>>>>
>>>> *Gist for above example *:
>>>> https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7
>>>>
>>>>
>>>> Looking forward to your feedback,
>>>>
>>>> Cheers,
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> [1] -
>>>> https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
>>>> [2] -
>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
>>>> [3] -
>>>> https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
>>>> [4] -
>>>> https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
>>>>
>>>>
>>>
>>

-- 
Ryan Blue
Software Engineer
Netflix
