Re: Welcoming OpenInx as a new PMC member!

2021-06-29 Thread Gautam
Congratulations Zheng Hu!

On Tue, Jun 29, 2021 at 8:17 PM OpenInx  wrote:

> Thanks all !
>
> I really appreciate the trust from the Apache Iceberg community.  For me,
> this is not only an honor, but also a responsibility.  I'd like to share
> something about the current status of Apache Iceberg in Asia:
>
> In the past year, Apache Iceberg has been growing rapidly among Asian
> users.  Many internet companies have forked their own branches to customize
> their Iceberg services (a few examples, but not all):
>
> 1.   Aliyun.com (from Alibaba): we have successfully integrated Apache
> Iceberg into our Aliyun EMR service, and we are serving customers with a
> huge data scale (PB).
> 2.   Tencent: Iceberg has become a very important piece of infrastructure
> for their internal users; Tencent also provides public cloud services
> (https://intl.cloud.tencent.com) for external customers.  There is a
> post sharing their Flink + Iceberg experience:
> https://www.alibabacloud.com/blog/flink-%2B-iceberg-how-to-construct-a-whole-scenario-real-time-data-warehouse_597824
> 3.   Dell Inc.: they have integrated the Iceberg data lake into their
> on-prem storage deployment (which implements the AWS S3 API, so that people
> can easily migrate their AWS S3 data to the on-prem Dell storage deployment
> if they want to) for their customers.
> https://www.infoq.cn/article/Pe9ejRJDrJsp5AIhjlE3
> 4.   Oppo (https://www.oppo.com/en/), one of the biggest mobile phone
> manufacturers in the world, has adopted Apache Iceberg as their internal
> data lake table format:
> https://www.infoq.cn/article/kuyk9ieusyyxbq5loflu
> 5.   Netease (https://en.wikipedia.org/wiki/NetEase) has also adopted
> Apache Iceberg to serve their business:
> https://developpaper.com/netease-exploration-and-practice-of-flink-iceberg-data-lake/
>
> We also had several offline Iceberg meetups in Asia in the past year:
>
> 1.  DataFunSummit  Data Lake meetup:
> https://mp.weixin.qq.com/s/bJldRWy3rg8su2jiV_-5xQ
> https://mp.weixin.qq.com/s/Ax2Dr7w7RxWxMGyyGlAopg#at
> 2.  Flink x Iceberg Meetup in Shanghai :
> https://zhuanlan.zhihu.com/p/361539420
> 3.  Apache Iceberg meetup in Shenzhen:
> https://segmentfault.com/a/119024535102
>
> At present, the Iceberg community is developing well in Asia. I am very
> happy to see that, with our joint efforts, the Apache Iceberg project and
> community will keep getting better!
>
>
> On Wed, Jun 30, 2021 at 10:39 AM John Zhuge  wrote:
>
>> Congratulations!
>>
>> On Tue, Jun 29, 2021 at 7:32 PM wgcn.bj  wrote:
>>
>>> Congrats!
>>>
>>>  Original Message
>>> *From:* Dongjoon Hyun
>>> *To:* dev
>>> *Sent:* June 30, 2021 (Wed) 10:05
>>> *Subject:* Re: Welcoming OpenInx as a new PMC member!
>>>
>>> Congratulations!
>>>
>>> Dongjoon.
>>>
>>> On Tue, Jun 29, 2021 at 6:35 PM Forward Xu 
>>> wrote:
>>>
 Congratulations!


 best

 Forward

 Miao Wang  wrote on Wed, Jun 30, 2021 at 8:25 AM:

> Congratulations!
>
> Sent from my iPhone
>
> On Jun 29, 2021, at 4:57 PM, Steven Wu  wrote:
>
> 
> Congrats!
>
> On Tue, Jun 29, 2021 at 2:12 PM Huadong Liu 
> wrote:
>
>>
>> Congrats Zheng!
>>
>>
>> On Tue, Jun 29, 2021 at 1:52 PM Ryan Blue  wrote:
>>
>>> Hi everyone,
>>>
>>> I'd like to welcome OpenInx (Zheng Hu) as a new Iceberg PMC member.
>>>
>>> Thanks for all your contributions and commitment to the
>>> project, OpenInx!
>>>
>>>
>>> Ryan
>>>
>>> --
>>> Ryan Blue
>>>
>> --
>> John Zhuge
>>
>


Re: Welcoming Russell Spitzer as a new committer

2021-03-29 Thread Gautam Kowshik
Congrats Russell!

Sent from my iPhone

> On Mar 29, 2021, at 9:41 AM, Dilip Biswal  wrote:
> 
> 
> Congratulations Russell !! Very well deserved, indeed !! 
> 
>> On Mon, Mar 29, 2021 at 9:13 AM Miao Wang  wrote:
>> Congratulations Russell!
>> 
>>  
>> 
>> Miao
>> 
>>  
>> 
>> From: Szehon Ho 
>> Reply-To: "dev@iceberg.apache.org" 
>> Date: Monday, March 29, 2021 at 9:12 AM
>> To: "dev@iceberg.apache.org" 
>> Subject: Re: Welcoming Russell Spitzer as a new committer
>> 
>>  
>> 
>> Awesome, well-deserved, Russell!
>> 
>>  
>> 
>> Szehon
>> 
>> 
>> 
>> 
>> On 29 Mar 2021, at 18:10, Holden Karau  wrote:
>> 
>>  
>> 
>> Congratulations Russell!
>> 
>>  
>> 
>> On Mon, Mar 29, 2021 at 9:10 AM Anton Okolnychyi 
>>  wrote:
>> 
>> Hey folks,
>> 
>> I’d like to welcome Russell Spitzer as a new committer to the project!
>> 
>> Thanks for all your contributions, Russell!
>> 
>> - Anton
>> 
>> --
>> 
>> Twitter: https://twitter.com/holdenkarau
>> 
>> Books (Learning Spark, High Performance Spark, etc.): 
>> https://amzn.to/2MaRAG9 
>> 
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>> 
>>  


Re: Ways To Alleviate Load For Tables With Many Snapshots

2021-01-26 Thread Gautam
+ dawilcox

On Tue, Jan 26, 2021 at 11:46 AM Gautam  wrote:

> Hey Ryan & David,
>  I believe this change from you [1] indirectly achieves this.
> David's issue is that every table.load() is instantiating one FS handle for
> each snapshot, and in your change, by converting the File reference into a
> location string, this already becomes a lazy read (in a way?). The version
> David has been testing with was from before this change. I believe with the
> change in [1] the FS handles issue should be resolved.
>
> Please correct me if I'm wrong David/ Ryan.
>
> thanks and regards,
> -Gautam.
>
> [1] - https://github.com/apache/iceberg/pull/1085/files
>
> On Tue, Jan 26, 2021 at 10:55 AM Ryan Blue 
> wrote:
>
>> David,
>>
>> We could probably make it so that Snapshot instances are lazily created
>> from the metadata file, but that would be a fairly large change. If you're
>> interested, we can definitely make it happen.
>>
>> I agree with Vivekanand, though. A much easier solution is to reduce the
>> number of snapshots in the table by expiring them. How long are you
>> retaining snapshots?
>>
>> rb
>>
>> On Thu, Jan 21, 2021 at 8:11 PM Vivekanand Vellanki 
>> wrote:
>>
>>> Just curious, what is the need to retain all those snapshots?
>>>
>>> I would assume that there is a mechanism to expire snapshots and delete
>>> data/manifest files that are no longer required.
>>>
>>> On Thu, Jan 21, 2021 at 11:01 PM David Wilcox 
>>> wrote:
>>>
>>>> Hi Iceberg Devs,
>>>>
>>>> I have a process that reads Tables stored in Iceberg and processes
>>>> them, many at a time. Lately, we've had problems with the scalability of
>>>> our process due to the number of Hadoop Filesystem objects created inside
>>>> Iceberg for Tables with many snapshots. These tables could have tens of
>>>> thousands of snapshots inside, but I only want to read the latest snapshot.
>>>> Inside the Hadoop Filesystem creation code that's called for every
>>>> snapshot, there are process-level locks that end up locking up my whole
>>>> process.
>>>>
>>>> Inside TableMetadataParser, it looks like we read in every snapshot
>>>> even though the reader likely only wants one snapshot. This loop is what's
>>>> responsible for locking up my process.
>>>>
>>>> https://github.com/apache/iceberg/blob/330f1520ce497153f7a6e9a80a22035ff9f6aa32/core/src/main/java/org/apache/iceberg/TableMetadataParser.java#L320
>>>>
>>>> I noticed that my process does not care about the whole snapshot list.
>>>> My process only is interested in a particular snapshot -- just one of them.
>>>> I'm interested in making a contribution so that the entire snapshot list is
>>>> lazily calculated inside of TableMetadata where it's actually used. So, we
>>>> would not create the Snapshot itself in TableMetadataParser, but instead
>>>> likely would pass a SnapshotCreator in that could know how to create
>>>> snapshots. We would pass all of the SnapshotCreators into TableMetadata
>>>> which would create snapshots when needed.
>>>>
>>>> Would you be amenable to such a change? I want to make sure that you
>>>> think that this sounds like something you would accept before I spend time
>>>> coding it up.
>>>>
>>>> Any other thoughts on this?
>>>>
>>>> Thanks,
>>>> David Wilcox
>>>>
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
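
A minimal sketch of the snapshot-expiration approach suggested above, using
Iceberg's ExpireSnapshots API; the retention values here are placeholders to
tune per table, not recommendations:

```java
import java.util.concurrent.TimeUnit;

import org.apache.iceberg.Table;

public class ExpireOldSnapshots {
  // Trim table history so that load() no longer has to materialize
  // tens of thousands of snapshots.
  public static void expire(Table table) {
    long keepOlderThan = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);
    table.expireSnapshots()
        .expireOlderThan(keepOlderThan)   // drop snapshots older than ~1 week
        .retainLast(100)                  // but always keep the last 100
        .commit();
  }
}
```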


Re: Ways To Alleviate Load For Tables With Many Snapshots

2021-01-26 Thread Gautam
Hey Ryan & David,
 I believe this change from you [1] indirectly achieves this.
David's issue is that every table.load() is instantiating one FS handle for
each snapshot, and in your change, by converting the File reference into a
location string, this already becomes a lazy read (in a way?). The version
David has been testing with was from before this change. I believe with the
change in [1] the FS handles issue should be resolved.

Please correct me if I'm wrong David/ Ryan.

thanks and regards,
-Gautam.

[1] - https://github.com/apache/iceberg/pull/1085/files

On Tue, Jan 26, 2021 at 10:55 AM Ryan Blue 
wrote:

> David,
>
> We could probably make it so that Snapshot instances are lazily created
> from the metadata file, but that would be a fairly large change. If you're
> interested, we can definitely make it happen.
>
> I agree with Vivekanand, though. A much easier solution is to reduce the
> number of snapshots in the table by expiring them. How long are you
> retaining snapshots?
>
> rb
>
> On Thu, Jan 21, 2021 at 8:11 PM Vivekanand Vellanki 
> wrote:
>
>> Just curious, what is the need to retain all those snapshots?
>>
>> I would assume that there is a mechanism to expire snapshots and delete
>> data/manifest files that are no longer required.
>>
>> On Thu, Jan 21, 2021 at 11:01 PM David Wilcox 
>> wrote:
>>
>>> Hi Iceberg Devs,
>>>
>>> I have a process that reads Tables stored in Iceberg and processes them,
>>> many at a time. Lately, we've had problems with the scalability of our
>>> process due to the number of Hadoop Filesystem objects created inside
>>> Iceberg for Tables with many snapshots. These tables could have tens of
>>> thousands of snapshots inside, but I only want to read the latest snapshot.
>>> Inside the Hadoop Filesystem creation code that's called for every
>>> snapshot, there are process-level locks that end up locking up my whole
>>> process.
>>>
>>> Inside TableMetadataParser, it looks like we read in every snapshot even
>>> though the reader likely only wants one snapshot. This loop is what's
>>> responsible for locking up my process.
>>>
>>> https://github.com/apache/iceberg/blob/330f1520ce497153f7a6e9a80a22035ff9f6aa32/core/src/main/java/org/apache/iceberg/TableMetadataParser.java#L320
>>>
>>> I noticed that my process does not care about the whole snapshot list.
>>> My process only is interested in a particular snapshot -- just one of them.
>>> I'm interested in making a contribution so that the entire snapshot list is
>>> lazily calculated inside of TableMetadata where it's actually used. So, we
>>> would not create the Snapshot itself in TableMetadataParser, but instead
>>> likely would pass a SnapshotCreator in that could know how to create
>>> snapshots. We would pass all of the SnapshotCreators into TableMetadata
>>> which would create snapshots when needed.
>>>
>>> Would you be amenable to such a change? I want to make sure that you
>>> think that this sounds like something you would accept before I spend time
>>> coding it up.
>>>
>>> Any other thoughts on this?
>>>
>>> Thanks,
>>> David Wilcox
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
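
As a rough illustration of the lazy construction David describes above, the
idea is to hand over cheap snapshot "creators" and only build Snapshot
objects when the list is first requested; LazySnapshotList and the
Supplier-based creators are hypothetical names, not existing Iceberg types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

import org.apache.iceberg.Snapshot;

// Hypothetical sketch: the parser would provide suppliers parsed from the
// metadata JSON instead of fully-built Snapshot objects, so loading table
// metadata does not touch one FS handle per snapshot up front.
class LazySnapshotList {
  private final List<Supplier<Snapshot>> creators;
  private List<Snapshot> snapshots = null;

  LazySnapshotList(List<Supplier<Snapshot>> creators) {
    this.creators = creators;
  }

  synchronized List<Snapshot> snapshots() {
    if (snapshots == null) {
      List<Snapshot> built = new ArrayList<>(creators.size());
      for (Supplier<Snapshot> creator : creators) {
        built.add(creator.get());
      }
      snapshots = built;
    }
    return snapshots;
  }
}
```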


Re: Adobe Blog ..

2021-01-15 Thread Gautam
> I think it would be great to add a section to the website linking to
helpful articles, slide decks, etc about Iceberg. In the trenches
information is often the most useful

+1 ..IIRC  there were also some thoughts around adding a "*PoweredBy*"
page? Our team often gets asked by folks: "*Who else is using Iceberg*?", I
would love to point them to the fast growing list of companies/teams that
do :-) .. Wdyt?

On Fri, Jan 15, 2021 at 4:00 PM Jacques Nadeau 
wrote:

> +1. This is a great series.
>
> I think it would be great to add a section to the website linking to
> helpful articles, slide decks, etc about Iceberg. In the trenches
> information is often the most useful.
>
> On Fri, Jan 15, 2021 at 3:43 PM Ryan Blue 
> wrote:
>
>> Thanks, Gautam! I was just reading the one on query optimizations. Great
>> that you are writing this series, I think it will be helpful.
>>
>> On Fri, Jan 15, 2021 at 3:36 PM Gautam  wrote:
>>
>>> Hello Devs,
>>>   We at Adobe have been penning down our experiences
>>> with Apache Iceberg thus far. Here is the third blog in that series titled:
>>> "Taking Query Optimizations to the Next Level with Iceberg" *[1]*. In
>>> case you haven't, here are the first two blogs titled "Iceberg at Adobe"
>>> *[2]* and "High Throughput Ingestion with Iceberg" *[3]*.
>>>
>>> Hoping these are helpful to others..
>>>
>>> thanks and regards,
>>> -Gautam.
>>>
>>> [1] -
>>> https://medium.com/adobetech/taking-query-optimizations-to-the-next-level-with-iceberg-6c968b83cd6f
>>> [2] - https://medium.com/adobetech/iceberg-at-adobe-88cf1950e866
>>> [3] -
>>> https://medium.com/adobetech/high-throughput-ingestion-with-iceberg-ccf7877a413f
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


Adobe Blog ..

2021-01-15 Thread Gautam
Hello Devs,
  We at Adobe have been penning down our experiences with
Apache Iceberg thus far. Here is the third blog in that series titled:
"Taking Query Optimizations to the Next Level with Iceberg" *[1]*. In case
you haven't, here are the first two blogs titled "Iceberg at Adobe" *[2]*
and "High Throughput Ingestion with Iceberg" *[3]*.

Hoping these are helpful to others..

thanks and regards,
-Gautam.

[1] -
https://medium.com/adobetech/taking-query-optimizations-to-the-next-level-with-iceberg-6c968b83cd6f
[2] - https://medium.com/adobetech/iceberg-at-adobe-88cf1950e866
[3] -
https://medium.com/adobetech/high-throughput-ingestion-with-iceberg-ccf7877a413f


Re: Timestamp Based Incremental Reading in Iceberg ...

2020-09-10 Thread Gautam
Wanted to circle back on this thread. Linear timestamps were discussed
during the sync, and the conclusion was that timestamp-based incremental
reading is generally discouraged because it introduces correctness issues.
Even if a custom clock is available, keeping timestamps atomic and
monotonically increasing is going to be a problem for applications.
Enforcing this in Iceberg (by blocking out-of-order timestamps) can create
problems of its own, e.g. a client committing an erroneous timestamp that is
far in the future would block all other clients from committing.

This is better handled by attaching a global transaction-id (e.g. a
monotonically increasing UUID) to the snapshot metadata (Iceberg allows
adding this to the snapshot summary). The incremental-read application can
then use the transaction-id as a key to the exact from/to snapshot-ids to do
incremental reading.
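
As a rough sketch of that flow: the writer attaches the transaction-id to
the snapshot summary at commit time, and once the application has mapped
transaction-ids back to snapshot-ids, it runs a snapshot-id based
incremental read. The summary key is arbitrary, and the Spark option names
are what I believe the Iceberg source exposes, so treat them as assumptions:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TxnIdIncrementalRead {
  // Writer side: tag the commit with a globally ordered transaction-id,
  // stored in the snapshot summary.
  static void commitWithTxnId(Table table, DataFile file, String txnId) {
    table.newAppend()
        .appendFile(file)
        .set("txn-id", txnId)
        .commit();
  }

  // Reader side: after resolving txn-ids to snapshot-ids, read the delta
  // between the two snapshots.
  static Dataset<Row> readDelta(SparkSession spark, String tableName,
                                long fromSnapshotId, long toSnapshotId) {
    return spark.read()
        .format("iceberg")
        .option("start-snapshot-id", String.valueOf(fromSnapshotId))
        .option("end-snapshot-id", String.valueOf(toSnapshotId))
        .load(tableName);
  }
}
```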

Hope I covered the points raised.

Regards,
-Gautam.

On Wed, Sep 9, 2020 at 5:07 PM Ryan Blue  wrote:

> Hi everyone, I'm putting this on the agenda for today's Iceberg sync.
>
> Also, I want to point out John's recent PR that added a way to inject a
> Clock that is used for timestamp generation:
> https://github.com/apache/iceberg/pull/1389
>
> That fits nicely with the requirements here and would be an easy way to
> inject your own time, synchronized by an external service.
>
> On Wed, Sep 9, 2020 at 12:33 AM Peter Vary 
> wrote:
>
>> Quick question below about the proposed usage of the timestamp:
>>
>> On Sep 9, 2020, at 7:24 AM, Miao Wang  wrote:
>>
>> +1 to OpenInx's comment on implementation.
>>
>> Unless we have an external timing synchronization service and enforce
>> that all clients use the service, timestamps of different clients are not
>> comparable.
>>
>>
>> Do we want to use the timestamp as the real timestamp of the last change,
>> or we want to use it only as a monotonously increasing more human readable
>> identifier?
>> Do we want to compare this timestamp against some external source, or we
>> just want to compare this timestamp with other timestamps in the different
>> snapshots of the same table?
>>
>>
>> So, there are two asks: 1). Whether to have a timestamp based API for
>> delta reading; 2). How to enforce and implement a service/protocol for
>> timestamp sync among all clients.
>>
>> 1). +1 to have it as Jingsong and Gautam suggested. Snapshot ID could be
>> source of truth in any cases.
>>
>> 2). IMO, it should be an external package to Iceberg.
>>
>> Miao
>>
>> *From: *OpenInx 
>> *Reply-To: *"dev@iceberg.apache.org" 
>> *Date: *Tuesday, September 8, 2020 at 7:55 PM
>> *To: *Iceberg Dev List 
>> *Subject: *Re: Timestamp Based Incremental Reading in Iceberg ...
>>
>> I agree that it's helpful to allow users to read the incremental delta
>> based on a timestamp; as Jingsong said, a timestamp is more friendly.
>>
>> My question is how to implement this?
>>
>>  If we just attach the client's timestamp to the iceberg table when
>> committing, then different clients may have different timestamp values
>> because of clock skew. In theory, these time values are not strictly
>> comparable, and can only be compared within the margin of error.
>>
>>
>> On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li 
>> wrote:
>>
>> +1 for keeping timestamps linear; in the implementation, maybe the writer
>> only needs to look at the previous snapshot timestamp.
>>
>> We're trying to think of Iceberg as a message queue. Let's take the
>> popular queue Kafka as an example: Iceberg has snapshotId and timestamp,
>> and correspondingly Kafka has offset and timestamp:
>> - offset: it is used for incremental reads, such as the state of a
>> checkpoint in a computing system.
>> - timestamp: it is explicitly specified by the user to define the scope of
>> consumption, as the start_timestamp of reading. Timestamp is a more
>> user-friendly interface, while offset/snapshotId is not human readable.
>>
>> So there are scenarios where timestamp is used for incremental reads.
>>
>> Best,
>> Jingsong
>>
>>
>> On Wed, Sep 9, 2020 at 12:45 AM Sud  wrote:
>>
>>
>> We are using incremental reads for Iceberg tables which get quite a few
>> appends (~500-1000 per hour), but instead of using timestamps we use
>> snapshot ids and track the state of the last read snapshot id.
>> We are using timestamps as a fallback when the state is incorrect, but as
>> you mentioned, if timestamps are linear then it works as expected.
>> We also found that incremental reader mi

Timestamp Based Incremental Reading in Iceberg ...

2020-09-08 Thread Gautam
Hello Devs,
   We are looking into adding workflows that read data
incrementally based on commit time: the ability to read deltas between
start/end commit timestamps on a table, and the ability to resume reading
from the last read end timestamp. In that regard, we need the timestamps to
be linear in the current active snapshot history (newer versions always have
higher timestamps). Although the Iceberg commit flow ensures the versions
are newer, there isn't a check to ensure timestamps are linear.

For example, if two clients (clientA and clientB) whose time-clocks are
slightly off (say by a couple of seconds) are committing frequently, clientB
might get to commit after clientA even if its new snapshot timestamp is out
of order. I might be wrong, but I haven't found a check in
HadoopTableOperations.commit() to ensure this case does not happen.

On the other hand, restricting commits due to out-of-order timestamps can
hurt commit throughput, so I can see why this isn't something Iceberg would
want to enforce based on System.currentTimeMillis(). However, if clients
had a way to define their own globally synchronized timestamps (using an
external service or some monotonically increasing UUID), then Iceberg could
allow an API to set that on the snapshot, or use that instead of
System.currentTimeMillis(). Iceberg exposes something similar using
sequence numbers in the v2 format to track deletes and appends.

Is this a concern others have? If so how are folks handling this today or
are they not exposing such a feature at all due to the inherent distributed
timing problem? Would like to hear how others are thinking/going about
this. Thoughts?

Cheers,

-Gautam.


Re: New committer: Shardul Mahadik

2020-07-23 Thread Gautam
Congratulations Shardul!

On Thu, Jul 23, 2020 at 12:24 AM Shardul Mahadik 
wrote:

> Thanks everyone!!
>
> Best,
> Shardul
>
> On 2020/07/23 06:52:57, "Driesprong, Fokko"  wrote:
> > Congrats Shardul! Great work!
> >
> > Cheers, Fokko
> >
> > > On Thu, Jul 23, 2020 at 07:46, Miao Wang  wrote:
> >
> > > Congratulations!
> > >
> > > Miao
> > >
> > > Sent from my iPhone
> > >
> > > > On Jul 22, 2020, at 8:08 PM, 俊杰陈  wrote:
> > > >
> > > > Congrats! Good job!
> > > >
> > > >> On Thu, Jul 23, 2020 at 11:01 AM Saisai Shao <
> sai.sai.s...@gmail.com>
> > > wrote:
> > > >>
> > > >> Congrats!
> > > >>
> > > >> Thanks
> > > >> Saisai
> > > >>
> > > >>> OpenInx  wrote on Thu, Jul 23, 2020 at 10:06 AM:
> > > >>>
> > > >>> Congratulations !
> > > >>>
> > > >>> On Thu, Jul 23, 2020 at 9:31 AM Jingsong Li <
> jingsongl...@gmail.com>
> > > wrote:
> > > 
> > >  Congratulations Shardul! Well deserved!
> > > 
> > >  Best,
> > >  Jingsong
> > > 
> > >  On Thu, Jul 23, 2020 at 7:27 AM Anton Okolnychyi
> > >  wrote:
> > > >
> > > > Congrats and welcome! Keep up the good work!
> > > >
> > > > - Anton
> > > >
> > > > On 22 Jul 2020, at 16:02, RD  wrote:
> > > >
> > > > Congratulations Shardul! Well deserved!
> > > >
> > > > -Best,
> > > > R.
> > > >
> > > > On Wed, Jul 22, 2020 at 2:24 PM Ryan Blue 
> wrote:
> > > >>
> > > >> Hi everyone,
> > > >>
> > > >> I'd like to congratulate Shardul Mahadik, who was just invited
> to
> > > join the Iceberg committers!
> > > >>
> > > >> Thanks for all your contributions, Shardul!
> > > >>
> > > >> rb
> > > >>
> > > >>
> > > >> --
> > > >> Ryan Blue
> > > >
> > > >
> > > 
> > > 
> > >  --
> > >  Best, Jingsong Lee
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks & Best Regards
> > >
> >
>


Re: [VOTE] Release Apache Iceberg 0.9.0 RC5

2020-07-12 Thread Gautam
*Followed the steps:*
1. Downloaded the source tarball, signature (.asc), and checksum (.sha512)
from
https://dist.apache.org/repos/dist/dev/iceberg/apache-iceberg-0.9.0-rc5/
2. Downloaded https://dist.apache.org/repos/dist/dev/incubator/iceberg/KEYS
and imported the gpg keys: gpg --import /path/to/downloaded/KEYS
3. Verified the signature by running: gpg --verify
apache-iceberg-0.9.0.tar.gz.asc
4. Verified the checksum by running: sha512sum -c
apache-iceberg-0.9.0.tar.gz.sha512
5. Untarred the archive and went into the source directory: tar xzf
apache-iceberg-0.9.0.tar.gz && cd apache-iceberg-0.9.0
6. Ran RAT checks to validate license headers:
  dev/check-license
7. Built and tested the project: ./gradlew build (using Java 8)
 > Build took ~10 mins

*+1 (non-binding)*


On Fri, Jul 10, 2020 at 9:20 AM Ryan Murray  wrote:

> 1. Verify the signature: OK
> 2. Verify the checksum: OK
> 3. Untar the archive tarball: OK
> 4. Run RAT checks to validate license headers: RAT checks passed
> 5. Build and test the project: all unit tests passed.
>
> +1 (non-binding)
>
> I did see that my build took >12 minutes and used 100% of all 8 cores
> & 32GB of memory (openjdk-8, Ubuntu 18.04), which I hadn't noticed before.
> On Fri, Jul 10, 2020 at 4:37 AM OpenInx  wrote:
>
>> I followed the verify guide here (
>> https://lists.apache.org/thread.html/rd5e6b1656ac80252a9a7d473b36b6227da91d07d86d4ba4bee10df66%40%3Cdev.iceberg.apache.org%3E)
>> :
>>
>> 1. Verify the signature: OK
>> 2. Verify the checksum: OK
>> 3. Untar the archive tarball: OK
>> 4. Run RAT checks to validate license headers: RAT checks passed
>> 5. Build and test the project: all unit tests passed.
>>
>> +1 (non-binding).
>>
>> On Fri, Jul 10, 2020 at 9:46 AM Ryan Blue 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> I propose the following RC to be released as the official Apache Iceberg
>>> 0.9.0 release.
>>>
>>> The commit id is 4e66b4c10603e762129bc398146e02d21689e6dd
>>> * This corresponds to the tag: apache-iceberg-0.9.0-rc5
>>> * https://github.com/apache/iceberg/commits/apache-iceberg-0.9.0-rc5
>>> * https://github.com/apache/iceberg/tree/4e66b4c1
>>>
>>> The release tarball, signature, and checksums are here:
>>> *
>>> https://dist.apache.org/repos/dist/dev/iceberg/apache-iceberg-0.9.0-rc5/
>>>
>>> You can find the KEYS file here:
>>> * https://dist.apache.org/repos/dist/dev/iceberg/KEYS
>>>
>>> Convenience binary artifacts are staged in Nexus. The Maven repository
>>> URL is:
>>> *
>>> https://repository.apache.org/content/repositories/orgapacheiceberg-1008/
>>>
>>> This release includes support for Spark 3 and vectorized reads for flat
>>> schemas in Spark.
>>>
>>> Please download, verify, and test.
>>>
>>> Please vote in the next 72 hours.
>>>
>>> [ ] +1 Release this as Apache Iceberg 0.9.0
>>> [ ] +0
>>> [ ] -1 Do not release this because...
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>


Re: [VOTE] Graduate to a top-level project

2020-05-14 Thread Gautam
+1  We've come a long way :-)

On Wed, May 13, 2020 at 1:07 AM Dongjoon Hyun 
wrote:

> +1 for graduation!
>
> Bests,
> Dongjoon.
>
> On Tue, May 12, 2020 at 11:59 PM Driesprong, Fokko 
> wrote:
>
>> +1
>>
>> On Wed, May 13, 2020 at 08:58, jiantao yu  wrote:
>>
>>> +1 for graduation.
>>>
>>>
>>> On May 13, 2020, at 12:50 PM, Jun H.  wrote:
>>>
>>> +1 for graduation.
>>>
>>>
>>> On Tue, May 12, 2020 at 9:41 PM 李响  wrote:
>>>
>>>
>>> +1 non-binding. My honor to be a part of this.
>>>
>>> On Wed, May 13, 2020 at 10:16 AM OpenInx  wrote:
>>>
>>>
>>> +1 for graduation.  It's a great news that we've prepared to graduate.
>>>
>>> (non-binding).
>>>
>>> On Wed, May 13, 2020 at 9:50 AM Saisai Shao 
>>> wrote:
>>>
>>>
>>> +1 for graduation.
>>>
>>> Junjie Chen  wrote on Wed, May 13, 2020 at 9:33 AM:
>>>
>>>
>>> +1
>>>
>>> On Wed, May 13, 2020 at 8:07 AM RD  wrote:
>>>
>>>
>>> +1 for graduation!
>>>
>>> On Tue, May 12, 2020 at 3:50 PM John Zhuge  wrote:
>>>
>>>
>>> +1
>>>
>>> On Tue, May 12, 2020 at 3:33 PM parth brahmbhatt <
>>> brahmbhatt.pa...@gmail.com> wrote:
>>>
>>>
>>> +1
>>>
>>> On Tue, May 12, 2020 at 3:31 PM Anton Okolnychyi
>>>  wrote:
>>>
>>>
>>> +1 for graduation
>>>
>>> On 12 May 2020, at 15:30, Ryan Blue  wrote:
>>>
>>> +1
>>>
>>> Jacques, I agree. I'll make sure to let you know about the IPMC vote
>>> because we'd love to have your support there as well.
>>>
>>> On Tue, May 12, 2020 at 3:02 PM Jacques Nadeau 
>>> wrote:
>>>
>>>
>>> I'm +1.
>>>
>>> (I think that is non-binding here but binding at the incubator level)
>>> --
>>> Jacques Nadeau
>>> CTO and Co-Founder, Dremio
>>>
>>>
>>> On Tue, May 12, 2020 at 2:35 PM Romin Parekh 
>>> wrote:
>>>
>>>
>>> +1
>>>
>>> On Tue, May 12, 2020 at 2:32 PM Owen O'Malley 
>>> wrote:
>>>
>>>
>>> +1
>>>
>>> On Tue, May 12, 2020 at 2:16 PM Ryan Blue  wrote:
>>>
>>>
>>> Hi everyone,
>>>
>>> I propose that the Iceberg community should petition to graduate from
>>> the Apache Incubator to a top-level project.
>>>
>>> Here is the draft board resolution:
>>>
>>> Establish the Apache Iceberg Project
>>>
>>> WHEREAS, the Board of Directors deems it to be in the best interests of
>>> the Foundation and consistent with the Foundation's purpose to establish
>>> a Project Management Committee charged with the creation and maintenance
>>> of open-source software, for distribution at no charge to the public,
>>> related to managing huge analytic datasets using a standard at-rest
>>> table format that is designed for high performance and ease of use.
>>>
>>> NOW, THEREFORE, BE IT RESOLVED, that a Project Management Committee
>>> (PMC), to be known as the "Apache Iceberg Project", be and hereby is
>>> established pursuant to Bylaws of the Foundation; and be it further
>>>
>>> RESOLVED, that the Apache Iceberg Project be and hereby is responsible
>>> for the creation and maintenance of software related to managing huge
>>> analytic datasets using a standard at-rest table format that is designed
>>> for high performance and ease of use; and be it further
>>>
>>> RESOLVED, that the office of "Vice President, Apache Iceberg" be and
>>> hereby is created, the person holding such office to serve at the
>>> direction of the Board of Directors as the chair of the Apache Iceberg
>>> Project, and to have primary responsibility for management of the
>>> projects within the scope of responsibility of the Apache Iceberg
>>> Project; and be it further
>>>
>>> RESOLVED, that the persons listed immediately below be and hereby are
>>> appointed to serve as the initial members of the Apache Iceberg Project:
>>>
>>> * Anton Okolnychyi 
>>> * Carl Steinbach   
>>> * Daniel C. Weeks  
>>> * James R. Taylor  
>>> * Julien Le Dem
>>> * Owen O'Malley
>>> * Parth Brahmbhatt 
>>> * Ratandeep Ratti  
>>> * Ryan Blue
>>>
>>> NOW, THEREFORE, BE IT FURTHER RESOLVED, that Ryan Blue be appointed to
>>> the office of Vice President, Apache Iceberg, to serve in accordance
>>> with and subject to the direction of the Board of Directors and the
>>> Bylaws of the Foundation until death, resignation, retirement, removal
>>> or disqualification, or until a successor is appointed; and be it
>>> further
>>>
>>> RESOLVED, that the Apache Iceberg Project be and hereby is tasked with
>>> the migration and rationalization of the Apache Incubator Iceberg
>>> podling; and be it further
>>>
>>> RESOLVED, that all responsibilities pertaining to the Apache Incubator
>>> Iceberg podling encumbered upon the Apache Incubator PMC are hereafter
>>> discharged.
>>>
>>> Please vote in the next 72 hours.
>>>
>>> [ ] +1 Petition the IPMC to graduate to top-level project
>>> [ ] +0
>>> [ ] -1 Wait to graduate because . . .
>>>
>>> --
>>> Ryan Blue
>>>
>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Romin
>>>
>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>>
>>>
>>>
>>> --
>>> John Zhuge
>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>>
>>>
>>>
>>> --
>>>
>>>  李响 Xiang Li
>>>
>>> 手机 cellphone :+86-136-8

Re: [DISCUSS] Changes for row-level deletes

2020-05-06 Thread Gautam
My 2 cents :


>  * Merge manifest_entry and data_file?

 ...   -1  ..   keeping the difference between v1 and v2 metadata to a
minimum would be my preference, i.e. keeping manifest_entries the same way
in both v1 and v2. People using either flow will want to modify and
contribute, and shouldn't have to worry about porting things over every
time.

>  * How should planning with delete files work?

 .. +1 on keeping these independent and in two phases, as you mentioned.
That allows processing in parallel. Could we make this a SparkAction too at
some point?


>  * Mix delete files and data files in manifests? I think we should not,
to support the two-phase planning approach.

  -1  .. We should not for the reason you mention.


>  * If delete files and data files are separate, should manifests use the
same schema?

+1.

On Wed, May 6, 2020 at 10:39 AM Anton Okolnychyi
 wrote:

> We won’t have to rewrite V1 metadata when migrating to V2. The format is
> backward compatible and we can read V1 manifests just fine in V2. For
> example, V1 metadata will not have have sequence number and V2 would
> interpret that as sequence number = 0. The only thing we need to prohibit
> is V1 writers writing to V2 tables. That check is already in place and such
> attempts will fail. Recent changes that went in ensure that V1 and V2
> co-exist in the same codebase. As of now, we have a format version in
> TableMetadata. I think the manual change Ryan was referring to would
> simply mean updating that version flag, not rewriting the metadata. That
> change can be done via TableOperations.
>
> One change that I've been considering is getting rid of manifest_entry. In
> v1, a manifest stored a manifest_entry that wrapped a data_file. The intent
> was to separate data that API users needed to supply -- fields in data_file
> -- from data that was tracked internally by Iceberg -- the snapshot_id and
> status fields of manifest_entry. If we want to combine these so that a
> manifest stores one top-level data_file struct, then now is the time to
> make that change. I've prototyped this in #963
> . The benefit is
> that the schema is flatter so we wouldn't need two metadata tables (entries
> and files). The main drawback is that we aren't going to stop using v1
> tables, so we would effectively have two different manifest schemas instead
> of v2 as an evolution of v1. I'd love to hear more opinions on whether to
> do this. I'm leaning toward not merging the two.
>
>
> As mentioned earlier, I’d rather keep ManifestEntry to reduce the number
> of changes we have in V1 and V2. I feel it will be easier for other people
> who want to contribute to the core metadata management to follow it. That
> being said, I do get the intention of merging the two.
>
> Another change is to start adding tracking fields for delete files and
> updating the APIs. The metadata for this is fairly simple: an enum that
> stores whether the file is data, position deletes, or equality deletes. The
> main decision point is whether to allow mixing data files and delete files
> together in manifests. I don't think that we should allow manifests with
> both delete files and data files. The reason is job planning: we want to
> start emitting splits immediately so that we can stream them, instead of
> holding them all in memory. That means we need some way to guarantee that
> we know all of the delete files to apply to a data file before we encounter
> the data file.
>
>
> I don’t see a good reason to mix delete and data files in a single
> manifest now. In our original idea, we wanted to keep deletes separately as
> it felt it would be easier to come up with an efficient job planning
> approach later on. I think once we know the approach we want to take for
> planning input splits and doing compaction, we can revisit this point again.
>
> - Anton
>
> On 6 May 2020, at 09:04, Junjie Chen  wrote:
>
> Hi Ryan
>
> Besides the reading and merging of delete files, can we talk a bit about
> the write side of delete files? For example, generating delete files in a
> Spark action, the metadata column support, the service to convert equality
> delete files to position delete files, etc.
>
> On Wed, May 6, 2020 at 1:34 PM Miao Wang  wrote:
>
>> Hi Ryan,
>>
>>
>>
>> “Tables must be manually upgraded to version 2 in order to use any of the
>> metadata changes we are making.” If I understand correctly, for existing
>> Iceberg tables in v1, we have to run some CLI/script to rewrite the
>> metadata.
>>
>>
>>
>> “Next, we've added sequence numbers and the proposed inheritance scheme
>> to v2, along with tests to ensure that v1 is written without sequence
>> numbers and that when reading v1 metadata, the sequence numbers are all 0.”
>> To me, this means V2 reader should be able to read V1 table metadata.
>> Therefore, the step above is not required, which only requires us to use a
>> V2 reader on a V1 table.
>>
>>
>>
>> However, if a table has been writte

Re: [VOTE] Release Apache Iceberg 0.8.0-incubating RC2

2020-04-30 Thread Gautam
Ran checks on
https://dist.apache.org/repos/dist/dev/incubator/iceberg/apache-iceberg-0.8.0-incubating-rc2/

√ RAT checks passed
√ signature is correct
√ checksum is correct
√ build from source (with java 8)
√ run tests locally

+1 (non-binding)



On Thu, Apr 30, 2020 at 4:18 PM Samarth Jain  wrote:

> +1 (non-binding)
> all checks passed
>
> On Thu, Apr 30, 2020 at 4:06 PM John Zhuge  wrote:
>
>> +1 (non-binding)
>>
>>1. Checked signature and checksum
>>2. Checked license
>>3. Built and ran unit tests.
>>
>>
>> On Thu, Apr 30, 2020 at 2:24 PM Owen O'Malley 
>> wrote:
>>
>>> +1
>>>
>>>1. Checked signature and checksum
>>>2. Built and ran unit tests.
>>>3. Checked ORC version :)
>>>
>>> On Monday, ORC released 1.6.3, so we should grab those fixes soon.
>>>
>>> .. Owen
>>>
>>> On Thu, Apr 30, 2020 at 12:34 PM Dongjoon Hyun 
>>> wrote:
>>>
 +1.

 1. Verified checksum, sig, and license
 3. Build from the source and run UTs.
 4. Run some manual ORC write/read tests with Apache Spark
 2.4.6-SNAPSHOT (as of today).

 Thank you, all!

 Bests,
 Dongjoon.

 On Thu, Apr 30, 2020 at 10:28 AM parth brahmbhatt <
 brahmbhatt.pa...@gmail.com> wrote:

> +1. checks passed, did not observe the unit test failure.
>
> Thanks
> Parth
>
> On Thu, Apr 30, 2020 at 9:13 AM Daniel Weeks 
> wrote:
>
>> +1 all checks passed
>>
>> On Thu, Apr 30, 2020 at 8:53 AM Anton Okolnychyi
>>  wrote:
>>
>>> That test uses many concurrent writes and I’ve seen cases when it
>>> led to deadlocks in our test HMS. I think HMS is capable of recovering 
>>> on
>>> its own but that process can be slow in highly concurrent environments.
>>> There is a 2 min timeout in that test so it can potentially fail. I’ve 
>>> seen
>>> a deadlock but 2 min was always enough for that test in my local env and
>>> internal/upstream build pipelines. If there is an environment that
>>> constantly or frequently hits this problem, it would be great to check
>>> debug logs.
>>>
>>> I am +1 on releasing RC2. I checked it locally.
>>>
>>> - Anton
>>>
>>> On 30 Apr 2020, at 02:52, Mass Dosage  wrote:
>>>
>>> The build for RC2 worked fine for me, I didn't get a failure on
>>> "TestHiveTableConcurrency". Perhaps there is some kind of race 
>>> condition in
>>> the test? I have seen timeout errors like that when I ran tests on an
>>> overloaded machine, could that have been the case?
>>>
>>> On Thu, 30 Apr 2020 at 08:32, OpenInx  wrote:
>>>
 I checked the rc2, seems the TestHiveTableConcurrency is broken,
 may need to fix it.

 1. Download the tarball and check the signature & checksum: OK
 2. license checking: RAT checks passed.
 3. Build and test the project (java8):
 org.apache.iceberg.hive.TestHiveTableConcurrency >
 testConcurrentConnections FAILED
 java.lang.AssertionError: Timeout
 at org.junit.Assert.fail(Assert.java:88)
 at org.junit.Assert.assertTrue(Assert.java:41)
 at
 org.apache.iceberg.hive.TestHiveTableConcurrency.testConcurrentConnections(TestHiveTableConcurrency.java:106)

 On Thu, Apr 30, 2020 at 9:29 AM Ryan Blue  wrote:

> Hi everyone,
>
> I propose the following candidate to be released as the official
> Apache Iceberg 0.8.0-incubating release.
>
> The commit id is 8c05a2f5f1c8b111c049d43cf15cd8a51920dda1
> * This corresponds to the tag: apache-iceberg-0.8.0-incubating-rc2
> *
> https://github.com/apache/incubator-iceberg/commits/apache-iceberg-0.8.0-incubating-rc2
> * https://github.com/apache/incubator-iceberg/tree/8c05a2f5
>
> The release tarball, signature, and checksums are here:
> *
> https://dist.apache.org/repos/dist/dev/incubator/iceberg/apache-iceberg-0.8.0-incubating-rc2/
>
> You can find the KEYS file here:
> * https://dist.apache.org/repos/dist/dev/incubator/iceberg/KEYS
>
> Convenience binary artifacts are staged in Nexus. The Maven
> repository URL is:
> *
> https://repository.apache.org/content/repositories/orgapacheiceberg-1006/
>
> This release contains many bug fixes and several new features:
> * Actions to remove orphaned files and to optimize metadata for
> query performance
> * Support for ORC data files
> * Snapshot cherry-picking
> * Incremental scan planning based on table history
> * In and notIn expressions
> * An InputFormat for writing MR jobs
>
> Please download, verify, and test.
>
> Please vote in the next 72 hours.
>
> [ ] +1 Release this as Apache Ice

Re: Open a new branch for row-delete feature ?

2020-03-30 Thread Gautam
 Thanks for bringing this up OpenInx.  That's a great idea: to open a
separate branch for row-level deletes.

I would like to help support/contribute/review this as well. If there are
sub-tasks you guys have identified that can be added to
https://github.com/apache/incubator-iceberg/milestone/4 we can start taking
those up too.

thanks for the good work,
- Gautam.



On Mon, Mar 30, 2020 at 8:39 AM Junjie Chen 
wrote:

> +1 to create the branch. Some row-level delete subtasks must be based on
> the sequence number as well as end to end tests.
>
> On Fri, Mar 27, 2020 at 4:42 PM OpenInx  wrote:
>
>> Dear Dev:
>>
>>  On Tuesday, we had a sync meeting and discussed the following:
>>  1.  cut the 0.8.0 release;
>>  2.  flink connector;
>>  3.  iceberg row-level delete;
>>  4.  Map-Reduce formats and Hive support.
>>
>>  We'll release version 0.8.0 around April 15, and the following 0.9.0
>>  will be released in the next few months. On the other hand, Ryan,
>>  Junjie Chen and I have done three PoC versions for the row-level
>>  deletes. We had a full discussion [4] and started on the relevant code
>>  design. We're sure that the feature will introduce some incompatible
>>  specification changes, such as the sequence_number spec [1] and the
>>  file_type spec [2]; the sortedOrder feature also seems to be a
>>  breaking change [3].
>>
>>  To avoid affecting the release of version 0.8.0 and to push the
>>  row-delete feature forward early, I suggest opening a new branch for
>>  the row-delete feature, named branch-1. Once the row-delete feature is
>>  stable, we could release 1.0.0. Or we can just open a row-delete
>>  feature branch, and once the work is done we will merge it back to the
>>  master branch and continue to release the 0.9.0 version.
>>
>>  I guess the Flink connector devs are facing the same problem?
>>
>>  What do you think about this ?
>>
>>  Thank you.
>>
>>
>>   [1]. https://github.com/apache/incubator-iceberg/pull/588
>>   [2]. https://github.com/apache/incubator-iceberg/issues/824
>>   [3]. https://github.com/apache/incubator-iceberg/issues/317
>>   [4].
>> https://docs.google.com/document/d/1CPFun2uG-eXdJggqKcPsTdNa2wPMpAdw8loeP-0fm_M/edit?usp=sharing
>>
>>
>
> --
> Best Regards
>


Re: Shall we start a regular community sync up?

2020-03-19 Thread Gautam
5 / 5:30pm any day of next week works for me.

On Thu, Mar 19, 2020 at 6:07 PM 李响  wrote:

> 5 or 5:30 PM (UTC-7; is it PDT now?) on any day works for me.
> Looking forward to it 8-)
>
> On Fri, Mar 20, 2020 at 8:17 AM RD  wrote:
>
>> Same time works for me too!
>>
>> On Thu, Mar 19, 2020 at 4:45 PM Xabriel Collazo Mojica
>>  wrote:
>>
>>> 5pm or 5:30pm PT  any day next week would work for me.
>>>
>>> Thanks for restoring the community sync up!
>>>
>>> Xabriel J Collazo Mojica  |  Sr Computer Scientist II  |  Adobe
>>>
>>> On 3/18/20, 6:45 PM, "justin_cof...@apple.com on behalf of Justin Q
>>> Coffey" 
>>> wrote:
>>>
>>> Any chance we could actually do 5:30pm PST?  I'm a bit of a lurker,
>>> but this roadmap is important to mine and I have a daily at 5pm :(.
>>>
>>> -Justin
>>>
>>> > On Mar 18, 2020, at 6:43 PM, Saisai Shao 
>>> wrote:
>>> >
>>> > 5pm PST in any day works for me.
>>> >
>>> > Looking forward to it.
>>> >
>>> > Thanks
>>> > Saisai
>>>
>>>
>>>
>>>
>
> --
>
>李响 Xiang Li
>
> 手机 cellphone :+86-136-8113-8972
> 邮件 e-mail  :wate...@gmail.com
>


Re: Shall we start a regular community sync up?

2020-03-18 Thread Gautam
+1 for monthly/fortnightly and 5pm PST

What day are we thinking for next meeting?



On Wed, Mar 18, 2020 at 1:30 PM RD  wrote:

> +1
>
> On Wed, Mar 18, 2020 at 10:49 AM Ryan Blue 
> wrote:
>
>> No problem, we can alternate times to include everyone. How about the
>> next sync at 5 PM UTC+7 and then the one after that at a time that works
>> for people in UTC+0/+1?
>>
>> On Wed, Mar 18, 2020 at 10:21 AM Mass Dosage 
>> wrote:
>>
>>> We're in London so that wouldn't work for us but up to you obviously
>>> based on where most of the committers are.
>>>
>>> On Wed, 18 Mar 2020, 17:13 Ryan Blue,  wrote:
>>>
 Yes, I agree! What days work for everyone? Since most people are in
 UTC-7 and UTC+8, it probably makes sense to do something in the evening
 here in California, right?

 On Wed, Mar 18, 2020 at 10:06 AM Mass Dosage 
 wrote:

> +1 to monthly or fortnightly.
>
> On Wed, 18 Mar 2020 at 16:22, Miao Wang 
> wrote:
>
>> +1. Monthly or Bi-Weekly.
>>
>>
>>
>> *From: *OpenInx 
>> *Reply-To: *"dev@iceberg.apache.org" 
>> *Date: *Wednesday, March 18, 2020 at 8:20 AM
>> *To: *"dev@iceberg.apache.org" 
>> *Cc: *Ryan Blue 
>> *Subject: *Re: Shall we start a regular community sync up?
>>
>>
>>
>> +1
>>
>>
>>
>> On Wed, Mar 18, 2020 at 10:30 PM Saisai Shao 
>> wrote:
>>
>> Hi team,
>>
>>
>>
>> With more companies and developers joining in the community, I was
>> wondering if we could have regular sync up to discuss anything about
>> Iceberg, like milestone, feature design, etc. I think this will be quite
>> helpful to grow the community and move forward the project.
>>
>>
>>
>> Would like to hear your thoughts.
>>
>>
>>
>> Best regards,
>>
>> Saisai
>>
>>
>>
>>
>>
>>

 --
 Ryan Blue
 Software Engineer
 Netflix

>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


Re: Welcome new committer and PPMC member Ratandeep Ratti

2020-02-17 Thread Gautam
Congratulations and thanks for your work.

On Sun, Feb 16, 2020 at 8:37 PM RD  wrote:

> Thanks everyone!
>
> -Best,
> R.
>
> On Sun, Feb 16, 2020 at 7:39 PM David Christle
>  wrote:
>
>> Congrats!!!
>>
>>
>>
>> *From: *Jacques Nadeau 
>> *Reply-To: *"dev@iceberg.apache.org" 
>> *Date: *Sunday, February 16, 2020 at 7:20 PM
>> *To: *Iceberg Dev List 
>> *Subject: *Re: Welcome new committer and PPMC member Ratandeep Ratti
>>
>>
>>
>> Congrats!
>>
>>
>>
>> On Sun, Feb 16, 2020, 7:06 PM xiaokun ding 
>> wrote:
>>
>> CONGRATULATIONS
>>
>>
>>
>> 李响  wrote on Mon, Feb 17, 2020 at 11:05 AM:
>>
>> CONGRATULATIONS!!!
>>
>>
>>
>> On Mon, Feb 17, 2020 at 9:50 AM Junjie Chen 
>> wrote:
>>
>> Congratulations!
>>
>>
>>
>> On Mon, Feb 17, 2020 at 5:48 AM Ryan Blue  wrote:
>>
>> Hi everyone,
>>
>>
>>
>> I'd like to congratulate Ratandeep Ratti, who was just invited to join
>> the Iceberg committers and PPMC!
>>
>>
>>
>> Thanks for your contributions and reviews, Ratandeep!
>>
>>
>>
>> rb
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>>
>>
>>
>> --
>>
>> Best Regards
>>
>>
>>
>>
>> --
>>
>>
>>李响 Xiang Li
>>
>> 手机 cellphone :+86-136-8113-8972
>> 邮件 e-mail  :wate...@gmail.com
>>
>>


Re: Write reliability in Iceberg

2020-01-28 Thread Gautam
Thanks Ryan and Suds for the suggestions; we are looking into these
options.

We currently don't have any external catalog or locking service and depend
purely on commit retries. Additionally, we don't have any of our metadata
in the Hive Metastore, and we want to leverage the underlying filesystem to
read the table metadata, using the splittable nature of Iceberg's metadata.

I think that to keep split planning the way it's done today and still swap
metadata consistently, we would need to acquire/release a lock (using ZK or
otherwise) in
our CustomTableOperations's *doCommit* implementation.
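
A minimal sketch of what that could look like, assuming an Apache Curator
InterProcessMutex for the ZooKeeper lock and reusing the metadata-write
helpers from BaseMetastoreTableOperations; CustomTableOperations and
swapMetadataPointer are our own hypothetical pieces, and the pointer storage
itself is left abstract:

```java
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;

// Hypothetical custom TableOperations: serialize the metadata swap behind a
// ZooKeeper lock. doRefresh() and the pointer storage are elided because
// they depend on where the current-metadata location is kept.
abstract class CustomTableOperations extends BaseMetastoreTableOperations {
  private final CuratorFramework zk;
  private final String lockPath; // e.g. "/iceberg/locks/db.table"

  CustomTableOperations(CuratorFramework zk, String lockPath) {
    this.zk = zk;
    this.lockPath = lockPath;
  }

  @Override
  protected void doCommit(TableMetadata base, TableMetadata metadata) {
    InterProcessMutex lock = new InterProcessMutex(zk, lockPath);
    try {
      if (!lock.acquire(30, TimeUnit.SECONDS)) {
        throw new CommitFailedException("Could not acquire commit lock: %s", lockPath);
      }
      try {
        // write the new metadata file, then swap the pointer while holding the lock
        String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
        swapMetadataPointer(newMetadataLocation);
      } finally {
        lock.release();
      }
    } catch (CommitFailedException e) {
      throw e;
    } catch (Exception e) {
      throw new CommitFailedException(e, "Commit failed for lock path %s", lockPath);
    }
  }

  // Hypothetical helper: atomically update the current-metadata pointer,
  // failing if another writer already moved it past the expected version.
  protected abstract void swapMetadataPointer(String newMetadataLocation);
}
```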

Thanks for the guidance,
-Gautam.


On Tue, Jan 28, 2020 at 2:55 PM Ryan Blue  wrote:

> Thanks for pointing out those references, suds!
>
> And thanks to Mouli (for writing the doc) and Anton (for writing the test)!
>
> On Tue, Jan 28, 2020 at 2:05 PM suds  wrote:
>
>> We have referred https://iceberg.incubator.apache.org/custom-catalog/ and
>> implemented atomic operation using dynamo optimistic locking. Iceberg
>> codebase has has excellent test case to validate custom implementation.
>>
>> https://github.com/apache/incubator-iceberg/blob/master/hive/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
>>
>>
>> On Tue, Jan 28, 2020 at 1:35 PM Ryan Blue 
>> wrote:
>>
>>> Hi Gautam,
>>>
>>> Hadoop tables are not intended to be used when the file system doesn't
>>> support atomic rename because of the problems you describe. Atomic rename
>>> is a requirement for correctness in Hadoop tables.
>>>
>>> That is why we also have metastore tables, where some other atomic swap
>>> is used. I strongly recommend using a metastore-based solution when your
>>> underlying file system doesn't support atomic rename, like the Hive
>>> catalog. We've also made it easy to plug in your own metastore using the
>>> `BaseMetastore` classes.
>>>
>>> That said, if you have an idea to make Hadoop tables better, I'm all for
>>> getting it in. But version hint file aside, without atomic rename, two
>>> committers could still conflict and cause one of the commits to be dropped
>>> because the second one to create any particular version's metadata file may
>>> succeed. I don't see a way around this.
>>>
>>> If you don't want to use a metastore, then you could rely on a write
>>> lock provided by ZooKeeper or something similar.
>>>
>>> On Tue, Jan 28, 2020 at 12:22 PM Gautam  wrote:
>>>
>>>> Hello Devs,
>>>>  We are currently working on building out a
>>>> high-write-throughput pipeline with Iceberg where hundreds or thousands
>>>> of writers (and thousands of readers) could be accessing a table at any
>>>> given moment. We are facing the issue called out in [1]. According to
>>>> Iceberg's spec on write reliability [2], writers depend on an atomic swap
>>>> which, if it fails, should be retried. While this may be true, there can
>>>> be instances where the current write has the latest table state but still
>>>> fails to perform the swap, or, even worse, a reader sees an inconsistency
>>>> while the write is being made. To my understanding, this stems from the
>>>> fact that the current code [3] that does the swap assumes the underlying
>>>> filesystem provides an atomic rename API (like HDFS et al.) for the
>>>> version hint file, which keeps track of the current version. If the
>>>> filesystem does not provide this, then it fails with a fatal error. I
>>>> think Iceberg should provide some resiliency here by committing the
>>>> version once it knows that the latest table state is still valid, and
>>>> more importantly by ensuring that readers never fail during a commit. If
>>>> we agree, I can work on adding this to Iceberg.
>>>>
>>>> How are folks handling write/read consistency cases where the underlying
>>>> FS doesn't provide atomic APIs for file overwrite/rename?  We've outlined
>>>> the details in the attached issue #758 [1]. What do folks think?
>>>>
>>>> Cheers,
>>>> -Gautam.
>>>>
>>>> [1] - https://github.com/apache/incubator-iceberg/issues/758
>>>> [2] - https://iceberg.incubator.apache.org/reliability/
>>>> [3] -
>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java#L220
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Write reliability in Iceberg

2020-01-28 Thread Gautam
Hello Devs,
 We are currently working on building out a high-write-throughput
pipeline with Iceberg where hundreds or thousands of writers (and thousands
of readers) could be accessing a table at any given moment. We are facing
the issue called out in [1]. According to Iceberg's spec on write
reliability [2], writers depend on an atomic swap which, if it fails, should
be retried. While this may be true, there can be instances where the current
write has the latest table state but still fails to perform the swap, or,
even worse, a reader sees an inconsistency while the write is being made. To
my understanding, this stems from the fact that the current code [3] that
does the swap assumes the underlying filesystem provides an atomic rename
API (like HDFS et al.) for the version hint file, which keeps track of the
current version. If the filesystem does not provide this, then it fails with
a fatal error. I think Iceberg should provide some resiliency here by
committing the version once it knows that the latest table state is still
valid, and more importantly by ensuring that readers never fail during a
commit. If we agree, I can work on adding this to Iceberg.

How are folks handling write/read consistency cases where the underlying FS
doesn't provide atomic APIs for file overwrite/rename? We've outlined the
details in the attached issue #758 [1]. What do folks think?

Cheers,
-Gautam.

[1] - https://github.com/apache/incubator-iceberg/issues/758
[2] - https://iceberg.incubator.apache.org/reliability/
[3] -
https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java#L220


Re: [DISCUSS] Forward compatibility and snapshot ID inheritance

2020-01-13 Thread Gautam
A feature flag sounds good to me, with associated regression tests paired
with each feature.

Re: Snapshot Id Inheritance, it would be good to update the spec with the
change in metadata guarantees.
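
For illustration, opting a table into the new behavior could be as simple as
setting a table property; the exact flag name below is an assumption, not a
settled spec detail:

```java
import org.apache.iceberg.Table;

public class EnableSnapshotIdInheritance {
  // Opt this table into writing manifests with null (inherited) snapshot IDs.
  // The property name is illustrative.
  public static void optIn(Table table) {
    table.updateProperties()
        .set("compatibility.snapshot-id-inheritance.enabled", "true")
        .commit();
  }
}
```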

-Gautam.

On Mon, Jan 13, 2020 at 11:28 AM Ryan Blue 
wrote:

> Hi everyone,
>
> Anton has a PR almost ready to merge that implements snapshot ID
> inheritance, similar to how we plan to inherit sequence IDs in metadata.
> That allows people to create manifests that are missing data that will be
> assigned at commit time (snapshot ID) or that may change if a commit is
> retried (sequence number). The inherited information is stored as a field
> of ManifestFile that is stored in the ManifestList.
>
> This change makes the snapshot ID optional for each data file in a
> manifest, so that a null snapshot ID indicates that it should be inherited
> from the manifest metadata. This is a breaking change because older readers
> consider this field required. A change that can break older readers is not
> allowed because we guarantee forward compatibility within a format version.
>
> There are some options for how we handle this. First, we could bump the
> format version and break compatibility, but there are cases when it is
> possible to read tables that use appended manifests. For example, tables
> that don't use appended manifests, or tables that rewrite those manifests
> quickly will be compatible with old readers. That's why I think we should
> consider a second option: adding a feature flag that ensures that manifests
> will not be written with missing snapshot IDs unless the table has the
> compatibility flag set. Then tables are opted into breaking changes within
> a format version and we have a way to release format features before the
> version where they become standard; format v2 will mark the snapshot ID
> optional and have requirements for inheritance.
>
> What do people think about this strategy for managing breaking changes? I
> like the idea of getting the changes out early behind feature flags, where
> possible, but it would be great to hear whether other people see problems
> with this approach.
>
>
> rb
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Iceberg Vectorized Reads Meeting Notes (Nov 14)

2019-11-14 Thread Gautam
*Vectorization notes (Nov 14) *



Attendees:

   - Anjali
   - Samarth
   - Ryan
   - Gautam


Overall things covered:

   - Current state of performance
   - How to start getting things from vectorized-read branch into master
   - Next steps for complex types



Current performance:

   - Reads for dictionary-encoded string columns, including fallback to
   plain encoding, are around 30% faster than vectorized Spark reads
   - Other primitive types are 5-7% slower
   - Currently using Arrow version 0.14.1
  - Upgrading to this version improved performance
  - Shade this version within Iceberg so it doesn't conflict with
  Spark's dependency


*Things to do:*



   - Merge Reader and ArrowVectorizedReaders into one and handle enabling
   vectorization based on config and projection schema  (
   https://github.com/apache/incubator-iceberg/issues/520) - *Gautam*
   - Separate Glue code for Spark ColumnVector from Iceberg arrow (added
   new issue: https://github.com/apache/incubator-iceberg/issues/648)
- *Samarth/
   Anjali *?
   - Separate out iceberg arrow  code into it's own module (
   https://github.com/apache/incubator-iceberg/issues/522)
   - Unit tests for current work.



*Discussion: *


What are next steps?

*Ryan*:

   - Aim for vectorization work to make it into master. Work on separating
   out code into PRs to master
  - ColumnVector implementations
  - Breaking up Type-wise decode implementations
  - Separate out glue code for iceberg arrow and spark ColumnVector
   - Make sure Licensing  of code is honored (e.g. if code was copied from
   spark, attribute that contribution accordingly)

Question: Is smallest unit of task planning a row group?

*Ryan*: Yes; having said that, there's a provision in Spark to read partial
batches. Row counts in ColumnVector can be used to express how much of the
batch is valid.


Can we start on complex types?

*Ryan*: Yes, shouldn't be blocked on anything major. Can start with
top-level structs right now (struct with 1 level of nesting).



Added a new issue https://github.com/apache/incubator-iceberg/issues/648 ,
please add this to the milestone
https://github.com/apache/incubator-iceberg/milestone/2



Lemme know if there was anything I missed or misquoted.



Regards,

-Gautam.


Re: [ANNOUNCE] Apache Iceberg release 0.7.0-incubating

2019-10-31 Thread Gautam
Great first release milestone! Looking forward to more work going into this
community! Thanks to Ryan for shepherding the release and those who helped
verify it.

On Mon, Oct 28, 2019 at 10:48 PM Mouli Mukherjee 
wrote:

> Awesome! Congratulations!
>
> On Mon, Oct 28, 2019 at 9:17 AM Sandeep Sagar 
> wrote:
>
>> Awesome! Great work folks!
>>
>>
>>
>> *From: *Miao Wang 
>> *Reply-To: *"dev@iceberg.apache.org" 
>> *Date: *Monday, October 28, 2019 at 9:09 AM
>> *To: *"dev@iceberg.apache.org" 
>> *Subject: *Re: [ANNOUNCE] Apache Iceberg release 0.7.0-incubating
>>
>>
>>
>> Congratulations!
>>
>>
>>
>> Miao
>>
>>
>>
>> *From: *Xabriel Collazo Mojica 
>> *Reply-To: *"dev@iceberg.apache.org" 
>> *Date: *Monday, October 28, 2019 at 9:08 AM
>> *To: *"dev@iceberg.apache.org" 
>> *Subject: *Re: [ANNOUNCE] Apache Iceberg release 0.7.0-incubating
>>
>>
>>
>> This is great news! Thanks to all involved in development as well as
>> testing the release!
>>
>>
>>
>> *Xabriel J Collazo Mojica*  |  Sr Software Engineer  |  Adobe
>>
>>
>>
>> *From: *John Zhuge 
>> *Reply-To: *"dev@iceberg.apache.org" 
>> *Date: *Monday, October 28, 2019 at 8:40 AM
>> *To: *"dev@iceberg.apache.org" 
>> *Subject: *Re: [ANNOUNCE] Apache Iceberg release 0.7.0-incubating
>>
>>
>>
>> Congratulations !
>>
>>
>>
>> On Mon, Oct 28, 2019 at 6:37 AM Anton Okolnychyi
>>  wrote:
>>
>> This is great! Thanks everyone who contributed and to Ryan for preparing
>> the release!
>>
>>
>>
>> - Anton
>>
>>
>>
>> On 28 Oct 2019, at 13:30, Arina Yelchiyeva 
>> wrote:
>>
>>
>>
>> Congratulations with the first release!
>>
>>
>>
>>
>>
>> Kind regards,
>>
>> Arina
>>
>>
>>
>> On Oct 28, 2019, at 3:16 PM, Anjali Norwood 
>> wrote:
>>
>>
>>
>> Congratulations!!
>>
>>
>>
>> Regards
>>
>> Anjali
>>
>>
>>
>> On Sun, Oct 27, 2019 at 2:56 PM Ryan Blue  wrote:
>>
>> Here's the release announcement that I just sent out. Thanks to everyone
>> that contributed to this release!
>>
>> -- Forwarded message -
>> From: *Ryan Blue* 
>> Date: Sun, Oct 27, 2019 at 2:55 PM
>> Subject: [ANNOUNCE] Apache Iceberg release 0.7.0-incubating
>> To: 
>>
>>
>>
>> I'm pleased to announce the release of Apache Iceberg 0.7.0-incubating!
>>
>> Apache Iceberg is an open table format for huge analytic datasets.
>> Iceberg delivers high query performance for tables with tens of petabytes
>> of data, as well as with atomic commits with concurrent writers and
>> side-effect free schema evolution.
>>
>> The source release can be downloaded from:
>> https://www.apache.org/dyn/closer.cgi/incubator/iceberg/apache-iceberg-0.7.0-incubating/apache-iceberg-0.7.0-incubating.tar.gz
>> 
>>  –
>> signature
>> 
>> – sha512
>> 
>>
>>
>>
>> Java artifacts are available from Maven Central, including an all-in-one 
>> Spark
>> 2.4 runtime Jar
>> .
>> To use Iceberg in Spark 2.4, add the runtime Jar to the jars folder of your
>> Spark install.
>>
>>
>>
>> Additional information is available at
>> http://iceberg.apache.org/releases/
>> 
>>
>> Thanks to everyone that contributed to this release! This is the fi

Re: [VOTE] Release Apache Iceberg 0.7.0-incubating RC2

2019-10-17 Thread Gautam
verified checksums, keys, signatures and the build. Ran RAT checks.

+1 from me towards this release candidate being *Iceberg 0.7.0-incubating *

On Fri, Oct 18, 2019 at 7:17 AM 俊杰陈  wrote:

> Ran 7 steps from the previous email, all passed. +1 from me.
>
> On Fri, Oct 18, 2019 at 3:03 AM RD  wrote:
> >
> > +1
> > Validated all steps from previous email.
> >
> > -R
> >
> > On Thu, Oct 17, 2019 at 10:42 AM Julien Le Dem 
> > 
> wrote:
> >>
> >> +1
> >> I validated the signatures, checked the licences, ran the build
> >>
> >> (typo in the vote options. It's "+1 Release this as Apache *Iceberg*
> 0.7.0-incubating")
> >>
> >> On Tue, Oct 15, 2019 at 9:57 AM Ryan Blue  wrote:
> >>>
> >>> Hi everyone,
> >>>
> >>> I propose the following RC to be released as official Apache Iceberg
> 0.7.0-incubating release.
> >>>
> >>> The commit id is effdcdd9119a25ee0569fad667002119dce17b9e
> >>> * This corresponds to the tag: apache-iceberg-0.7.0-incubating-rc2
> >>> *
> https://github.com/apache/incubator-iceberg/tree/apache-iceberg-0.7.0-incubating-rc2
> >>> *
> https://github.com/apache/incubator-iceberg/tree/effdcdd9119a25ee0569fad667002119dce17b9e
> >>>
> >>> The release tarball, signature, and checksums are here:
> >>> *
> https://dist.apache.org/repos/dist/dev/incubator/iceberg/apache-iceberg-0.7.0-incubating-rc2/
> >>>
> >>> You can find the KEYS file here:
> >>> * https://dist.apache.org/repos/dist/dev/incubator/iceberg/KEYS
> >>>
> >>> Convenience binary artifacts are staged in Nexus. The Maven repository
> URL is:
> >>> *
> https://repository.apache.org/content/repositories/orgapacheiceberg-1002/
> >>>
> >>> This is the first Apache Iceberg release.
> >>>
> >>> Please download, verify, and test; then vote in the next 72 hours.
> >>>
> >>> [ ] +1 Release this as Apache Parquet 0.7.0-incubating
> >>> [ ] +0
> >>> [ ] -1 Do not release this because...
> >>>
> >>> --
> >>> Ryan Blue
>
>
>
> --
> Thanks & Best Regards
>


Re: [VOTE] Release Apache Iceberg 0.7.0-incubating RC1

2019-10-13 Thread Gautam
Ran all steps successfully.

 +1 from me.

On Mon, Oct 14, 2019 at 7:30 AM 俊杰陈  wrote:

> Ran all steps successfully, +1
>
> On Mon, Oct 14, 2019 at 7:39 AM Ted Gooch 
> wrote:
> >
> > Ran all steps no issues from me.
> > +1
> >
> > On Sun, Oct 13, 2019 at 12:09 PM Ryan Blue 
> wrote:
> >>
> >> +1 (binding)
> >>
> >> I went through all of the validation and it looks good.
> >>
> >> I also tested the iceberg-spark-runtime Jar with the Apache Spark 2.4.4
> download. Copying the runtime Jar into Spark's jars folder works without
> problems to read and write both path-based tables and Hive tables. Metadata
> tables work correctly, same with time travel, and metadata tables with time
> travel also work. I also didn't run out of threads in the test Hive
> metastore as I did with the last candidate.
> >>
> >> On Sun, Oct 13, 2019 at 11:30 AM Anton Okolnychyi <
> aokolnyc...@apple.com> wrote:
> >>>
> >>> +1 from me then
> >>>
> >>> On 13 Oct 2019, at 18:33, Ryan Blue  wrote:
> >>>
> >>> The publish steps will now sign all of the artifacts, which is
> required for an Apache release. That's why the publish steps fail in
> master. To fix this in master, we can come up with a way to only turn on
> release signatures if `-Prelease` is set, which is how we also select the
> Apache releases repository.
> >>>
> >>> I don't think this is a problem with the release. The convenience
> binaries in the release must be signed and published from an Apache
> repository, so this is necessary. If you're trying to use the release, then
> you don't need to be using JitPack.
> >>>
> >>> On Sun, Oct 13, 2019 at 6:53 AM Anton Okolnychyi
>  wrote:
> >>>>
> >>>> Verified signature/checksum/rat, run tests.
> >>>>
> >>>> No other pending questions except what Arina and Gautam brought up.
> >>>>
> >>>> - Anton
> >>>>
> >>>> On 13 Oct 2019, at 09:17, Gautam  wrote:
> >>>>
> >>>> I was able to run steps in Ryan's mail just fine but ran  into the
> same thing Arina mentioned  .. when running " ./graldew build publish "  ..
> >>>>
> >>>> A problem was found with the configuration of task
> ':iceberg-api:signApachePublication'.
> >>>> > No value has been specified for property 'signatory.keyId'.
> >>>>
> >>>>
> >>>> Something we are expected to do with the keys here?
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Sat, Oct 12, 2019 at 8:30 PM Arina Yelchiyeva <
> arina.yelchiy...@gmail.com> wrote:
> >>>>>
> >>>>> Not sure, if this is related to the release vote but after "Update
> build for Apache releases" commit [1], we are not longer able to build
> Iceberg using JitPack.
> >>>>> Error [2]:
> >>>>>
> >>>>> * What went wrong:
> >>>>> A problem was found with the configuration of task
> ':iceberg-api:signApachePublication'.
> >>>>> > No value has been specified for property 'signatory.keyId'.
> >>>>>
> >>>>>
> >>>>> [1]
> https://github.com/apache/incubator-iceberg/commit/2219c86ec6dc5512b2e581f500125841b1b56226
> >>>>> [2]
> https://jitpack.io/com/github/apache/incubator-iceberg/5620f119f4/build.log
> >>>>>
> >>>>> On Oct 12, 2019, at 6:08 AM, Julien Le Dem 
> wrote:
> >>>>>
> >>>>> I’m away for a long weekend without my laptop and will be able to
> try it out on Tuesday (if votes are still needed).
> >>>>> Happy validation everyone!
> >>>>> Julien
> >>>>>
> >>>>> On Oct 11, 2019, at 18:21, Ryan Blue  wrote:
> >>>>>
> >>>>> Here are the steps I included on the last thread if you'd like to
> validate the release:
> >>>>>
> >>>>> Download the source tarball, signature (.asc), and checksum (.sha512)
> >>>>> Import gpg keys: download KEYS and run gpg --import
> /path/to/downloaded/KEYS (optional if this hasn’t changed)
> >>>>> Verify the signature by running: gpg --verify
> apache-iceberg-0.7.0-incubating.tar.gz.asc
> >>>&

Re: [VOTE] Release Apache Iceberg 0.7.0-incubating RC1

2019-10-13 Thread Gautam
I was able to run the steps in Ryan's mail just fine but ran into the same
thing Arina mentioned .. when running "* ./gradlew build publish *" ..

A problem was found with the configuration of task
':iceberg-api:signApachePublication'.
> No value has been specified for property 'signatory.keyId'.


Something we are expected to do with the keys here?






On Sat, Oct 12, 2019 at 8:30 PM Arina Yelchiyeva 
wrote:

> Not sure, if this is related to the release vote but after "Update build
> for Apache releases" commit [1], we are not longer able to build Iceberg
> using JitPack.
> Error [2]:
>
> * What went wrong:
> A problem was found with the configuration of task 
> ':iceberg-api:signApachePublication'.
> > No value has been specified for property 'signatory.keyId'.
>
>
> [1]
> https://github.com/apache/incubator-iceberg/commit/2219c86ec6dc5512b2e581f500125841b1b56226
> [2]
> https://jitpack.io/com/github/apache/incubator-iceberg/5620f119f4/build.log
>
>
> On Oct 12, 2019, at 6:08 AM, Julien Le Dem  wrote:
>
> I’m away for a long weekend without my laptop and will be able to try it
> out on Tuesday (if votes are still needed).
> Happy validation everyone!
> Julien
>
> On Oct 11, 2019, at 18:21, Ryan Blue  wrote:
>
> Here are the steps I included on the last thread if you'd like to validate
> the release:
>
>1. Download the source tarball
>
> 
>, signature
>
> 
>  (.asc),
>and checksum
>
> 
> (.sha512)
>2. Import gpg keys: download KEYS
> and
>run gpg --import /path/to/downloaded/KEYS (optional if this hasn’t
>changed)
>3. Verify the signature by running: gpg --verify
>apache-iceberg-0.7.0-incubating.tar.gz.asc
>4. Verify the checksum by running: sha512sum -c
>apache-iceberg-0.7.0-incubating.tar.gz.sha512
>5. Untar the archive and go into the source directory: tar xzf
>apache-iceberg-0.7.0-incubating.tar.gz && cd 
> apache-iceberg-0.7.0-incubating
>6. Run RAT checks to validate license headers: dev/check-license
>7. Build and test the project: ./gradlew build (use Java 8)
>
> You can also validate the LICENSE and NOTICE documentation, which is
> included in the source tarball, as well as the staged binary artifacts.
>
> To validate the convenience binaries, add the Maven URL from the email
> above to a downstream project and update your Iceberg dependency to
> 0.7.0-incubating, like this:
>
>   repositories {
> maven {
>   name 'stagedIceberg'
>   url 
> 'https://repository.apache.org/content/repositories/orgapacheiceberg-1000/'
> }
>   }
>
>   ext {
> icebergVersion = '0.7.0-incubating'
>   }
>
> Then run the downstream project’s tests.
>
> Thanks for voting, everyone!
>
> rb
>
> On Fri, Oct 11, 2019 at 6:18 PM Ryan Blue  wrote:
>
>> Hi everyone,
>>
>> I propose the following RC to be released as official Apache Iceberg
>> 0.7.0-incubating release.
>>
>> The commit id is 028a8d0e65d9c713b9b040c592fa10641b6c867b
>> * This corresponds to the tag: apache-iceberg-0.7.0-incubating-rc1
>> *
>> https://github.com/apache/incubator-iceberg/tree/apache-iceberg-0.7.0-incubating-rc1
>> *
>> https://github.com/apache/incubator-iceberg/tree/028a8d0e65d9c713b9b040c592fa10641b6c867b
>>
>> The release tarball, signature, and checksums are here:
>> *
>> https://dist.apache.org/repos/dist/dev/incubator/iceberg/apache-iceberg-0.7.0-incubating-rc1/
>>
>> You can find the KEYS file here:
>> * https://dist.apache.org/repos/dist/dev/incubator/iceberg/KEYS
>>
>> Convenience binary artifacts are staged in Nexus. The Maven repository
>> URL is:
>> *
>> https://repository.apache.org/content/repositories/orgapacheiceberg-1001/
>>
>> This is the first Apache Iceberg release.
>>
>> Please download, verify, and test; then vote in the next 72 hours.
>>
>> [ ] +1 Release this as Apache Parquet 0.7.0-incubating
>> [ ] +0
>> [ ] -1 Do not release this because...
>>
>> --
>> Ryan Blue
>>
>
>
> --
> Ryan Blue
>
>
>


Iceberg Vectorized Reads Meeting Notes (Oct 7)

2019-10-07 Thread Gautam
Hello Devs,
We met to discuss progress and next steps on the vectorized
read path in Iceberg. Here are my notes from the sync. Feel free to reply
with clarifications in case I misquoted or missed anything.

*Attendees*:

Anjali Norwood
Padma Pennumarthy
Ryan Blue
Samarth Jain
Gautam Kowshik

*Topics *
- Progress on Arrow Based Vectorization Reads
- Features being worked on and possible improvements
- Pending bottlenecks
- Identify things to collaborate on going forward.
- Next steps

Arrow Vectorized Reader

  Samarth/Anjali:

   - Working on Arrow based vectorization [1]
   - At performance parity between Spark and Iceberg on primitive types
   except strings.
   - Planning to do dictionary encoding on strings
   - New Arrow version gives boost in performance and fixes issues
   - Vectorized batched reading of definition levels improves performance
   - Some checks had to be turned off in Arrow to push performance further,
   viz. null checks and unsafe memory access
   - Implemented prefetching of parquet pages, this improves perf on
   primitives beyond Vanilla spark


   Ryan:


   - The Arrow version should not be tied to Spark; it should have an
   Iceberg-specific implementation binding so it will work with any reader,
   not just Spark.
   - Add DatasourceV2Strategy to handle nested pruning into Spark upstream.
   Will coordinate with Apple folks to add their work into Spark.
   - Need the ability to fall back to row-based reads for cases where columnar
   isn't possible. A config option maybe.
   - Can add options where columnar batches are read into InternalRow and
   returned to the Datasource.
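As a rough sketch of that last point: a Spark ColumnarBatch can already be handed back row by row, so a row-oriented fallback could expose the same decoded batch as InternalRows. This is an illustration only, not code from the vectorized-read branch.

import java.util.Iterator;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class RowFallbackSketch {
  // Row view over the same Arrow-backed column vectors; no data is copied.
  public static Iterator<InternalRow> asRows(ColumnarBatch batch) {
    return batch.rowIterator();
  }
}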

  Padma:

   - Possibly contribute work on Arrow back to the Arrow project. (Can punt on
   this for now to move forward faster on current work.)
   - Was looking into complex type support for Arrow based reads.


V1 Vectorized Read Path [2]

Gautam:

   - Been working on V1 vectorized short circuit read path [3]. (this is
   prolly not as useful once we have full featured support on Arrow based
   reads)
   - Will work on getting schema evolution parts working with this reader
   by getting Projection unit/integration tests working. (this can be
   contributed back into iceberg repo to unblock this path if we want to have
   that option till arrow based read is fully capable)



*Next steps:*

   - Unit tests for current Arrow based work.
   - Provide options to perform vectorized batch reads, Row oriented reads
   and Internal Row over Batch reads.
   - Separate Arrow work in Iceberg into its own sub-module
   - Dictionary encoding support for strings in Arrow.
   - Complex type support for Arrow.
   - File issues for the above and identify how to distribute work between
   us.




[1]  https://github.com/apache/incubator-iceberg/tree/vectorized-read

[2]  https://github.com/apache/incubator-iceberg/pull/462

[3]
https://github.com/prodeezy/incubator-iceberg/commits/v1-vectorized-reader


Re: [DISCUSS] Iceberg community sync?

2019-10-07 Thread Gautam
+1  9 am PST on Tues/Wednesday works.

On Mon, Oct 7, 2019 at 4:50 AM Jacques Nadeau  wrote:

> Tuesdays work best for me.
>
> On Sun, Oct 6, 2019, 4:18 PM Anton Okolnychyi
>  wrote:
>
>> Tuesday/Wednesday/Thursday works fine for me. Anything up to 19:00 UTC /
>> 20:00 BST / 12:00 PDT is OK if 09:00 PDT is too early for someone.
>>
>> Thanks,
>> Anton
>>
>> On 4 Oct 2019, at 19:59, Ryan Blue  wrote:
>>
>> Sounds good. How about the first sync next week?
>>
>> Since Anton replied, let's do the first one at a time that's reasonable
>> for people in BST. How about 16:00 UTC / 17:00 BST / 09:00 PDT? I could
>> make that time Tuesday, Wednesday, or Thursday next week, the 8th, 9th, or
>> 10th.
>>
>> If there are people in CST that want to attend these, please say so and
>> we will find a time that works for the next one.
>>
>> On Fri, Oct 4, 2019 at 10:36 AM Xabriel Collazo Mojica <
>> xcoll...@adobe.com.invalid> wrote:
>>
>>> +1
>>>
>>>
>>>
>>> *Xabriel J Collazo Mojica*  |  Senior Software Engineer  |  Adobe  |
>>> xcoll...@adobe.com
>>>
>>>
>>>
>>> *From: * on behalf of Anton Okolnychyi <
>>> aokolnyc...@apple.com.INVALID>
>>> *Reply-To: *"dev@iceberg.apache.org" 
>>> *Date: *Friday, October 4, 2019 at 1:41 AM
>>> *To: *Iceberg Dev List 
>>> *Subject: *Re: [DISCUSS] Iceberg community sync?
>>>
>>>
>>>
>>> +1
>>>
>>>
>>>
>>> On 4 Oct 2019, at 07:14, Julien Le Dem 
>>> wrote:
>>>
>>>
>>>
>>> +1
>>>
>>>
>>>
>>> On Thu, Oct 3, 2019 at 17:52 Xinli shang 
>>> wrote:
>>>
>>> Good to me for once a month!
>>>
>>>
>>>
>>> On Thu, Oct 3, 2019 at 5:13 PM Jacques Nadeau 
>>> wrote:
>>>
>>> Sounds good to me. I'd vote for once a month.
>>>
>>> --
>>>
>>> Jacques Nadeau
>>>
>>> CTO and Co-Founder, Dremio
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Oct 3, 2019 at 4:56 PM Ryan Blue 
>>> wrote:
>>>
>>> Hi everyone,
>>>
>>>
>>>
>>> Other projects I'm involved in use a hangouts meetup every few weeks to
>>> sync up about the status of different ongoing projects. Iceberg is getting
>>> to the point where we might want to consider doing this as well. There are
>>> some significant efforts, like vectorization, row-level delete, and our
>>> first release.
>>>
>>>
>>>
>>> Usually how this works is we talk over hangouts or some other video call
>>> platform. Also, someone takes notes and sends those notes to the dev list
>>> to keep a record of what we discussed for anyone that couldn't attend.
>>>
>>>
>>>
>>> Does this sound like a good idea to anyone?
>>>
>>>
>>>
>>> If so, how often? I know we have people in different time zones, so
>>> depending on who wants to attend, we may need to alternate times.
>>>
>>>
>>>
>>> rb
>>>
>>>
>>>
>>> --
>>>
>>> Ryan Blue
>>>
>>> Software Engineer
>>>
>>> Netflix
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Xinli Shang
>>>
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>>
>>


Re: Incompatible Writes due to OutOfOrder Fields

2019-09-26 Thread Gautam
Shone and I synced offline, but I wanted to circle back here so others can
hopefully benefit, and so those with more experience can correct me if
there's a better way to achieve this.

*Problem*:
  The use case is that incoming data has fields out of order w.r.t. already
ingested data in Iceberg. The same scenario applies to nested columns as
well (e.g. a sub-struct has its fields out of order). Incoming data might
also have added fields. The issue is that if the data is ingested as is,
Iceberg will complain via its compatibility checks. As it should.

*Solution*:
  Iceberg doesn't depend on field names or the natural order of fields. It
uses IDs to keep track of schema fields. So if one wants to enforce
evolution rules correctly, she should first go back to the underlying
Iceberg schema, apply the schema transformation using Iceberg's UpdateSchema
API, and commit the schema changes to the underlying table. Once this is
done, Iceberg will have created a new version of the schema with new IDs
allotted to the added fields. It also accounts for a different field order
in the incoming data, as it keeps the ID-to-name mapping for all columns.

Here is a gist that captures these scenarios described above with sample
data : https://gist.github.com/prodeezy/b2cc35b87fca7d43ae681d45b3d7cab3
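For illustration, a minimal sketch of that flow using Iceberg's UpdateSchema API is below. The table location, column names, and types are placeholders, not taken from the thread or the gist.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class EvolveSchemaBeforeWrite {
  public static void main(String[] args) {
    // Placeholder location; point this at the existing Iceberg table.
    Table table = new HadoopTables(new Configuration())
        .load("hdfs://nn/warehouse/db/events");

    // Add the fields carried by the incoming data, then commit. Iceberg
    // assigns fresh field IDs, and existing IDs keep resolving correctly
    // regardless of the order in which fields arrive.
    table.updateSchema()
        .addColumn("new_field", Types.StringType.get())        // top-level field
        .addColumn("location", "alt", Types.DoubleType.get())  // nested field
        .commit();
  }
}

After the commit, incoming records whose columns are in a different order resolve by field ID, so no re-ordering is needed at write time.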

Cheers,
-Gautam.







On Wed, Sep 25, 2019 at 5:29 AM Ryan Blue  wrote:

> Hi Shone,
>
> Iceberg should be able to handle out of order data columns in nested
> structures. We probably just need to relax that compatibility check to
> allow it. Can you post the error message that you're getting?
>
> On Sun, Sep 22, 2019 at 4:49 AM Shone Sadler 
> wrote:
>
>> Hello everyone,
>>
>> This question is related to schema evolution support in Iceberg.
>>
>> We have data coming in with fields out-of-order wrt to the schema in
>> Iceberg (e.g. inbound struct(a,b,c) vs. iceberg struct(c,b,a))
>>
>> As a result we are hitting the following error in Iceberg when saving the
>> data  -> "Cannot write incompatible dataset to table with schema",
>> generated within the IcebergeSource ->
>> https://github.com/apache/incubator-iceberg/blob/d1f0b540f5f14f002be86133ef9f66445f7e0926/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java#L157
>>
>> I also noted in the documentation that re-ordering was allowed ->
>> https://iceberg.apache.org/evolution/ , which led me to believe that we
>> could update the schema prior to writing the data, However, I see no means
>> of re-ordering fields on the current UpdateSchema API.
>>
>> How are people handling out-of-order fields today?
>>
>> Our data is deeply nested, as a result I am hoping not to have to
>> transform/prep on ingest and looking for alternatives.
>>
>> Any thoughts appreciated!
>>
>> Regards,
>> Shone Sadler
>>
>>
>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Iceberg using V1 Vectorized Reader over Parquet ..

2019-09-05 Thread Gautam
I'v added unit tests and created a PR for the v1 vectorization work :
https://github.com/apache/incubator-iceberg/pull/452

I'm sure there's scope for further improvement so lemme know your feedback
over the PR so I can sharpen it further.

Cheers,
-Gautam.


On Wed, Sep 4, 2019 at 10:33 PM Mouli Mukherjee 
wrote:

> Hi Gautam, this is very exciting to see. It would be great if this was
> available behind a flag if possible.
>
> Best,
> Mouli
>
> On Wed, Sep 4, 2019, 7:01 AM Gautam  wrote:
>
>> Hello Devs,
>>As some of you know there's been ongoing work as part
>> of [1] to build Arrow based vectorization into Iceberg. There's a separate
>> thread on this dev list where that is being discussed and progress is being
>> tracked in a separate branch [2]. The overall approach there is to build a
>> ground up Arrow based implementation of vectorization within Iceberg so
>> that any compute engine using Iceberg would benefit from those
>> optimizations. We feel that is indeed the longer term solution and the best
>> way forward.
>>
>> Meanwhile, Xabriel & I took to investigating an interim approach where
>> Iceberg could use the current Vectorization code built into Spark Parquet
>> reading, which I will refer to as "*V1 Vectorization*". This is the code
>> path that Spark's DataSourceV1 readers use to read Parquet data. The result
>> is that we have performance parity between Iceberg and Spark's Vanilla
>> Parquet reader. We thought we should share this with the larger community
>> so others can benefit from this gain.
>>
>> *What we did *:
>> - Added a new reader viz. *V1VectorizedReader *that internally short
>> circuits to using the V1 codepath [3]  which does most of the setup and
>> work to perform vectorization. it's exactly what Vanilla Spark's reader
>> does underneath the DSV1 implementation.
>> - It builds an iterator which expects ColumnarBatches from the Objects
>> returned by the resolving iterator.
>> - We re-organized and optimized code while building *ReadTask *instances 
>> which
>> considerably improved task initiation and planning time.
>> - Setting `*iceberg.read.enableV1VectorizedReader*` to true enables this
>> reader in IcebergSource.
>> - The V1Vectorized reader is an independent class with copied code in
>> some methods as we didn't want to degrade perf due to inheritance/virtual
>> method calls (we noticed degradation when we did try to re-use code).
>> - I'v pushed this code to a separate branch [4] in case others want to
>> give this a try.
>>
>>
>> *The Numbers*:
>>
>>
>> Flat Data 10 files 10M rows each
>>
>>
>> Benchmark
>> Mode  Cnt   Score   Error  Units
>>
>> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized
>>   ss5  63.631 ± 1.300   s/op
>>
>> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized
>>   ss5  28.322 ± 2.400   s/op
>>
>> IcebergSourceFlatParquetDataReadBenchmark.readIceberg
>>   ss5  65.862 ± 2.480   s/op
>>
>> IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized10k
>>   ss5  28.199 ± 1.255   s/op
>>
>> IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized20k
>>   ss5  29.822 ± 2.848   s/op
>>
>> IcebergSourceFlatParquetDataReadBenchmark.*readIcebergV1Vectorized5k*
>> ss5  27.953 ± 0.949   s/op
>>
>>
>>
>>
>> Flat Data Projections 10 files 10M rows each
>>
>>
>> Benchmark
>> Mode  Cnt   Score   Error  Units
>>
>>
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized
>>   ss5  11.307 ± 1.791   s/op
>>
>> IcebergSourceFlatParquetDataReadBenchmark.
>> *readWithProjectionFileSourceVectorized*   ss5   3.480 ± 0.087
>> s/op
>>
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg
>>   ss5  11.057 ± 0.236   s/op
>>
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized10k
>> ss5   3.953 ± 1.592   s/op
>>
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized20k
>> ss5   3.619 ± 1.305   s/op
>>
>>
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized5k
>> ss5   4.109 ± 1.734   s/op
>>
>>
>> Filtered Data 500 files 10k rows each

Iceberg using V1 Vectorized Reader over Parquet ..

2019-09-04 Thread Gautam
ization. This should help with Iceberg
adoption.
- We think this can be an interim solution (until the Arrow based impl is fully
performant) for those who are currently blocked by the performance difference
between Iceberg and Spark's native vectorization for interactive use cases.
There's a lot of optimization work and testing that has gone into V1
vectorization, and Iceberg can now benefit from it.
- In many cases companies have proprietary implementations of
*ParquetFileFormat* that could have extended features like complex type
support. Our code can use those at runtime as long as the
'*buildReaderWithPartitionValues()*' signature is consistent; if not, the
reader can easily be modified to plug their own vectorized reader in.
- While profiling the Arrow implementation I found it difficult to compare
bottlenecks due to major differences between DSv1 and DSv2 client-to-source
interface paths. This makes it easier to compare numbers and profile code
between V1 vectorization and Arrow vectorization as we now have both paths
working behind a single DataSourceV2 path (viz. IcebergSource).

*Limitations*:
- This implementation is specific to Spark so other compute frameworks like
Presto won't benefit from this.
- It doesn't use Iceberg's Value Reader interface as it bypasses everything
under the Task Data Reading. (added a separate *V1VectorizedTaskDataReader*)
- Need to maintain two readers, as adding any code to Reader.java might
need changes to the V1Vectorized Reader. We could minimize this with a
*ReaderUtils* class, though.


I have the code checked in at
https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader .
If folks think this is useful and we can keep this as an interim solution
behind a feature flag, I can get a PR up with proper unit tests.
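For anyone who wants to try the branch, a usage sketch of the flag mentioned earlier in this thread follows. The option is only honored by that fork branch (not mainline Iceberg), and the table location is a placeholder.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class V1VectorizedReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-v1-vectorized-read")
        .getOrCreate();

    // Flag name comes from this thread; it exists only on the
    // prodeezy/v1-vectorized-reader branch.
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("iceberg.read.enableV1VectorizedReader", "true")
        .load("hdfs://nn/warehouse/db/events");  // placeholder table location

    df.show();
  }
}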

thanks and regards,
-Gautam.


[1] - https://github.com/apache/incubator-iceberg/issues/9
[2] - https://github.com/apache/incubator-iceberg/tree/vectorized-read
[3] -
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L197
[4] -
https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader


Re: New committer and PPMC member, Anton Okolnychyi

2019-09-02 Thread Gautam
Way to go Anton! Appreciate all the work and guidance.

On Tue, Sep 3, 2019 at 9:33 AM John Zhuge  wrote:

> Congratulations Anton!
>
> On Mon, Sep 2, 2019 at 8:45 PM Mouli Mukherjee 
> wrote:
>
>> Congratulations Anton!
>>
>> On Mon, Sep 2, 2019, 8:38 PM Saisai Shao  wrote:
>>
>>> Congrats Anton!
>>>
>>> Best regards,
>>> Saisai
>>>
>>> Daniel Weeks  于2019年9月3日周二 上午7:48写道:
>>>
 Congrats Anton!

 On Fri, Aug 30, 2019 at 1:54 PM Edgar Rodriguez
  wrote:

> Nice! Congratulations, Anton!
>
> Cheers,
>
> On Fri, Aug 30, 2019 at 1:42 PM Dongjoon Hyun 
> wrote:
>
>> Congratulations, Anton! :D
>>
>> Bests,
>> Dongjoon.
>>
>> On Fri, Aug 30, 2019 at 10:06 AM Ryan Blue  wrote:
>>
>>> I'd like to congratulate Anton Okolnychyi, who was just invited to
>>> join the Iceberg committers and PPMC!
>>>
>>> Thanks for all your contributions, Anton!
>>>
>>> rb
>>>
>>> --
>>> Ryan Blue
>>>
>>
>
> --
> Edgar Rodriguez
>

>
> --
> John Zhuge
>


Re: Nested Column Pruning in Iceberg (DSV2) ..

2019-08-30 Thread Gautam Kowshik
Super! That’d be great. Lemme know if I can help in any way. 

Sent from my iPhone

> On Aug 30, 2019, at 6:30 PM, Anton Okolnychyi  
> wrote:
> 
> Hi Gautam,
> 
> Iceberg does support nested schema pruning but Spark doesn’t request this for 
> DS V2 in 2.4. Internally, we had to modify Spark 2.4 to make this work 
> end-to-end.
> One of the options is to extend DataSourceV2Strategy with logic similar to 
> what we have in ParquetSchemaPruning in 2.4.0. I think we can share that part 
> if needed.
> 
> I am planning to check whether Spark master already has this functionality.
> If that’s not implemented and nobody is working on it yet, I can fix it.
> 
> - Anton
> 
> 
>> On 30 Aug 2019, at 15:42, Gautam  wrote:
>> 
>> Hello Devs,
>>I was measuring perf on structs between V1 and V2 
>> datasources. Found that although Iceberg Reader supports 
>> `SupportsPushDownRequiredColumns` it doesn't seem to prune nested column 
>> projections. I want to be able to prune on nested fields. How does V2 
>> datasource have provision to be able to let Iceberg decide this? The 
>> `SupportsPushDownRequiredColumns` mix-in gives the entire struct field even 
>> if a sub-field is requested.
>> 
>> Here's an illustration .. 
>> 
>> scala> spark.sql("select location.lat from iceberg_people_struct").show()
>> +---+
>> |lat|
>> +---+
>> |   null|
>> |101.123|
>> |175.926|
>> +---+
>> 
>> 
>> The pruning gets the entire struct instead of just `location.lat`  ..
>> 
>> public void pruneColumns(StructType newRequestedSchema) 
>> 
>> 19/08/30 16:25:38 WARN Reader: => Prune columns : {
>>  "type" : "struct",
>>  "fields" : [ {
>>"name" : "location",
>>"type" : {
>>  "type" : "struct",
>>  "fields" : [ {
>>"name" : "lat",
>>"type" : "double",
>>"nullable" : true,
>>"metadata" : { }
>>  }, {
>>"name" : "lon",
>>"type" : "double",
>>"nullable" : true,
>>"metadata" : { }
>>  } ]
>>},
>>"nullable" : true,
>>"metadata" : { }
>>  } ]
>> }
>> 
>> Is there information I can use in the IcebergSource (or add some) that can 
>> be used to prune the exact sub-field here?  What's a good way to approach 
>> this? For dense/wide struct fields this affects performance significantly.
>> 
>> 
>> Sample gist: 
>> https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac
>> 
>> 
>> thanks and regards,
>> -Gautam.
> 


Nested Column Pruning in Iceberg (DSV2) ..

2019-08-30 Thread Gautam
Hello Devs,
I was measuring perf on structs between the V1 and V2
datasources. I found that although the Iceberg Reader supports
`SupportsPushDownRequiredColumns`, it doesn't seem to prune nested column
projections. I want to be able to prune on nested fields. Does the V2
datasource have a provision to let Iceberg decide this? The
`SupportsPushDownRequiredColumns` mix-in gives the entire struct field even
if only a sub-field is requested.

*Here's an illustration .. *

scala> spark.sql("select location.lat from iceberg_people_struct").show()
+-------+
|    lat|
+-------+
|   null|
|101.123|
|175.926|
+-------+


The pruning gets the entire struct instead of just `location.lat`  ..

*public void pruneColumns(StructType newRequestedSchema) *

19/08/30 16:25:38 WARN Reader: => Prune columns : {
  "type" : "struct",
  "fields" : [ {
"name" : "location",
"type" : {
  "type" : "struct",
  "fields" : [ {
"name" : "lat",
"type" : "double",
"nullable" : true,
"metadata" : { }
  }, {
"name" : "lon",
"type" : "double",
"nullable" : true,
"metadata" : { }
  } ]
},
"nullable" : true,
"metadata" : { }
  } ]
}

Is there information I can use in the IcebergSource (or add some) that can
be used to prune the exact sub-field here?  What's a good way to approach
this? For dense/wide struct fields this affects performance significantly.


Sample gist:
https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac
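One way to picture what the reader would need: a requested Spark schema that keeps only location.lat, mapped back onto the Iceberg schema by field ID. SparkSchemaUtil.prune is used below as an assumed helper, and the surrounding names are placeholders rather than actual Iceberg reader code.

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class NestedPruningSketch {
  // The nested projection "select location.lat" as a Spark schema:
  // a location struct containing only the lat sub-field.
  static StructType requestedSchema() {
    StructType location = new StructType(new StructField[] {
        new StructField("lat", DataTypes.DoubleType, true, Metadata.empty())
    });
    return new StructType(new StructField[] {
        new StructField("location", location, true, Metadata.empty())
    });
  }

  // Assumed helper: map the requested Spark schema back onto the Iceberg
  // schema so only the matching field IDs are read from Parquet.
  static Schema prunedReadSchema(Table table) {
    return SparkSchemaUtil.prune(table.schema(), requestedSchema());
  }
}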


thanks and regards,
-Gautam.


Re: Encouraging performance results for Vectorized Iceberg code

2019-08-08 Thread Gautam Kowshik
Thanks Anjali and Samarth, 
   These look good! Great progress.  Can you push your changes to the 
vectorized-read branch please?

Sent from my iPhone

> On Aug 8, 2019, at 11:56 AM, Anjali Norwood  wrote:
> 
> Good suggestion Ryan. Added dev@iceberg now.
> 
> Dev: Please see early vectorized Iceberg performance results a couple emails 
> down. This WIP.
> 
> thanks, 
> Anjali.
> 
>> On Thu, Aug 8, 2019 at 10:39 AM Ryan Blue  wrote:
>> Hi everyone,
>> 
>> Is it possible to copy the Iceberg dev list when sending these emails? There 
>> are other people in the community that are interested, like Palantir. If 
>> there isn't anything sensitive then let's try to be more inclusive. Thanks!
>> 
>> rb
>> 
>>> On Wed, Aug 7, 2019 at 10:34 PM Anjali Norwood  wrote:
>>> Hi Gautam, Padma,
>>> We wanted to update you before Gautam takes off for vacation. 
>>> 
>>> Samarth and I profiled the code and found the following:
>>> Profiling the IcebergSourceFlatParquetDataReadBenchmark (10 files, 10M 
>>> rows, a single long column) using visualVM shows two places where CPU time 
>>> can be optimized:
>>> 1) Iterator abstractions (triple iterators, page iterators etc) seem to 
>>> take up quite a bit of time. Not using these iterators or making them 
>>> 'batched' iterators and moving the reading of the data close to the file 
>>> should help ameliorate this problem.
>>> 2) Current code goes back and forth between definition levels and value 
>>> reads through the levels of iterators. Quite a bit of CPU time is spent 
>>> here. Reading a batch of primitive values at once after consulting the 
>>> definition level should help improve performance.
>>> 
>>> So, we prototyped the code to walk over the definition levels and read 
>>> corresponding values in batches (read values till we hit a null, then read 
>>> nulls till we hit values and so on) and made the iterators batched 
>>> iterators. Here are the results:
>>> 
>>> Benchmark  Mode 
>>>  Cnt   Score   Error  Units
>>> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorizedss 
>>>5  10.247 ± 0.202   s/op
>>> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized   ss 
>>>5   3.747 ± 0.206   s/op
>>> IcebergSourceFlatParquetDataReadBenchmark.readIceberg   
>>>ss 5  11.286 ± 0.457   s/op
>>> IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized100k  ss 
>>>5   6.088 ± 0.324   s/op
>>> IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized10k   ss 
>>>5   5.875 ± 0.378   s/op
>>> IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized1kss 
>>>5   6.029 ± 0.387   s/op
>>> IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized5kss 
>>>5   6.106 ± 0.497   s/op
>>> 
>>> Moreover, as I mentioned to Gautam on chat, we prototyped reading the 
>>> string column as a byte array without decoding it into UTF8 (above changes 
>>> were not made at the time) and we saw significant performance improvements 
>>> there (21.18 secs before Vs 13.031 secs with the change). When used along 
>>> with batched iterators, these numbers should get better.
>>> 
>>> Note that we haven't tightened/profiled the new code yet (we will start on 
>>> that next). Just wanted to share some early positive results. 
>>> 
>>> regards, 
>>> Anjali.
>>> 
>> 
>> 
>> -- 
>> Ryan Blue
>> Software Engineer
>> Netflix
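Below is a minimal sketch of the run-based decoding described above ("read values till we hit a null, then read nulls till we hit values"). The definition-level array and value supplier stand in for the real Parquet reader state; this is not the prototype's actual code.

import java.util.function.IntSupplier;
import org.apache.arrow.vector.IntVector;

public class RunBasedBatchDecoder {
  // Runs of defined values are read back-to-back from the value supplier;
  // runs of nulls only touch the validity buffer and consume no values.
  static int decode(int[] defLevels, int maxDefLevel, IntVector out, IntSupplier nextValue) {
    int row = 0;
    while (row < defLevels.length) {
      if (defLevels[row] == maxDefLevel) {
        while (row < defLevels.length && defLevels[row] == maxDefLevel) {
          out.setSafe(row, nextValue.getAsInt());  // run of non-null values
          row++;
        }
      } else {
        while (row < defLevels.length && defLevels[row] != maxDefLevel) {
          out.setNull(row);  // run of nulls
          row++;
        }
      }
    }
    out.setValueCount(row);
    return row;
  }
}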


Re: Approaching Vectorized Reading in Iceberg ..

2019-07-31 Thread Gautam
Ah yes, I didn't send over the filter benchmarks ..

Num files : 500
Num rows per file: 10,000

*Benchmark
 Mode  Cnt  Score   Error  Units*
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceNonVectorized
   ss5  3.837 ± 0.424   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceVectorized
  ss5  3.964 ± 1.891   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIceberg
 ss5  0.272 ± 0.039   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergVect100k
  ss5  0.274 ± 0.013   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergVect10k
 ss5  0.275 ± 0.040   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergVect5k
  ss5  0.273 ± 0.031   s/op

On Wed, Jul 31, 2019 at 2:35 PM Anjali Norwood 
wrote:

> Hi Gautam,
>
> You wrote: ' - The filters are not being applied in columnar fashion they
> are being applied row by row as in Iceberg each filter visitor is stateless
> and applied separately on each row's column. ' .. this should not be a
> problem for this particular benchmark as IcebergSourceFlatParquetDataRe
> adBenchmark does not apply filters.
>
> -Anjali.
>
> On Wed, Jul 31, 2019 at 1:44 PM Gautam  wrote:
>
>> Hey Samarth,
>>   Sorry bout the delay. I ran into some bottlenecks for which
>> I had to add more code to be able to run benchmarks. I'v checked in my
>> latest changes to my fork's *vectorized-read* branch [0].
>>
>> Here's the early numbers on the initial implementation...
>>
>> *Benchmark Data:*
>> - 10 files
>> - 9MB each
>> - 1Millon rows (1 RowGroup)
>>
>> Ran benchmark using the jmh benchmark tool within 
>> incubator-iceberg/spark/src/jmh
>> using batch different sizes and compared it to  spark's vectorization and
>> non-vectorized reader.
>>
>> *Command: *
>> ./gradlew clean   :iceberg-spark:jmh
>>  -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark
>> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt
>>
>>
>>
>> *Benchmark
>>  Mode  Cnt   Score   Error  Units*
>> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized
>>  ss5  16.172 ± 0.750   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized
>>   ss5   6.430 ± 0.136   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readIceberg
>>  ss5  15.287 ± 0.212   s/op
>>
>>
>>
>> *IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized100k
>>ss5  18.310 ± 0.498
>> s/opIcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized10k
>> ss5  18.020 ± 0.378
>> s/opIcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized5k
>>ss5  17.769 ± 0.412   
>> s/op*IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized
>>ss5   2.794 ± 0.141   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceVectorized
>>   ss5   1.063 ± 0.140   s/op
>> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg
>>  ss5   2.966 ± 0.133   s/op
>>
>>
>> *IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized100k
>>  ss5   2.015 ± 0.261
>> s/opIcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized10k
>>   ss5   1.972 ± 0.105
>> s/opIcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized5k
>>ss5   2.065 ± 0.079   s/op*
>>
>>
>>
>> So seems like there's no improvement  that vectorization is adding over
>> the non-vectorized reading. I'm currently trying to profile where the time
>> is being spent.
>>
>> *Here is my initial speculation of why this is slow:*
>>  - There's too much overhead that seems to be from creating the batches.
>> i'm creating new instance of ColumnarBatch on each read  [1] . This should
>> prolly be re-used.
>>  - Although I am reusing the *FieldVector* across batched reads [2] I
>> wrap them in new *ArrowColumnVector*s [3]  on each read call. I didn't
>> think this would be a big deal but maybe it is.
>>  - The filters are not being applied in columnar fashion they are being
>> applied row by row as in Iceberg each filter visitor is stateless and
>> applied separately on each row's column.
>>  - I'm trying to re-use the BufferAllocato

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-31 Thread Gautam
Also I think the other thing that's fundamentally different is the way Page
iteration and Column iteration are done in Iceberg vs. the way value
reading happens in Spark's ValuesReader implementations.

On Wed, Jul 31, 2019 at 1:44 PM Gautam  wrote:

> Hey Samarth,
>   Sorry bout the delay. I ran into some bottlenecks for which
> I had to add more code to be able to run benchmarks. I'v checked in my
> latest changes to my fork's *vectorized-read* branch [0].
>
> Here's the early numbers on the initial implementation...
>
> *Benchmark Data:*
> - 10 files
> - 9MB each
> - 1Millon rows (1 RowGroup)
>
> Ran benchmark using the jmh benchmark tool within 
> incubator-iceberg/spark/src/jmh
> using batch different sizes and compared it to  spark's vectorization and
> non-vectorized reader.
>
> *Command: *
> ./gradlew clean   :iceberg-spark:jmh
>  -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark
> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt
>
>
>
> *Benchmark
>Mode  Cnt   Score   Error  Units*
> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized
>  ss5  16.172 ± 0.750   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized
> ss5   6.430 ± 0.136   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readIceberg
>  ss5  15.287 ± 0.212   s/op
>
>
>
> *IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized100k
>  ss5  18.310 ± 0.498
> s/opIcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized10k
> ss5  18.020 ± 0.378
> s/opIcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized5k
>ss5  17.769 ± 0.412   
> s/op*IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized
>ss5   2.794 ± 0.141   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceVectorized
>   ss5   1.063 ± 0.140   s/op
> IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg
>  ss5   2.966 ± 0.133   s/op
>
>
> *IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized100k
>  ss5   2.015 ± 0.261
> s/opIcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized10k
>   ss5   1.972 ± 0.105
> s/opIcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized5k
>ss5   2.065 ± 0.079   s/op*
>
>
>
> So seems like there's no improvement  that vectorization is adding over
> the non-vectorized reading. I'm currently trying to profile where the time
> is being spent.
>
> *Here is my initial speculation of why this is slow:*
>  - There's too much overhead that seems to be from creating the batches.
> i'm creating new instance of ColumnarBatch on each read  [1] . This should
> prolly be re-used.
>  - Although I am reusing the *FieldVector* across batched reads [2] I
> wrap them in new *ArrowColumnVector*s [3]  on each read call. I didn't
> think this would be a big deal but maybe it is.
>  - The filters are not being applied in columnar fashion they are being
> applied row by row as in Iceberg each filter visitor is stateless and
> applied separately on each row's column.
>  - I'm trying to re-use the BufferAllocator that Arrow provides [4] ..
> Dunno if there are other strategies to using this. Will look more into this.
>  - I'm batching until the rowgroup ends and restricting the last batch to
> the Rowgroup boundary. I should prolly spill over to the next rowgroup to
> fill that batch. Dunno if this would help as from what i can tell I don't
> think *VectorizedParquetRecordReader *does this.
>
> I'l try and provide more insights once i improve my code. But if there's
> other insights folks have on where we can improve on things, i'd gladly try
> them.
>
> Cheers,
> - Gautam.
>
> [0] - https://github.com/prodeezy/incubator-iceberg/tree/vectorized-read
> [1] -
> https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java#L655
> [2] -
> https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java#L108
> [3] -
> https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java#L651
> [4] -
> https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-31 Thread Gautam
Hey Samarth,
  Sorry about the delay. I ran into some bottlenecks for which I
had to add more code to be able to run benchmarks. I've checked in my latest
changes to my fork's *vectorized-read* branch [0].

Here's the early numbers on the initial implementation...

*Benchmark Data:*
- 10 files
- 9MB each
- 1 million rows (1 RowGroup)

Ran the benchmark using the jmh benchmark tool within
incubator-iceberg/spark/src/jmh
using different batch sizes and compared it to Spark's vectorized and
non-vectorized readers.

*Command: *
./gradlew clean   :iceberg-spark:jmh
 -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark
-PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt



Benchmark                                                                             Mode  Cnt   Score   Error  Units
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized                  ss    5  16.172 ± 0.750   s/op
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized                     ss    5   6.430 ± 0.136   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIceberg                                  ss    5  15.287 ± 0.212   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized100k                    ss    5  18.310 ± 0.498   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized10k                     ss    5  18.020 ± 0.378   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergVectorized5k                      ss    5  17.769 ± 0.412   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized    ss    5   2.794 ± 0.141   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceVectorized       ss    5   1.063 ± 0.140   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg                    ss    5   2.966 ± 0.133   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized100k      ss    5   2.015 ± 0.261   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized10k       ss    5   1.972 ± 0.105   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergVectorized5k        ss    5   2.065 ± 0.079   s/op



So it seems like vectorization is adding no improvement over the
non-vectorized reading. I'm currently trying to profile where the time is
being spent.

*Here is my initial speculation of why this is slow:*
 - There's too much overhead that seems to come from creating the batches:
I'm creating a new instance of ColumnarBatch on each read [1]. This should
prolly be re-used (see the sketch after this list).
 - Although I am reusing the *FieldVector*s across batched reads [2], I wrap
them in new *ArrowColumnVector*s [3] on each read call. I didn't think
this would be a big deal but maybe it is.
 - The filters are not being applied in columnar fashion; they are being
applied row by row, as in Iceberg each filter visitor is stateless and
applied separately on each row's column.
 - I'm trying to re-use the BufferAllocator that Arrow provides [4].
Dunno if there are other strategies for using this. Will look more into this.
 - I'm batching until the rowgroup ends and restricting the last batch to
the rowgroup boundary. I should prolly spill over to the next rowgroup to
fill that batch. Dunno if this would help, as from what I can tell I don't
think *VectorizedParquetRecordReader* does this.
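Below is a rough sketch of the re-use suggested in the first two bullets: wrap the Arrow vectors once and keep a single ColumnarBatch around, only bumping its row count per read. The surrounding reader plumbing (how the Arrow vectors get refilled) is assumed, and this is not the branch's actual code.

import java.util.List;
import org.apache.arrow.vector.FieldVector;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class ReusableBatchHolder {
  private final ColumnarBatch batch;

  ReusableBatchHolder(List<FieldVector> fieldVectors) {
    ArrowColumnVector[] columns = new ArrowColumnVector[fieldVectors.size()];
    for (int i = 0; i < columns.length; i++) {
      // wrap each Arrow vector exactly once, not on every read call
      columns[i] = new ArrowColumnVector(fieldVectors.get(i));
    }
    this.batch = new ColumnarBatch(columns);  // allocated once, reused per batch
  }

  // After the Arrow vectors are refilled for the next batch, only the row
  // count changes; the same ColumnarBatch object is handed back to Spark.
  ColumnarBatch filled(int rowsRead) {
    batch.setNumRows(rowsRead);
    return batch;
  }
}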

I'll try and provide more insights once I improve my code. But if there are
other insights folks have on where we can improve things, I'd gladly try
them.

Cheers,
- Gautam.

[0] - https://github.com/prodeezy/incubator-iceberg/tree/vectorized-read
[1] -
https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java#L655
[2] -
https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java#L108
[3] -
https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java#L651
[4] -
https://github.com/prodeezy/incubator-iceberg/blob/vectorized-read/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java#L92


On Tue, Jul 30, 2019 at 5:13 PM Samarth Jain  wrote:

> Hey Gautam,
>
> Wanted to check back with you and see if you had any success running the
> benchmark and if you have any numbers to share.
>
>
>
> On Fri, Jul 26, 2019 at 4:34 PM Gautam  wrote:
>
>> Got it. Commented out that module and it works. Was just curious why it
>> doesn't work on master branch either.
>>
>> On Fri, Jul 26, 2019 at 3:49 PM Daniel Weeks  wrote:
>>
>>> Actually, it looks like the issue is right there in the error . . . the
>>> ErrorProne module is being excluded from the compile stages of the
>>> sub-projects here:
>>> https://gith

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-26 Thread Gautam
Got it. Commented out that module and it works. Was just curious why it
doesn't work on master branch either.

On Fri, Jul 26, 2019 at 3:49 PM Daniel Weeks  wrote:

> Actually, it looks like the issue is right there in the error . . . the
> ErrorProne module is being excluded from the compile stages of the
> sub-projects here:
> https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L152
>
> However, it is still being applied to the jmh tasks.  I'm not familiar
> with this module, but you can run the benchmarks by commenting it out here:
> https://github.com/apache/incubator-iceberg/blob/master/build.gradle#L167
>
> We'll need to fix the build to disable for the jmh tasks.
>
> -Dan
>
> On Fri, Jul 26, 2019 at 3:34 PM Daniel Weeks  wrote:
>
>> Gautam, you need to have the jmh-core libraries available to run.  I
>> validated that PR, so I'm guessing I had it configured in my environment.
>>
>> I assume there's a way to make that available within gradle, so I'll take
>> a look.
>>
>> On Fri, Jul 26, 2019 at 2:52 PM Gautam  wrote:
>>
>>> This fails on master too btw. Just wondering if i'm doing
>>> something wrong trying to run this.
>>>
>>> On Fri, Jul 26, 2019 at 2:24 PM Gautam  wrote:
>>>
>>>> I'v been trying to run the jmh benchmarks bundled within the project.
>>>> I'v been running into issues with that .. have other hit this? Am I running
>>>> these incorrectly?
>>>>
>>>>
>>>> bash-3.2$ ./gradlew :iceberg-spark:jmh
>>>> -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
>>>> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
>>>> ..
>>>> ...
>>>> > Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
>>>> error: plug-in not found: ErrorProne
>>>>
>>>> FAILURE: Build failed with an exception.
>>>>
>>>>
>>>>
>>>> Is there a config/plugin I need to add to build.gradle?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue  wrote:
>>>>
>>>>> Thanks Gautam!
>>>>>
>>>>> We'll start taking a look at your code. What do you think about
>>>>> creating a branch in the Iceberg repository where we can work on improving
>>>>> it together, before merging it into master?
>>>>>
>>>>> Also, you mentioned performance comparisons. Do you have any early
>>>>> results to share?
>>>>>
>>>>> rb
>>>>>
>>>>> On Tue, Jul 23, 2019 at 3:40 PM Gautam 
>>>>> wrote:
>>>>>
>>>>>> Hello Folks,
>>>>>>
>>>>>> I have checked in a WIP branch [1] with a working version of
>>>>>> Vectorized reads for Iceberg reader. Here's the diff  [2].
>>>>>>
>>>>>> *Implementation Notes:*
>>>>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to
>>>>>> instruct the DataSourceV2ScanExec to use `planBatchPartitions()` instead 
>>>>>> of
>>>>>> the usual `planInputPartitions()`. It returns instances of 
>>>>>> `ColumnarBatch`
>>>>>> on each iteration.
>>>>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This
>>>>>> was copied from [3] . Added by @Daniel Weeks  .
>>>>>> Thanks for that!
>>>>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used
>>>>>> for reading/decoding the Parquet rowgroups (aka pagestores as referred to
>>>>>> in the code)
>>>>>>  - `VectorizedSparkParquetReaders` contains the visitor
>>>>>> implementations to map Parquet types to appropriate value readers. I
>>>>>> implemented the struct visitor so that the root schema can be mapped
>>>>>> properly. This has the added benefit of vectorization support for 
>>>>>> structs,
>>>>>> so yay!
>>>>>>  - For the initial version the value readers read an entire row group
>>>>>> into a single Arrow Field Vector. this i'd imagine will require tuning 
>>>>>> for
>>>>>> right batch sizing but i'v gone with one batch per ro

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-26 Thread Gautam
This fails on master too, btw. Just wondering if I'm doing something wrong
trying to run this.

On Fri, Jul 26, 2019 at 2:24 PM Gautam  wrote:

> I'v been trying to run the jmh benchmarks bundled within the project. I'v
> been running into issues with that .. have other hit this? Am I running
> these incorrectly?
>
>
> bash-3.2$ ./gradlew :iceberg-spark:jmh
> -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
> ..
> ...
> > Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
> error: plug-in not found: ErrorProne
>
> FAILURE: Build failed with an exception.
>
>
>
> Is there a config/plugin I need to add to build.gradle?
>
>
>
>
>
>
>
>
> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue  wrote:
>
>> Thanks Gautam!
>>
>> We'll start taking a look at your code. What do you think about creating
>> a branch in the Iceberg repository where we can work on improving it
>> together, before merging it into master?
>>
>> Also, you mentioned performance comparisons. Do you have any early
>> results to share?
>>
>> rb
>>
>> On Tue, Jul 23, 2019 at 3:40 PM Gautam  wrote:
>>
>>> Hello Folks,
>>>
>>> I have checked in a WIP branch [1] with a working version of Vectorized
>>> reads for Iceberg reader. Here's the diff  [2].
>>>
>>> *Implementation Notes:*
>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>>> each iteration.
>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>>> copied from [3] . Added by @Daniel Weeks  . Thanks
>>> for that!
>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>>> the code)
>>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>>> to map Parquet types to appropriate value readers. I implemented the struct
>>> visitor so that the root schema can be mapped properly. This has the added
>>> benefit of vectorization support for structs, so yay!
>>>  - For the initial version the value readers read an entire row group
>>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>>> Spark's ColumnVector implementation backed by Arrow. This is the first
>>> contact point between Spark and Arrow interfaces.
>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>> which maps Structs to Columnar Batches. This allows us to have nested
>>> structs where each level of nesting would be a nested columnar batch. Lemme
>>> know what you think of this approach.
>>>  - I'v added value readers for all supported primitive types listed in
>>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>>> `TestSparkParquetVectorizedReader`
>>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>>
>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>>> around to compare performance.
>>>
>>> Lemme know what folks think of the approach. I'm getting this working
>>> for our scale test benchmark and will report back with numbers. Feel free
>>> to run your own benchmarks and share.
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>> [1] -
>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>> [2] -
>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>> [3] -
>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>
>>>
>>> On Mon, Jul 22, 2019 at 2:33 PM Gaut

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-26 Thread Gautam
I've been trying to run the jmh benchmarks bundled within the project. I've
been running into issues with that .. have others hit this? Am I running
these incorrectly?


bash-3.2$ ./gradlew :iceberg-spark:jmh
-PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
-PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
..
...
> Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
error: plug-in not found: ErrorProne

FAILURE: Build failed with an exception.



Is there a config/plugin I need to add to build.gradle?








On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue  wrote:

> Thanks Gautam!
>
> We'll start taking a look at your code. What do you think about creating a
> branch in the Iceberg repository where we can work on improving it
> together, before merging it into master?
>
> Also, you mentioned performance comparisons. Do you have any early results
> to share?
>
> rb
>
> On Tue, Jul 23, 2019 at 3:40 PM Gautam  wrote:
>
>> Hello Folks,
>>
>> I have checked in a WIP branch [1] with a working version of Vectorized
>> reads for Iceberg reader. Here's the diff  [2].
>>
>> *Implementation Notes:*
>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>> each iteration.
>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>> copied from [3] . Added by @Daniel Weeks  . Thanks
>> for that!
>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>> the code)
>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>> to map Parquet types to appropriate value readers. I implemented the struct
>> visitor so that the root schema can be mapped properly. This has the added
>> benefit of vectorization support for structs, so yay!
>>  - For the initial version the value readers read an entire row group
>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>> Spark's ColumnVector implementation backed by Arrow. This is the first
>> contact point between Spark and Arrow interfaces.
>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>> which maps Structs to Columnar Batches. This allows us to have nested
>> structs where each level of nesting would be a nested columnar batch. Lemme
>> know what you think of this approach.
>>  - I'v added value readers for all supported primitive types listed in
>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>> `TestSparkParquetVectorizedReader`
>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>
>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>> around to compare performance.
>>
>> Lemme know what folks think of the approach. I'm getting this working for
>> our scale test benchmark and will report back with numbers. Feel free to
>> run your own benchmarks and share.
>>
>> Cheers,
>> -Gautam.
>>
>>
>>
>>
>> [1] -
>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>> [2] -
>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>> [3] -
>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>
>>
>> On Mon, Jul 22, 2019 at 2:33 PM Gautam  wrote:
>>
>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>> primitive type support.
>>>
>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah  wrote:
>>>
>>>> Would it be possible to put the work in progress code in open source?
>>>>
>>>>
>>>>
>>>> *From: *Gautam 
>>>> *Reply-To: *"dev@iceberg.apache.org" 
>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>> *To: *Daniel Weeks 
>&g

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-24 Thread Gautam
+1 on having a branch. Let me know once you do and I'll rebase and open a PR
against it.

Will get back to you on perf numbers soon.

On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue  wrote:

> Thanks Gautam!
>
> We'll start taking a look at your code. What do you think about creating a
> branch in the Iceberg repository where we can work on improving it
> together, before merging it into master?
>
> Also, you mentioned performance comparisons. Do you have any early results
> to share?
>
> rb
>
> On Tue, Jul 23, 2019 at 3:40 PM Gautam  wrote:
>
>> Hello Folks,
>>
>> I have checked in a WIP branch [1] with a working version of Vectorized
>> reads for Iceberg reader. Here's the diff  [2].
>>
>> *Implementation Notes:*
>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>> each iteration.
>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>> copied from [3] . Added by @Daniel Weeks  . Thanks
>> for that!
>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>> the code)
>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>> to map Parquet types to appropriate value readers. I implemented the struct
>> visitor so that the root schema can be mapped properly. This has the added
>> benefit of vectorization support for structs, so yay!
>>  - For the initial version the value readers read an entire row group
>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>> Spark's ColumnVector implementation backed by Arrow. This is the first
>> contact point between Spark and Arrow interfaces.
>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>> which maps Structs to Columnar Batches. This allows us to have nested
>> structs where each level of nesting would be a nested columnar batch. Lemme
>> know what you think of this approach.
>>  - I'v added value readers for all supported primitive types listed in
>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>> `TestSparkParquetVectorizedReader`
>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>
>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>> around to compare performance.
>>
>> Lemme know what folks think of the approach. I'm getting this working for
>> our scale test benchmark and will report back with numbers. Feel free to
>> run your own benchmarks and share.
>>
>> Cheers,
>> -Gautam.
>>
>>
>>
>>
>> [1] -
>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>> [2] -
>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>> [3] -
>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>
>>
>> On Mon, Jul 22, 2019 at 2:33 PM Gautam  wrote:
>>
>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>> primitive type support.
>>>
>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah  wrote:
>>>
>>>> Would it be possible to put the work in progress code in open source?
>>>>
>>>>
>>>>
>>>> *From: *Gautam 
>>>> *Reply-To: *"dev@iceberg.apache.org" 
>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>> *To: *Daniel Weeks 
>>>> *Cc: *Ryan Blue , Iceberg Dev List <
>>>> dev@iceberg.apache.org>
>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>
>>>>
>>>>
>>>> That would be great!
>>>>
>>>>
>>>>
>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks 
>>>> wrote:
>>>>
>>>> H

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-23 Thread Gautam
Hello Folks,

I have checked in a WIP branch [1] with a working version of Vectorized
reads for Iceberg reader. Here's the diff  [2].

*Implementation Notes:*
 - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
each iteration.
 - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
copied from [3] . Added by @Daniel Weeks  . Thanks for
that!
 - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
reading/decoding the Parquet rowgroups (aka pagestores as referred to in
the code)
 - `VectorizedSparkParquetReaders` contains the visitor implementations to
map Parquet types to appropriate value readers. I implemented the struct
visitor so that the root schema can be mapped properly. This has the added
benefit of vectorization support for structs, so yay!
 - For the initial version the value readers read an entire row group into
a single Arrow Field Vector. This, I'd imagine, will require tuning for the
right batch size, but I've gone with one batch per row group for now.
 - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
Spark's ColumnVector implementation backed by Arrow. This is the first
contact point between Spark and Arrow interfaces.
 - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
`ColumnarBatchReader` . This is my replacement for `InternalRowReader`
which maps Structs to Columnar Batches. This allows us to have nested
structs where each level of nesting would be a nested columnar batch. Lemme
know what you think of this approach.
 - I've added value readers for all supported primitive types listed in
`AvroDataTest`. There's a corresponding test for vectorized reader under
`TestSparkParquetVectorizedReader`
 - I haven't fixed all the Checkstyle errors so you will have to turn
checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
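
A rough, hedged sketch of how the pieces listed above could fit together, assuming
Spark 2.4's DataSource V2 and Arrow APIs. This is not the code in the WIP branch;
only the Spark and Arrow class names are real, everything else is made up for
illustration.

// Hypothetical sketch only -- not the code in the WIP branch.
// Shows the shape of a DataSourceV2 reader that returns ColumnarBatch
// via the SupportsScanColumnarBatch mixin, and how one Arrow vector per
// column is stitched into a single ColumnarBatch.
import java.util.List;
import org.apache.arrow.vector.FieldVector;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

abstract class SketchReader implements DataSourceReader, SupportsScanColumnarBatch {

  // Projected schema reported back to Spark; with the batch path there is
  // no per-row UnsafeProjection, so this is what Spark relies on.
  @Override
  public abstract StructType readSchema();

  // Batch-based planning used instead of planInputPartitions().
  @Override
  public abstract List<InputPartition<ColumnarBatch>> planBatchPartitions();

  // Stitch one Arrow vector per projected column into a single ColumnarBatch.
  static ColumnarBatch toBatch(List<FieldVector> vectors, int rowCount) {
    ColumnVector[] columns = new ColumnVector[vectors.size()];
    for (int i = 0; i < vectors.size(); i++) {
      columns[i] = new ArrowColumnVector(vectors.get(i));
    }
    ColumnarBatch batch = new ColumnarBatch(columns);
    batch.setNumRows(rowCount);
    return batch;
  }
}

The design point is simply that every projected column must be batched: Spark's
ColumnarBatch is all-or-nothing, which is why primitives-only is the natural first step.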

*P.S*. There's some unused code under ArrowReader.java. Ignore this, as it's
not used. It was from my previous implementation of vectorization; I've kept it
around to compare performance.

Let me know what folks think of the approach. I'm getting this working for
our scale-test benchmark and will report back with numbers. Feel free to
run your own benchmarks and share.

Cheers,
-Gautam.




[1] -
https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
[2] -
https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
[3] -
https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java


On Mon, Jul 22, 2019 at 2:33 PM Gautam  wrote:

> Will do. Doing a bit of housekeeping on the code and also adding more
> primitive type support.
>
> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah  wrote:
>
>> Would it be possible to put the work in progress code in open source?
>>
>>
>>
>> *From: *Gautam 
>> *Reply-To: *"dev@iceberg.apache.org" 
>> *Date: *Monday, July 22, 2019 at 9:46 AM
>> *To: *Daniel Weeks 
>> *Cc: *Ryan Blue , Iceberg Dev List <
>> dev@iceberg.apache.org>
>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>
>>
>>
>> That would be great!
>>
>>
>>
>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks  wrote:
>>
>> Hey Gautam,
>>
>>
>>
>> We also have a couple people looking into vectorized reading (into Arrow
>> memory).  I think it would be good for us to get together and see if we can
>> collaborate on a common approach for this.
>>
>>
>>
>> I'll reach out directly and see if we can get together.
>>
>>
>>
>> -Dan
>>
>>
>>
>> On Sun, Jul 21, 2019 at 10:35 PM Gautam  wrote:
>>
>> Figured this out. I'm returning ColumnarBatch iterator directly without
>> projection with schema set appropriately in `readSchema() `.. the empty
>> result was due to valuesRead not being set correctly on FileIterator. Did
>> that and things are working. Will circle back with numbers soon.
>>
>>
>>
>> On Fri, Jul 19, 2019 at 5:22 PM Gautam  wrote:
>>
>> Hey Guys,
>>
>>Sorry bout the delay on this. Just got back on getting a basic
>> working implementation in Iceberg for Vectorization on primitive types.
>>
>>
>>
>> *Here's what I have so far :  *
>>
>>
>>
>> I have added `ParquetValueReader` implementations for some basic
>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-22 Thread Gautam
Will do. Doing a bit of housekeeping on the code and also adding more
primitive type support.

On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah  wrote:

> Would it be possible to put the work in progress code in open source?
>
>
>
> *From: *Gautam 
> *Reply-To: *"dev@iceberg.apache.org" 
> *Date: *Monday, July 22, 2019 at 9:46 AM
> *To: *Daniel Weeks 
> *Cc: *Ryan Blue , Iceberg Dev List <
> dev@iceberg.apache.org>
> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>
>
>
> That would be great!
>
>
>
> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks  wrote:
>
> Hey Gautam,
>
>
>
> We also have a couple people looking into vectorized reading (into Arrow
> memory).  I think it would be good for us to get together and see if we can
> collaborate on a common approach for this.
>
>
>
> I'll reach out directly and see if we can get together.
>
>
>
> -Dan
>
>
>
> On Sun, Jul 21, 2019 at 10:35 PM Gautam  wrote:
>
> Figured this out. I'm returning ColumnarBatch iterator directly without
> projection with schema set appropriately in `readSchema() `.. the empty
> result was due to valuesRead not being set correctly on FileIterator. Did
> that and things are working. Will circle back with numbers soon.
>
>
>
> On Fri, Jul 19, 2019 at 5:22 PM Gautam  wrote:
>
> Hey Guys,
>
>Sorry bout the delay on this. Just got back on getting a basic
> working implementation in Iceberg for Vectorization on primitive types.
>
>
>
> *Here's what I have so far :  *
>
>
>
> I have added `ParquetValueReader` implementations for some basic primitive
> types that build the respective Arrow Vector (`ValueVector`) viz.
> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
> value vector reader there are column iterators that read from the parquet
> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
> stitched together using a `ColumnarBatchReader` (which as the name suggests
> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
> work properly with the underlying interfaces.  I'v also made changes to
> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
> expects ColumnarBatch instances (instead of InternalRow). The query
> planning runtime works fine with these changes.
>
>
>
> Although it fails during query execution, the bit it's  currently failing
> at is this line of code : 
> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
> [github.com]
> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_incubator-2Diceberg_blob_master_spark_src_main_java_org_apache_iceberg_spark_source_Reader.java-23L414&d=DwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=UW1Nb5KZOPeIqsjzFnKhGQaxYHT_wAI_2PvgFUlfAoY&s=7wzoBoRwCjQjgamnHukQSe0wiATMnGbYhfJQpXfSMks&e=>
>
>
>
> This code, I think,  tries to apply the iterator's schema projection on
> the InternalRow instances. This seems to be tightly coupled to InternalRow
> as Spark's catalyst expressions have implemented the UnsafeProjection for
> InternalRow only. If I take this out and just return the
> `Iterator` iterator I built it returns empty result on the
> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
> There's a Todo in the code that says "*remove the projection by reporting
> the iterator's schema back to Spark*".  Is there a simple way to
> communicate that to Spark for my new iterator? Any pointers on how to get
> around this?
>
>
>
>
>
> Thanks and Regards,
>
> -Gautam.
>
>
>
>
>
>
>
>
>
> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:
>
> Replies inline.
>
>
>
> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>
> Thanks for responding Ryan,
>
>
>
> Couple of follow up questions on ParquetValueReader for Arrow..
>
>
>
> I'd like to start with testing Arrow out with readers for primitive type
> and incrementally add in Struct/Array support, also ArrowWriter [1]
> currently doesn't have converters for map type. How can I default these
> types to regular materialization whilst supporting Arrow based support for
> primitives?
>
>
>
> We should look at what Spark does to handle maps.
>
>
>
> I think we should get the prototype working with test ca

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-22 Thread Gautam
That would be great!

On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks  wrote:

> Hey Gautam,
>
> We also have a couple people looking into vectorized reading (into Arrow
> memory).  I think it would be good for us to get together and see if we can
> collaborate on a common approach for this.
>
> I'll reach out directly and see if we can get together.
>
> -Dan
>
> On Sun, Jul 21, 2019 at 10:35 PM Gautam  wrote:
>
>> Figured this out. I'm returning ColumnarBatch iterator directly without
>> projection with schema set appropriately in `readSchema() `.. the empty
>> result was due to valuesRead not being set correctly on FileIterator. Did
>> that and things are working. Will circle back with numbers soon.
>>
>> On Fri, Jul 19, 2019 at 5:22 PM Gautam  wrote:
>>
>>> Hey Guys,
>>>Sorry bout the delay on this. Just got back on getting a
>>> basic working implementation in Iceberg for Vectorization on primitive
>>> types.
>>>
>>> *Here's what I have so far :  *
>>>
>>> I have added `ParquetValueReader` implementations for some basic
>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>> value vector reader there are column iterators that read from the parquet
>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>> work properly with the underlying interfaces.  I'v also made changes to
>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>> planning runtime works fine with these changes.
>>>
>>> Although it fails during query execution, the bit it's  currently
>>> failing at is this line of code :
>>> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>
>>> This code, I think,  tries to apply the iterator's schema projection on
>>> the InternalRow instances. This seems to be tightly coupled to InternalRow
>>> as Spark's catalyst expressions have implemented the UnsafeProjection for
>>> InternalRow only. If I take this out and just return the
>>> `Iterator` iterator I built it returns empty result on the
>>> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
>>> There's a Todo in the code that says "*remove the projection by
>>> reporting the iterator's schema back to Spark*".  Is there a simple way
>>> to communicate that to Spark for my new iterator? Any pointers on how to
>>> get around this?
>>>
>>>
>>> Thanks and Regards,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:
>>>
>>>> Replies inline.
>>>>
>>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>>>>
>>>>> Thanks for responding Ryan,
>>>>>
>>>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>>>
>>>>> I'd like to start with testing Arrow out with readers for primitive
>>>>> type and incrementally add in Struct/Array support, also ArrowWriter [1]
>>>>> currently doesn't have converters for map type. How can I default these
>>>>> types to regular materialization whilst supporting Arrow based support for
>>>>> primitives?
>>>>>
>>>>
>>>> We should look at what Spark does to handle maps.
>>>>
>>>> I think we should get the prototype working with test cases that don't
>>>> have maps, structs, or lists. Just getting primitives working is a good
>>>> start and just won't hit these problems.
>>>>
>>>>
>>>>> Lemme know if this makes sense...
>>>>>
>>>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types
>>>>> into ArrowColumnVectors of corresponding column types by iterating over
>>>>> underlying ColumnIterator *n times*, where n is size

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-21 Thread Gautam
Figured this out. I'm returning the ColumnarBatch iterator directly, without
the projection, with the schema set appropriately in `readSchema()`. The empty
result was due to valuesRead not being set correctly on the FileIterator. Once I
fixed that, things are working. Will circle back with numbers soon.
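
A minimal illustration of the two pieces mentioned here, assuming Spark 2.4's
ColumnarBatch API; the class name and bookkeeping are hypothetical, not the
branch code.

// Illustrative only; BatchIterator and its fields are made-up names.
// The reader's readSchema() reports the projected schema, and the file
// iterator has to track how many values it has handed out so that
// hasNext() terminates at the right point.
import java.util.Iterator;
import org.apache.spark.sql.vectorized.ColumnarBatch;

abstract class BatchIterator implements Iterator<ColumnarBatch> {
  private final long totalValues;   // row count for this split, from file metadata
  private long valuesRead = 0L;     // the bookkeeping the thread mentions

  BatchIterator(long totalValues) {
    this.totalValues = totalValues;
  }

  @Override
  public boolean hasNext() {
    return valuesRead < totalValues;
  }

  @Override
  public ColumnarBatch next() {
    ColumnarBatch batch = readNextRowGroupAsBatch();
    valuesRead += batch.numRows();  // if this is not advanced, results come back wrong or empty
    return batch;
  }

  // Decodes the next row group into Arrow-backed vectors; left abstract in this sketch.
  protected abstract ColumnarBatch readNextRowGroupAsBatch();
}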

On Fri, Jul 19, 2019 at 5:22 PM Gautam  wrote:

> Hey Guys,
>Sorry bout the delay on this. Just got back on getting a basic
> working implementation in Iceberg for Vectorization on primitive types.
>
> *Here's what I have so far :  *
>
> I have added `ParquetValueReader` implementations for some basic primitive
> types that build the respective Arrow Vector (`ValueVector`) viz.
> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
> value vector reader there are column iterators that read from the parquet
> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
> stitched together using a `ColumnarBatchReader` (which as the name suggests
> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
> work properly with the underlying interfaces.  I'v also made changes to
> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
> expects ColumnarBatch instances (instead of InternalRow). The query
> planning runtime works fine with these changes.
>
> Although it fails during query execution, the bit it's  currently failing
> at is this line of code :
> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>
> This code, I think,  tries to apply the iterator's schema projection on
> the InternalRow instances. This seems to be tightly coupled to InternalRow
> as Spark's catalyst expressions have implemented the UnsafeProjection for
> InternalRow only. If I take this out and just return the
> `Iterator` iterator I built it returns empty result on the
> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
> There's a Todo in the code that says "*remove the projection by reporting
> the iterator's schema back to Spark*".  Is there a simple way to
> communicate that to Spark for my new iterator? Any pointers on how to get
> around this?
>
>
> Thanks and Regards,
> -Gautam.
>
>
>
>
> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:
>
>> Replies inline.
>>
>> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>>
>>> Thanks for responding Ryan,
>>>
>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>
>>> I'd like to start with testing Arrow out with readers for primitive type
>>> and incrementally add in Struct/Array support, also ArrowWriter [1]
>>> currently doesn't have converters for map type. How can I default these
>>> types to regular materialization whilst supporting Arrow based support for
>>> primitives?
>>>
>>
>> We should look at what Spark does to handle maps.
>>
>> I think we should get the prototype working with test cases that don't
>> have maps, structs, or lists. Just getting primitives working is a good
>> start and just won't hit these problems.
>>
>>
>>> Lemme know if this makes sense...
>>>
>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>>> ArrowColumnVectors of corresponding column types by iterating over
>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>
>>
>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>> not too familiar with the Arrow APIs.
>>
>>
>>> - Reader.newParquetIterable()  maps primitive column types to the newly
>>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>>> current *InternalRow* based ValueReaders
>>>
>>
>> Sounds good for primitives, but I would just leave the nested types
>> un-implemented for now.
>>
>>
>>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>>> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although*
>>>  *I'm a bit lost on how the stitching of columns happens currently*? ..
>>> and how the ArrowColumnVectors could  be stitched alongside regular columns
>>> that don't have arrow based support ?
>>>
>>
>> I don't think that you can mix regular columns and Arrow columns. It has
>> to be 

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-19 Thread Gautam
Hey Guys,
   Sorry about the delay on this. Just got back to getting a basic
working implementation in Iceberg for vectorization on primitive types.

*Here's what I have so far :  *

I have added `ParquetValueReader` implementations for some basic primitive
types that build the respective Arrow Vector (`ValueVector`) viz.
`IntVector` for int, `VarCharVector` for strings and so on. Underneath each
value vector reader there are column iterators that read from the parquet
pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
`ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
stitched together using a `ColumnarBatchReader` (which as the name suggests
wraps ColumnarBatches in the iterator). I've verified that these pieces
work properly with the underlying interfaces. I've also made changes to
Iceberg's `Reader` to implement `planBatchPartitions()` (to add the
`SupportsScanColumnarBatch` mixin to the reader).  So the reader now
expects ColumnarBatch instances (instead of InternalRow). The query
planning runtime works fine with these changes.
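
A hedged sketch of the primitive path described above, assuming the Arrow Java API
bundled with Spark 2.4: fill an IntVector from a per-column value iterator and wrap
it for Spark. The class name and the plain Iterator<Integer> standing in for
Iceberg's column iterator are made up for illustration.

// Hedged sketch only. A real reader would reuse one BufferAllocator for the
// whole task instead of creating one per batch.
import java.util.Iterator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.spark.sql.vectorized.ArrowColumnVector;

class IntColumnBatcher {
  static ArrowColumnVector readBatch(Iterator<Integer> columnValues, int batchSize) {
    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
    IntVector vector = new IntVector("col", allocator);
    vector.allocateNew(batchSize);
    int count = 0;
    while (count < batchSize && columnValues.hasNext()) {
      Integer value = columnValues.next();
      if (value == null) {
        vector.setNull(count);      // nulls are tracked in the Arrow validity buffer
      } else {
        vector.setSafe(count, value);
      }
      count++;
    }
    vector.setValueCount(count);
    // Spark-facing ColumnVector backed directly by the Arrow buffers.
    return new ArrowColumnVector(vector);
  }
}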

Although it fails during query execution, the bit it's currently failing
at is this line of code:
https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414

This code, I think, tries to apply the iterator's schema projection on the
InternalRow instances. This seems to be tightly coupled to InternalRow, as
Spark's catalyst expressions have implemented UnsafeProjection for
InternalRow only. If I take this out and just return the
`Iterator` of ColumnarBatch I built, it returns an empty result on the
client. I'm guessing this is because Spark is unaware of the iterator's schema?
There's a Todo in the code that says "*remove the projection by reporting
the iterator's schema back to Spark*". Is there a simple way to
communicate that to Spark for my new iterator? Any pointers on how to get
around this?
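
A conceptual sketch of the coupling being described, assuming Spark's catalyst
UnsafeProjection API: the existing row path projects each InternalRow, which has no
ColumnarBatch equivalent, so a batch-returning iterator has to rely on readSchema()
instead of a per-row projection. The helper below is illustrative only.

// Conceptual sketch; ProjectionSketch is not Iceberg or Spark code.
import java.util.Iterator;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.types.StructType;

final class ProjectionSketch {
  // The row-based path: every row is rewritten through an UnsafeProjection
  // built from the read schema. There is no analogous call for ColumnarBatch,
  // which is why the batch path must report its schema via readSchema().
  static Iterator<InternalRow> projectRows(Iterator<InternalRow> rows, StructType readSchema) {
    UnsafeProjection projection = UnsafeProjection.create(readSchema);
    return new Iterator<InternalRow>() {
      @Override public boolean hasNext() { return rows.hasNext(); }
      @Override public InternalRow next() { return projection.apply(rows.next()); }
    };
  }
}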


Thanks and Regards,
-Gautam.




On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:

> Replies inline.
>
> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>
>> Thanks for responding Ryan,
>>
>> Couple of follow up questions on ParquetValueReader for Arrow..
>>
>> I'd like to start with testing Arrow out with readers for primitive type
>> and incrementally add in Struct/Array support, also ArrowWriter [1]
>> currently doesn't have converters for map type. How can I default these
>> types to regular materialization whilst supporting Arrow based support for
>> primitives?
>>
>
> We should look at what Spark does to handle maps.
>
> I think we should get the prototype working with test cases that don't
> have maps, structs, or lists. Just getting primitives working is a good
> start and just won't hit these problems.
>
>
>> Lemme know if this makes sense...
>>
>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>> ArrowColumnVectors of corresponding column types by iterating over
>> underlying ColumnIterator *n times*, where n is size of batch.
>>
>
> Sounds good to me. I'm not sure about extending vs wrapping because I'm
> not too familiar with the Arrow APIs.
>
>
>> - Reader.newParquetIterable()  maps primitive column types to the newly
>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>> current *InternalRow* based ValueReaders
>>
>
> Sounds good for primitives, but I would just leave the nested types
> un-implemented for now.
>
>
>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although* *I'm
>> a bit lost on how the stitching of columns happens currently*? .. and
>> how the ArrowColumnVectors could  be stitched alongside regular columns
>> that don't have arrow based support ?
>>
>
> I don't think that you can mix regular columns and Arrow columns. It has
> to be all one or the other. That's why it's easier to start with
> primitives, then add structs, then lists, and finally maps.
>
>
>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so that
>> DataSourceV2ScanExec starts using ColumnarBatch scans
>>
>
> We will probably need two paths. One for columnar batches and one for
> row-based reads. That doesn't need to be done right away and what you
> already have in your working copy makes sense as a start.
>
>
>> That's a lot of questions! :-) but hope i'm making sense.
>>
>> -Gautam.
>>
>>
>>
>> [1] -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Approaching Vectorized Reading in Iceberg ..

2019-06-14 Thread Gautam
Agree with the approach of getting this working for primitive types only.
I'll work on a prototype assuming just primitive types for now.

> I don't think that you can mix regular columns and Arrow columns. It has to
> be all one or the other.


I was just curious about this because the vanilla Spark reader (with vectorization)
doesn't support batching on nested fields today, but it's still able to do
vectorized reads on data with a mix of nested and non-nested fields. This is not
needed for my PoC, but it would be good to know whether we can leverage it for our
implementation. Either way, I'll get to it when this step is done.


On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:

> Replies inline.
>
> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>
>> Thanks for responding Ryan,
>>
>> Couple of follow up questions on ParquetValueReader for Arrow..
>>
>> I'd like to start with testing Arrow out with readers for primitive type
>> and incrementally add in Struct/Array support, also ArrowWriter [1]
>> currently doesn't have converters for map type. How can I default these
>> types to regular materialization whilst supporting Arrow based support for
>> primitives?
>>
>
> We should look at what Spark does to handle maps.
>
> I think we should get the prototype working with test cases that don't
> have maps, structs, or lists. Just getting primitives working is a good
> start and just won't hit these problems.
>
>
>> Lemme know if this makes sense...
>>
>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>> ArrowColumnVectors of corresponding column types by iterating over
>> underlying ColumnIterator *n times*, where n is size of batch.
>>
>
> Sounds good to me. I'm not sure about extending vs wrapping because I'm
> not too familiar with the Arrow APIs.
>
>
>> - Reader.newParquetIterable()  maps primitive column types to the newly
>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>> current *InternalRow* based ValueReaders
>>
>
> Sounds good for primitives, but I would just leave the nested types
> un-implemented for now.
>
>
>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although* *I'm
>> a bit lost on how the stitching of columns happens currently*? .. and
>> how the ArrowColumnVectors could  be stitched alongside regular columns
>> that don't have arrow based support ?
>>
>
> I don't think that you can mix regular columns and Arrow columns. It has
> to be all one or the other. That's why it's easier to start with
> primitives, then add structs, then lists, and finally maps.
>
>
>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so that
>> DataSourceV2ScanExec starts using ColumnarBatch scans
>>
>
> We will probably need two paths. One for columnar batches and one for
> row-based reads. That doesn't need to be done right away and what you
> already have in your working copy makes sense as a start.
>
>
>> That's a lot of questions! :-) but hope i'm making sense.
>>
>> -Gautam.
>>
>>
>>
>> [1] -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Approaching Vectorized Reading in Iceberg ..

2019-06-14 Thread Gautam
Thanks for responding Ryan,

Couple of follow up questions on ParquetValueReader for Arrow..

I'd like to start with testing Arrow out with readers for primitive types
and incrementally add in Struct/Array support; also, ArrowWriter [1]
currently doesn't have converters for the map type. How can I default these
types to regular materialization whilst supporting Arrow-based reads for
primitives?

Let me know if this makes sense...

- I extend PrimitiveReader (for Arrow) so that it loads primitive types into
ArrowColumnVectors of the corresponding column types by iterating over the
underlying ColumnIterator *n times*, where n is the batch size.
- Reader.newParquetIterable() maps primitive column types to the newly
added ArrowParquetValueReader, but for other types (nested types, etc.) uses the
current *InternalRow*-based ValueReaders.
- Stitch the column vectors together to create a ColumnarBatch (since the
*SupportsScanColumnarBatch* mixin currently expects this) .. *although* *I'm
a bit lost on how the stitching of columns happens currently*? .. and how
the ArrowColumnVectors could be stitched alongside regular columns that
don't have Arrow-based support?
- Reader returns readTasks as *InputPartition<*ColumnarBatch*> *so that
DataSourceV2ScanExec starts using ColumnarBatch scans (a rough sketch of what
such a read task might look like follows this list).
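
A rough sketch of what such a read task might look like, assuming Spark 2.4's
InputPartition / InputPartitionReader interfaces; the class is hypothetical and the
actual decoding is left as a placeholder.

// Hypothetical sketch of an InputPartition<ColumnarBatch> read task.
import java.io.IOException;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class BatchReadTask implements InputPartition<ColumnarBatch> {
  @Override
  public InputPartitionReader<ColumnarBatch> createPartitionReader() {
    return new InputPartitionReader<ColumnarBatch>() {
      private ColumnarBatch current;

      @Override
      public boolean next() {
        // A real reader would decode the next row group into Arrow vectors
        // here, set "current", and return true until the file task is exhausted.
        return false;  // placeholder in this sketch
      }

      @Override
      public ColumnarBatch get() {
        return current;  // the batch decoded by the last next() call
      }

      @Override
      public void close() throws IOException {
        // release Arrow buffers and file handles here
      }
    };
  }
}

With tasks shaped like this, DataSourceV2ScanExec consumes whole batches per
next()/get() pair instead of one InternalRow at a time.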


That's a lot of questions! :-) but I hope I'm making sense.

-Gautam.



[1] -
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala

On Thu, Jun 13, 2019 at 9:39 AM Ryan Blue  wrote:

> Sounds like a good start. I think the next step is to avoid using the
> ParquetReader.FileIterator and deserialize directly from TripleIterator
> <https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java>.
> I think the reason why this is taking longer is that (I think) you’re doing
> all the work to materialize the data in rows, and then converting to
> vectors.
>
> To work on top of TripleIterator, I think you need to create a
> ParquetValueReader for Arrow batches. That would be configured with a
> batch size, so that when you pass it into ParquetReader, the FileIterator
> returns batches instead of individual rows.
>
> Does that make sense?
>
> rb
>
> On Wed, Jun 12, 2019 at 11:22 PM Gautam  wrote:
>
>> Hey Ryan and Anton,
>>
>> I wanted to circle back on some findings I had after taking a first stab
>> at this ..
>>
>>
>>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s
>>> what we want to take advantage of..
>>
>>
>> This is how I went about this... I wrapped the
>> *ParquetReader.FileIterator* with an iterator that creates Arrow batch
>> from rows (one batch for every 100 rows) returned by *FileIterator.next()
>> *. I exposed each Arrow Batch from this iterator as a ColumnarBatch
>> which has a * .rowIterator() *that reads this as a sequence of
>> InternalRow. I return this in the  *Reader.open() *call in Iceberg.
>>
>> Here are some microbenchmark numbers on on flat parquet file scanning ...
>>
>> 1 warmup, 3 iterations, 10 columns per row , 100k records per file, 10
>> files
>>
>> *BenchmarkMode  Cnt   Score(sec)   Error Units*
>> readFileV1SourceVectorized   avgt3   0.870± 1.883   s/op
>> readIcebergNoBatchingavgt3   1.854± 2.003   s/op
>> readIcebergWithBatching100k  avgt3   3.216± 0.520   s/op
>> readIcebergWithBatching10k   avgt3   8.763± 2.338   s/op
>> readIcebergWithBatching5kavgt3  13.964± 6.328   s/op
>>
>>
>> The Batching doesn't seem to add any benefit. I measured the conversion
>> times and am reading this as the overhead from extra copies to Arrow and
>> then to ColumnarBatch again. Although I was hoping that the materialization
>> to arrow would offset some of that overhead.
>>
>> Wondering what my next step should be..
>> 1) Eliminate the extra conversion IO overhead by reading each column type
>> directly into ArrowColumnVector?
>> 2) Should I extend IcebergSource to support the SupportsScanColumnarBatch
>> mixin and expose the ColumnarBatch?
>>
>>
>> Appreciate your guidance,
>>
>> -Gautam.
>>
>>
>>
>> On Fri, May 24, 2019 at 5:28 PM Ryan Blue 
>> wrote:
>>
>>> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>>> Iterator[InternalRow] interface, it would still not work right? Coz it
>>> seems to me there is a lot more going on upstream in the operator executi

Re: Approaching Vectorized Reading in Iceberg ..

2019-06-14 Thread Gautam
Hey Anton,
  Here's the code
https://github.com/prodeezy/incubator-iceberg/pull/2/files .. Mind you,
it's just a proof of concept to get something going so please ignore the
code design (or lack thereof :-) ).  I'v attached the flat data benchmark
code as well.

Lemme know what you think.







On Thu, Jun 13, 2019 at 10:56 PM Anton Okolnychyi 
wrote:

> Gautam, could you also share the code for benchmarks and conversion?
>
> Thanks,
> Anton
>
> On 13 Jun 2019, at 19:38, Ryan Blue  wrote:
>
> Sounds like a good start. I think the next step is to avoid using the
> ParquetReader.FileIterator and deserialize directly from TripleIterator
> <https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java>.
> I think the reason why this is taking longer is that (I think) you’re doing
> all the work to materialize the data in rows, and then converting to
> vectors.
>
> To work on top of TripleIterator, I think you need to create a
> ParquetValueReader for Arrow batches. That would be configured with a
> batch size, so that when you pass it into ParquetReader, the FileIterator
> returns batches instead of individual rows.
>
> Does that make sense?
>
> rb
>
> On Wed, Jun 12, 2019 at 11:22 PM Gautam  wrote:
>
>> Hey Ryan and Anton,
>>
>> I wanted to circle back on some findings I had after taking a first stab
>> at this ..
>>
>>
>>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s
>>> what we want to take advantage of..
>>
>>
>> This is how I went about this... I wrapped the
>> *ParquetReader.FileIterator* with an iterator that creates Arrow batch
>> from rows (one batch for every 100 rows) returned by *FileIterator.next()
>> *. I exposed each Arrow Batch from this iterator as a ColumnarBatch
>> which has a * .rowIterator() *that reads this as a sequence of
>> InternalRow. I return this in the  *Reader.open() *call in Iceberg.
>>
>> Here are some microbenchmark numbers on on flat parquet file scanning ...
>>
>> 1 warmup, 3 iterations, 10 columns per row , 100k records per file, 10
>> files
>>
>> *BenchmarkMode  Cnt   Score(sec)   Error Units*
>> readFileV1SourceVectorized   avgt3   0.870± 1.883   s/op
>> readIcebergNoBatchingavgt3   1.854± 2.003   s/op
>> readIcebergWithBatching100k  avgt3   3.216± 0.520   s/op
>> readIcebergWithBatching10k   avgt3   8.763± 2.338   s/op
>> readIcebergWithBatching5kavgt3  13.964± 6.328   s/op
>>
>>
>> The Batching doesn't seem to add any benefit. I measured the conversion
>> times and am reading this as the overhead from extra copies to Arrow and
>> then to ColumnarBatch again. Although I was hoping that the materialization
>> to arrow would offset some of that overhead.
>>
>> Wondering what my next step should be..
>> 1) Eliminate the extra conversion IO overhead by reading each column type
>> directly into ArrowColumnVector?
>> 2) Should I extend IcebergSource to support the SupportsScanColumnarBatch
>> mixin and expose the ColumnarBatch?
>>
>>
>> Appreciate your guidance,
>>
>> -Gautam.
>>
>>
>>
>> On Fri, May 24, 2019 at 5:28 PM Ryan Blue 
>> wrote:
>>
>>> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>>> Iterator[InternalRow] interface, it would still not work right? Coz it
>>> seems to me there is a lot more going on upstream in the operator execution
>>> path that would be needed to be done here.
>>>
>>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
>>> we want to take advantage of. You’re right that the first thing that Spark
>>> does it to get each row as InternalRow. But we still get a benefit from
>>> vectorizing the data materialization to Arrow itself. Spark execution is
>>> not vectorized, but that can be updated in Spark later (I think there’s a
>>> proposal).
>>>
>>> I wouldn’t pay too much attention to the Parquet vectorized path in
>>> Spark because it produces its own in-memory format and not Arrow. We want
>>> to take advantage of Arrow so that we can use dictionary-encoded columns in
>>> record batches. Spark’s vectorized Parquet reader also works directly on
>>> Parquet pages instead of a higher-level abst

Re: Approaching Vectorized Reading in Iceberg ..

2019-06-12 Thread Gautam
Hey Ryan and Anton,

I wanted to circle back on some findings I had after taking a first stab at
this ..


> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s
> what we want to take advantage of..


This is how I went about this... I wrapped the *ParquetReader.FileIterator*
with an iterator that creates an Arrow batch from the rows (one batch for every
100 rows) returned by *FileIterator.next()*. I exposed each Arrow batch
from this iterator as a ColumnarBatch, which has a *.rowIterator()* that
reads it as a sequence of InternalRow. I return this in the *Reader.open()*
call in Iceberg.
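
A hedged sketch (not the experiment's actual code) of the wrapping described above:
buffer the row stream into fixed-size groups, convert each group to an Arrow-backed
ColumnarBatch through some supplied conversion, and re-expose it via
ColumnarBatch.rowIterator(). The extra row-to-Arrow-to-row copy this implies is
presumably what shows up in the numbers below.

// Hedged sketch only; RowBatchingIterator and the injected conversion are made up.
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class RowBatchingIterator implements Iterator<InternalRow> {
  private final Iterator<InternalRow> rows;                          // e.g. the file iterator
  private final int batchSize;                                       // 100 in the experiment above
  private final Function<List<InternalRow>, ColumnarBatch> toBatch;  // row -> Arrow vectors -> batch
  private Iterator<InternalRow> currentBatch = Collections.emptyIterator();

  RowBatchingIterator(Iterator<InternalRow> rows, int batchSize,
                      Function<List<InternalRow>, ColumnarBatch> toBatch) {
    this.rows = rows;
    this.batchSize = batchSize;
    this.toBatch = toBatch;
  }

  @Override
  public boolean hasNext() {
    return currentBatch.hasNext() || rows.hasNext();
  }

  @Override
  public InternalRow next() {
    if (!currentBatch.hasNext()) {
      // Buffer up to batchSize rows, copy them into Arrow, then hand the
      // batch back to Spark one InternalRow at a time.
      List<InternalRow> buffered = new ArrayList<>(batchSize);
      while (buffered.size() < batchSize && rows.hasNext()) {
        buffered.add(rows.next());
      }
      currentBatch = toBatch.apply(buffered).rowIterator();
    }
    return currentBatch.next();
  }
}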

Here are some microbenchmark numbers on flat parquet file scanning ...

1 warmup, 3 iterations, 10 columns per row, 100k records per file, 10 files

*BenchmarkMode  Cnt   Score(sec)   Error Units*
readFileV1SourceVectorized   avgt3   0.870± 1.883   s/op
readIcebergNoBatchingavgt3   1.854± 2.003   s/op
readIcebergWithBatching100k  avgt3   3.216± 0.520   s/op
readIcebergWithBatching10k   avgt3   8.763± 2.338   s/op
readIcebergWithBatching5kavgt3  13.964± 6.328   s/op


The batching doesn't seem to add any benefit. I measured the conversion
times and am reading this as the overhead from the extra copies to Arrow and
then to ColumnarBatch again, although I was hoping that the materialization
to Arrow would offset some of that overhead.

Wondering what my next step should be..
1) Eliminate the extra conversion IO overhead by reading each column type
directly into ArrowColumnVector?
2) Should I extend IcebergSource to support the SupportsScanColumnarBatch
mixin and expose the ColumnarBatch?


Appreciate your guidance,

-Gautam.



On Fri, May 24, 2019 at 5:28 PM Ryan Blue  wrote:

> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
> Iterator[InternalRow] interface, it would still not work right? Coz it
> seems to me there is a lot more going on upstream in the operator execution
> path that would be needed to be done here.
>
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
> we want to take advantage of. You’re right that the first thing that Spark
> does it to get each row as InternalRow. But we still get a benefit from
> vectorizing the data materialization to Arrow itself. Spark execution is
> not vectorized, but that can be updated in Spark later (I think there’s a
> proposal).
>
> I wouldn’t pay too much attention to the Parquet vectorized path in Spark
> because it produces its own in-memory format and not Arrow. We want to take
> advantage of Arrow so that we can use dictionary-encoded columns in record
> batches. Spark’s vectorized Parquet reader also works directly on Parquet
> pages instead of a higher-level abstraction. I’m not sure we are going to
> want to do that right away instead of using the TripleIterator that Iceberg
> currently uses to abstract the Parquet page structure away.
>
> I would start by building converters that can build a column of Arrow data
> from a TripleIterator. Then we can stitch those columns together to get
> record batches and see how that performs. Then we can add complexity from
> there.
>
> On Fri, May 24, 2019 at 4:28 PM Gautam  wrote:
>
>> Hello devs,
>>As a follow up to
>> https://github.com/apache/incubator-iceberg/issues/9 I'v been reading
>> through how Spark does vectorized reading in it's current implementation
>> which is in DataSource V1 path. Trying to see how we can achieve the same
>> impact in Iceberg's reading. To start with I want to form an understanding
>> at a high level of the approach one would need to take to achieve this.
>> Pardon my ignorance as I'm equally new to Spark codebase as I am to
>> Iceberg. Please correct me if my understanding is wrong.
>>
>> So here's what Vectorization seems to be doing for Parquet reading:
>> - The DataSource scan execution uses ParquetFileFormat to build a
>> RecordReaderIterator [1] which underneath uses the
>> VectorizedParquetReaderReader.
>> - This record reader is used to iterate over entire batches of columns
>> (ColumnarBatch). The iterator.next() call returns a batch and not just a
>> row. The interfaces are such that allow an ColumnarBatch to be passed
>> around as a generic Object. As stated here [2]
>> - On the scan execution side, there is stage Code Generation that
>> compiles code that consumes entire batches at time so that physical
>> operators take advantage of the vectorization feature. So the scanner code
>> is aware that it's reading columnar b

Re: Approaching Vectorized Reading in Iceberg ..

2019-05-24 Thread Gautam
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
we want to take advantage of. You’re right that the first thing that Spark
does it to get each row as InternalRow. But we still get a benefit from
vectorizing the data materialization to Arrow itself. Spark execution is
not vectorized, but that can be updated in Spark later (I think there’s a
proposal).

Got it. So to clarify.. for starters we may not get the *entire* benefit
from vectorization (because Spark's operator execution still needs to take
advantage of Arrow upstream), but by reading Parquet into Arrow and
streaming it out through the iterator we still benefit from the
columnar materialization.

>  I’m not sure we are going to want to do that right away instead of using
the TripleIterator that Iceberg currently uses to abstract the Parquet page
structure away.

I agree overall with the Arrow approach; I was looking at Spark's
vectorization just to understand how the speed-up happens and what the
contracts should be in the Iceberg flow when we do that using Arrow. My
main concern was that even if I did read this into Arrow, if the transfer is
row-by-row we might be losing the whole point of using an in-memory
columnar format. But as you said in your first point, we still get a
lot of the benefit just from organizing columns in memory this way.






On Fri, May 24, 2019 at 5:28 PM Ryan Blue  wrote:

> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
> Iterator[InternalRow] interface, it would still not work right? Coz it
> seems to me there is a lot more going on upstream in the operator execution
> path that would be needed to be done here.
>
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
> we want to take advantage of. You’re right that the first thing that Spark
> does it to get each row as InternalRow. But we still get a benefit from
> vectorizing the data materialization to Arrow itself. Spark execution is
> not vectorized, but that can be updated in Spark later (I think there’s a
> proposal).
>
> I wouldn’t pay too much attention to the Parquet vectorized path in Spark
> because it produces its own in-memory format and not Arrow. We want to take
> advantage of Arrow so that we can use dictionary-encoded columns in record
> batches. Spark’s vectorized Parquet reader also works directly on Parquet
> pages instead of a higher-level abstraction. I’m not sure we are going to
> want to do that right away instead of using the TripleIterator that Iceberg
> currently uses to abstract the Parquet page structure away.
>
> I would start by building converters that can build a column of Arrow data
> from a TripleIterator. Then we can stitch those columns together to get
> record batches and see how that performs. Then we can add complexity from
> there.
>
> On Fri, May 24, 2019 at 4:28 PM Gautam  wrote:
>
>> Hello devs,
>>As a follow up to
>> https://github.com/apache/incubator-iceberg/issues/9 I'v been reading
>> through how Spark does vectorized reading in it's current implementation
>> which is in DataSource V1 path. Trying to see how we can achieve the same
>> impact in Iceberg's reading. To start with I want to form an understanding
>> at a high level of the approach one would need to take to achieve this.
>> Pardon my ignorance as I'm equally new to Spark codebase as I am to
>> Iceberg. Please correct me if my understanding is wrong.
>>
>> So here's what Vectorization seems to be doing for Parquet reading:
>> - The DataSource scan execution uses ParquetFileFormat to build a
>> RecordReaderIterator [1] which underneath uses the
>> VectorizedParquetReaderReader.
>> - This record reader is used to iterate over entire batches of columns
>> (ColumnarBatch). The iterator.next() call returns a batch and not just a
>> row. The interfaces are such that allow an ColumnarBatch to be passed
>> around as a generic Object. As stated here [2]
>> - On the scan execution side, there is stage Code Generation that
>> compiles code that consumes entire batches at time so that physical
>> operators take advantage of the vectorization feature. So the scanner code
>> is aware that it's reading columnar batches out of the iterator.
>>
>>
>> I'm wondering how one should approach this if one is to achieve
>> Vectorization in Iceberg Reader (DatasourceV2) path. For instance, if
>> Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>> Iterator[InternalRow] interface, it would still not work right? Coz it
>> seems to me there is a lot more going on upst

Approaching Vectorized Reading in Iceberg ..

2019-05-24 Thread Gautam
Hello devs,
   As a follow up to
https://github.com/apache/incubator-iceberg/issues/9 I've been reading
through how Spark does vectorized reading in its current implementation,
which is in the DataSource V1 path, trying to see how we can achieve the same
impact in Iceberg's reading. To start with, I want to form an understanding
at a high level of the approach one would need to take to achieve this.
Pardon my ignorance, as I'm equally new to the Spark codebase as I am to
Iceberg. Please correct me if my understanding is wrong.

So here's what vectorization seems to be doing for Parquet reading:
- The DataSource scan execution uses ParquetFileFormat to build a
RecordReaderIterator [1], which underneath uses the
VectorizedParquetRecordReader.
- This record reader is used to iterate over entire batches of columns
(ColumnarBatch). The iterator.next() call returns a batch and not just a
row. The interfaces are such that they allow a ColumnarBatch to be passed
around as a generic Object, as stated here [2].
- On the scan execution side, there is whole-stage code generation that compiles
code that consumes entire batches at a time, so that physical operators take
advantage of the vectorization feature. So the scanner code is aware that
it's reading columnar batches out of the iterator.
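
A conceptual illustration only (not Spark source code) of the loose typing described
above: the same scan iterator can hand back either InternalRow or ColumnarBatch as an
Object, and the consuming side casts based on what it expects from the file format.

// Conceptual illustration; ScanConsumerSketch does not exist in Spark.
import java.util.Iterator;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;

final class ScanConsumerSketch {
  static long countRows(Iterator<Object> scanOutput, boolean vectorized) {
    long count = 0;
    while (scanOutput.hasNext()) {
      Object next = scanOutput.next();
      if (vectorized) {
        // One next() call yields a whole batch of rows.
        count += ((ColumnarBatch) next).numRows();
      } else {
        // One next() call yields a single row.
        InternalRow ignored = (InternalRow) next;
        count += 1;
      }
    }
    return count;
  }
}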


I'm wondering how one should approach this if one is to achieve
vectorization in the Iceberg Reader (DataSourceV2) path. For instance, if
the Iceberg Reader were to wrap Arrow or ColumnarBatch behind an
Iterator[InternalRow] interface, it would still not work, right? Because it
seems to me there is a lot more going on upstream in the operator execution
path that would need to be done here. It would be great if folks who
are more well-versed with the Spark codebase could shed some light on this. In
general, what is the contract needed between a V2 DataSourceReader (like
Iceberg's) and the operator execution?

thank you,
-Gautam.


[1] -
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L412
[2] -
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala#L29


Re: Vanilla Spark Readers on Iceberg written data..

2019-05-15 Thread Gautam
RD,
  Trying to figure out if there are regressions expected between the
reader and the data. Bypassing metadata is easy for us because the data is in a
separate directory. The ETL pipeline can point the reader config to the correct
location.

On Wed, May 15, 2019 at 5:14 PM RD  wrote:

> Is backporting relevant datasource patches to Spark 2.3 a non starter? If
> this were doable I believe this is much simpler than bypassing Iceberg
> metadata to read files directly.
>
> -R
>
> On Wed, May 15, 2019 at 3:02 PM Gautam  wrote:
>
>> Just wanted to add, from what I have tested so far I see this working
>> fine with Vanilla Spark reading Iceberg data.
>>
>> On Wed, May 15, 2019 at 2:59 PM Gautam  wrote:
>>
>>> Hello There,
>>> I am currently doing some testing with Vanilla Spark
>>> Readers'  ability to read Iceberg generated data. This is both from an
>>> Iceberg/Parquet Reader interoperability and Spark version backward
>>> compatibility standpoint (e.g. Spark distributions running v2.3.x  which
>>> doesn't support Iceberg DataSource vs. those running 2.4.x) .
>>>
>>> To be clear I am talking about doing the following on data written by
>>> Iceberg :
>>>
>>> spark.read.format("parquet").load($icebergBasePath + "/data")
>>>
>>> Can I safely assume this will continue to work? If not then what could
>>> be the reasons and associated risks?
>>>
>>> This would be good to know coz these things often come up in migration
>>> path discussions and evaluating costs associated with generating and
>>> keeping two copies of the same data.
>>>
>>> thanks,
>>> - Gautam.
>>>
>>


Re: Vanilla Spark Readers on Iceberg written data..

2019-05-15 Thread Gautam
Just wanted to add: from what I have tested so far, I see this working fine
with vanilla Spark reading Iceberg data.

On Wed, May 15, 2019 at 2:59 PM Gautam  wrote:

> Hello There,
> I am currently doing some testing with Vanilla Spark
> Readers'  ability to read Iceberg generated data. This is both from an
> Iceberg/Parquet Reader interoperability and Spark version backward
> compatibility standpoint (e.g. Spark distributions running v2.3.x  which
> doesn't support Iceberg DataSource vs. those running 2.4.x) .
>
> To be clear I am talking about doing the following on data written by
> Iceberg :
>
> spark.read.format("parquet").load($icebergBasePath + "/data")
>
> Can I safely assume this will continue to work? If not then what could be
> the reasons and associated risks?
>
> This would be good to know coz these things often come up in migration
> path discussions and evaluating costs associated with generating and
> keeping two copies of the same data.
>
> thanks,
> - Gautam.
>


Vanilla Spark Readers on Iceberg written data..

2019-05-15 Thread Gautam
Hello There,
I am currently doing some testing with vanilla Spark
readers' ability to read Iceberg-generated data. This is both from an
Iceberg/Parquet reader interoperability and a Spark version backward
compatibility standpoint (e.g. Spark distributions running v2.3.x, which
don't support the Iceberg DataSource, vs. those running 2.4.x).

To be clear, I am talking about doing the following on data written by
Iceberg:

spark.read.format("parquet").load($icebergBasePath + "/data")
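
For context, a sketch of the two read paths being compared; the base path is a
placeholder, and the Iceberg read assumes Spark 2.4 with the Iceberg DataSource
on the classpath:

```
// Placeholder location of an Iceberg (HadoopTables) table.
val icebergBasePath = "adl://store/warehouse/db/table"

// Iceberg-aware read (requires the Iceberg DataSource, i.e. Spark 2.4.x here):
val viaIceberg = spark.read.format("iceberg").load(icebergBasePath)

// Plain Parquet read of the data directory (works on 2.3.x as well, but
// bypasses Iceberg's snapshot metadata and reads whatever files are on disk):
val viaParquet = spark.read.format("parquet").load(icebergBasePath + "/data")
```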

Can I safely assume this will continue to work? If not, then what could be
the reasons and associated risks?

This would be good to know because these things often come up in migration-path
discussions and in evaluating the costs associated with generating and keeping
two copies of the same data.

thanks,
- Gautam.


Re: Reading dataset with stats making lots of network traffic..

2019-05-02 Thread Gautam
Ah, it looks like MergingSnapshotUpdate.mergeGroup() has the relevant logic. It
preserves the natural order of manifests, so I guess it groups based on
when the manifests were created; in other words, the answer is whatever order
the commits were done in. If batches within multiple days were committed out of
order, then a manifest could end up with multiple days.


On Thu, May 2, 2019 at 2:23 PM Gautam  wrote:

> Ok, thanks for the tip on not having to by tied to a hierarchical
> partition spec.
>
> Although this still doesn't explain why all the manifests are scanned,
> coz it should be pruning the list of manifests and it's not. Is my
> understanding correct that the manifest grouping might be re-shuffling up
> the days so query on 1 day might map to all manifests even? Does manifest
> merging optimize for partition boundaries or is it based on manifest's
> natural order?
>
> On Thu, May 2, 2019 at 2:06 PM Ryan Blue  wrote:
>
>> You also don't need to use year, month, and day. You can just use day.
>>
>> The time-based partition functions all produce ordinals, not local
>> values: month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day
>> and hour. In fact, I should open a PR to throw an exception when there are
>> duplicate partition functions...
>>
>> On Thu, May 2, 2019 at 1:52 PM Gautam  wrote:
>>
>>> FYI .. The test Partition Spec is  :
>>> [
>>>   YEAR: identity(21)
>>>   MONTH: identity(22)
>>>   DAY: identity(23)
>>>   batchId: identity(24)
>>> ]
>>>
>>>
>>>
>>> On Thu, May 2, 2019 at 1:46 PM Gautam  wrote:
>>>
>>>> > Using those, you should be able to control parallelism. If you want
>>>> to test with 4,000, then you can set the min count to 5,000 so Iceberg
>>>> won’t compact manifests.
>>>>
>>>> This is helpful. Thanks for the pointer on increasing parallelism. Will
>>>> try this out. So I understand the behaviour, if a different dataset has
>>>> >=5000  batches then the resultant # manifests would be (total_num_batches
>>>> % 5000 ) ?
>>>>
>>>> > What surprises me is that you’re not getting much benefit from
>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>> it.
>>>>
>>>> Pardon the verbose example but i think it'l help explain what i'm
>>>> seeing ..
>>>>
>>>> Regarding manifest filtering:  I tested if partition filters in sql
>>>> query actually reduce manifests being inspected. In my example, i have 16
>>>> manifests that point to 4000 batch partitions ( each file is restricted to
>>>> one partition as we'r using physical partitioning in the table ).  So when
>>>> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
>>>> read coz 1 batch == 1 file which should be tracked by 1 manifest (among the
>>>> 16) , right? But i see that all 16 are being inspected in
>>>> BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1]
>>>> that should be giving me the manifests that match a partition. When I
>>>> inspect this  it says `matchingManifests = 16` ,  which is all the
>>>> manifests in the table.  This *could* be due to the fact that our
>>>> batch ids are random UUIDs so lower/upper bounds may not help coz there's
>>>> no inherent ordering amongst batches.
>>>> But then  i tried year = 2019 and month = 01 and day = 01 which also
>>>> scanned all manifests. Could this be due to the way Iceberg manifests are
>>>> re-grouped and merged? If so, shouldn't re-grouping honour partition
>>>> boundaries and optimize for it?
>>>>
>>>>
>>>> Cheers,
>>>> -Gautam.
>>>>
>>>> [1] -
>>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>>>>
>>>>
>>>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue  wrote:
>>>>
>>>>> Good questions. Grouping manifests is configurable at the table level.
>>>>> There are 2 settings:
>>>>>
>>>>>- commit.manifest.target-size-bytes defaults to 8MB, this is the
>>>>>target size that Iceberg will compact to
>>>>>- commit.manifest.min-count-to-merge defaults to 100, this is the
>>>>>minimum number of files before a compaction is triggered
>>>>>
&g

Re: Reading dataset with stats making lots of network traffic..

2019-05-02 Thread Gautam
Ok, thanks for the tip on not having to be tied to a hierarchical
partition spec.

Although this still doesn't explain why all the manifests are scanned, because
it should be pruning the list of manifests and it's not. Is my
understanding correct that the manifest grouping might be re-shuffling
the days, so a query on one day might map to all manifests? Does manifest
merging optimize for partition boundaries, or is it based on the manifests'
natural order?

On Thu, May 2, 2019 at 2:06 PM Ryan Blue  wrote:

> You also don't need to use year, month, and day. You can just use day.
>
> The time-based partition functions all produce ordinals, not local values:
> month(Jan 1970) = 0 and month(Jan 1972) = 24. Same thing with day and hour.
> In fact, I should open a PR to throw an exception when there are duplicate
> partition functions...
>
> On Thu, May 2, 2019 at 1:52 PM Gautam  wrote:
>
>> FYI .. The test Partition Spec is  :
>> [
>>   YEAR: identity(21)
>>   MONTH: identity(22)
>>   DAY: identity(23)
>>   batchId: identity(24)
>> ]
>>
>>
>>
>> On Thu, May 2, 2019 at 1:46 PM Gautam  wrote:
>>
>>> > Using those, you should be able to control parallelism. If you want to
>>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
>>> compact manifests.
>>>
>>> This is helpful. Thanks for the pointer on increasing parallelism. Will
>>> try this out. So I understand the behaviour, if a different dataset has
>>> >=5000  batches then the resultant # manifests would be (total_num_batches
>>> % 5000 ) ?
>>>
>>> > What surprises me is that you’re not getting much benefit from
>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>> it.
>>>
>>> Pardon the verbose example but i think it'l help explain what i'm seeing
>>> ..
>>>
>>> Regarding manifest filtering:  I tested if partition filters in sql
>>> query actually reduce manifests being inspected. In my example, i have 16
>>> manifests that point to 4000 batch partitions ( each file is restricted to
>>> one partition as we'r using physical partitioning in the table ).  So when
>>> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
>>> read coz 1 batch == 1 file which should be tracked by 1 manifest (among the
>>> 16) , right? But i see that all 16 are being inspected in
>>> BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1]
>>> that should be giving me the manifests that match a partition. When I
>>> inspect this  it says `matchingManifests = 16` ,  which is all the
>>> manifests in the table.  This *could* be due to the fact that our batch
>>> ids are random UUIDs so lower/upper bounds may not help coz there's no
>>> inherent ordering amongst batches.
>>> But then  i tried year = 2019 and month = 01 and day = 01 which also
>>> scanned all manifests. Could this be due to the way Iceberg manifests are
>>> re-grouped and merged? If so, shouldn't re-grouping honour partition
>>> boundaries and optimize for it?
>>>
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>> [1] -
>>> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>>>
>>>
>>> On Thu, May 2, 2019 at 12:27 PM Ryan Blue  wrote:
>>>
>>>> Good questions. Grouping manifests is configurable at the table level.
>>>> There are 2 settings:
>>>>
>>>>- commit.manifest.target-size-bytes defaults to 8MB, this is the
>>>>target size that Iceberg will compact to
>>>>- commit.manifest.min-count-to-merge defaults to 100, this is the
>>>>minimum number of files before a compaction is triggered
>>>>
>>>> Using those, you should be able to control parallelism. If you want to
>>>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
>>>> compact manifests.
>>>>
>>>> What surprises me is that you’re not getting much benefit from
>>>> filtering out manifests that aren’t helpful. We see a lot of benefit from
>>>> it. You might try sorting the data files by partition before adding them to
>>>> the table. That will cluster data files in the same partition so you can
>>>> read fewer manifests.
>>>>
>>>> On Thu, May 2, 2019 at 12:09 PM Gautam  wrote:
>>>>
>>>

Re: Reading dataset with stats making lots of network traffic..

2019-05-02 Thread Gautam
FYI, the test Partition Spec is:
[
  YEAR: identity(21)
  MONTH: identity(22)
  DAY: identity(23)
  batchId: identity(24)
]
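
For comparison, a sketch (with assumed field names and ids) of the flatter spec
suggested in the replies below, using a single day() transform on the event
timestamp in place of the identity YEAR/MONTH/DAY columns:

```
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.NestedField.required

// Hypothetical schema; only the fields relevant to partitioning are shown.
val schema = new Schema(
  required(1, "eventTime", Types.TimestampType.withZone()),
  required(2, "batchId", Types.StringType.get())
)

// day() produces a day ordinal (days since 1970-01-01), so one transform
// replaces the separate YEAR/MONTH/DAY identity columns above.
val spec = PartitionSpec.builderFor(schema)
  .day("eventTime")
  .identity("batchId")
  .build()
```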



On Thu, May 2, 2019 at 1:46 PM Gautam  wrote:

> > Using those, you should be able to control parallelism. If you want to
> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
> compact manifests.
>
> This is helpful. Thanks for the pointer on increasing parallelism. Will
> try this out. So I understand the behaviour, if a different dataset has
> >=5000  batches then the resultant # manifests would be (total_num_batches
> % 5000 ) ?
>
> > What surprises me is that you’re not getting much benefit from filtering
> out manifests that aren’t helpful. We see a lot of benefit from it.
>
> Pardon the verbose example but i think it'l help explain what i'm seeing
> ..
>
> Regarding manifest filtering:  I tested if partition filters in sql query
> actually reduce manifests being inspected. In my example, i have 16
> manifests that point to 4000 batch partitions ( each file is restricted to
> one partition as we'r using physical partitioning in the table ).  So when
> querying for .. WHERE  batchId = 'xyz'  .. at most 1 manifest should be
> read coz 1 batch == 1 file which should be tracked by 1 manifest (among the
> 16) , right? But i see that all 16 are being inspected in
> BaseTableScan.planFiles().  Correct me if i'm wrong, it's this call [1]
> that should be giving me the manifests that match a partition. When I
> inspect this  it says `matchingManifests = 16` ,  which is all the
> manifests in the table.  This *could* be due to the fact that our batch
> ids are random UUIDs so lower/upper bounds may not help coz there's no
> inherent ordering amongst batches.
> But then  i tried year = 2019 and month = 01 and day = 01 which also
> scanned all manifests. Could this be due to the way Iceberg manifests are
> re-grouped and merged? If so, shouldn't re-grouping honour partition
> boundaries and optimize for it?
>
>
> Cheers,
> -Gautam.
>
> [1] -
> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173
>
>
> On Thu, May 2, 2019 at 12:27 PM Ryan Blue  wrote:
>
>> Good questions. Grouping manifests is configurable at the table level.
>> There are 2 settings:
>>
>>- commit.manifest.target-size-bytes defaults to 8MB, this is the
>>target size that Iceberg will compact to
>>- commit.manifest.min-count-to-merge defaults to 100, this is the
>>minimum number of files before a compaction is triggered
>>
>> Using those, you should be able to control parallelism. If you want to
>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
>> compact manifests.
>>
>> What surprises me is that you’re not getting much benefit from filtering
>> out manifests that aren’t helpful. We see a lot of benefit from it. You
>> might try sorting the data files by partition before adding them to the
>> table. That will cluster data files in the same partition so you can read
>> fewer manifests.
>>
>> On Thu, May 2, 2019 at 12:09 PM Gautam  wrote:
>>
>>> Hey Anton,
>>> Sorry bout the delay on this. Been caught up with some other
>>> things. Thanks for raising issue#173 .
>>>
>>> So the root cause is indeed the density and size of the schema. While I
>>> agree the option to configure stats for columns is good (although i'm not
>>> fully convinced that this is purely due to lower/upper bounds). For
>>> instance, maybe it's just taking a while to iterate over manifest rows and
>>> deserialize the DataFile stats in each read?  The solution i'm using right
>>> now is to parallelize the manifest reading in split planning. We
>>> regenerated the Iceberg table with more manifests. Now the code enables the
>>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>>> default, configurable using 'iceberg.worker.num-threads' ) to read
>>> manifests.
>>>
>>> On that note, the ability to parallelize is limited to how many
>>> manifests are in the table. So as a test, for a table with 4000 files we
>>> created one manifest per file (think of one file as a single batch commit
>>> in this case). So I was hoping to get a parallelism factor of 4000. But
>>> Iceberg summarizes manifests into fewer manifests with each commit so we
>>> instead ended up with 16 manifests. So now split planning is limited to
>>> reading at most 16 units of parallelism. Is this grouping of m

Re: Reading dataset with stats making lots of network traffic..

2019-05-02 Thread Gautam
> Using those, you should be able to control parallelism. If you want to
test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
compact manifests.

This is helpful. Thanks for the pointer on increasing parallelism. Will try
this out. Just so I understand the behaviour: if a different dataset has >=5000
batches, would the resultant number of manifests then be (total_num_batches % 5000)?
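
As a side note, a sketch of bumping those thresholds on a test table so Iceberg
won't compact its manifests; the table location is a placeholder and the
property names are the ones Ryan lists below:

```
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.hadoop.HadoopTables

// Placeholder path to the test table.
val table = new HadoopTables(new Configuration())
  .load("adl://store/warehouse/db/test_table")

// Raise the merge threshold above the number of manifests in the test and
// keep the default 8 MB target size explicit.
table.updateProperties()
  .set("commit.manifest.min-count-to-merge", "5000")
  .set("commit.manifest.target-size-bytes", (8L * 1024 * 1024).toString)
  .commit()
```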

> What surprises me is that you’re not getting much benefit from filtering
out manifests that aren’t helpful. We see a lot of benefit from it.

Pardon the verbose example, but I think it'll help explain what I'm seeing.

Regarding manifest filtering: I tested whether partition filters in a SQL query
actually reduce the manifests being inspected. In my example, I have 16
manifests that point to 4000 batch partitions (each file is restricted to
one partition, as we're using physical partitioning in the table). So when
querying for .. WHERE batchId = 'xyz' .. at most 1 manifest should be
read, because 1 batch == 1 file, which should be tracked by 1 manifest (among the
16), right? But I see that all 16 are being inspected in
BaseTableScan.planFiles(). Correct me if I'm wrong, but it's this call [1]
that should be giving me the manifests that match a partition. When I
inspect this it says `matchingManifests = 16`, which is all the
manifests in the table. This *could* be due to the fact that our batch ids
are random UUIDs, so lower/upper bounds may not help because there's no inherent
ordering amongst batches.
But then I tried year = 2019 and month = 01 and day = 01, which also
scanned all manifests. Could this be due to the way Iceberg manifests are
re-grouped and merged? If so, shouldn't re-grouping honour partition
boundaries and optimize for them?
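
As a quick way to reproduce this outside of Spark SQL, here is a sketch
(placeholder table location; Expressions API) of counting how many file tasks a
filtered scan plans:

```
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopTables
import scala.collection.JavaConverters._

// Placeholder path to the table under test.
val table = new HadoopTables(new Configuration())
  .load("adl://store/warehouse/db/test_table")

// Plan a scan with the same partition predicate used in the SQL query and
// count the file tasks it produces.
val tasks = table.newScan()
  .filter(Expressions.equal("batchId", "xyz"))
  .planFiles()
  .asScala
  .toList
println(s"planned file tasks: ${tasks.size}")
```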


Cheers,
-Gautam.

[1] -
https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173


On Thu, May 2, 2019 at 12:27 PM Ryan Blue  wrote:

> Good questions. Grouping manifests is configurable at the table level.
> There are 2 settings:
>
>- commit.manifest.target-size-bytes defaults to 8MB, this is the
>target size that Iceberg will compact to
>- commit.manifest.min-count-to-merge defaults to 100, this is the
>minimum number of files before a compaction is triggered
>
> Using those, you should be able to control parallelism. If you want to
> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t
> compact manifests.
>
> What surprises me is that you’re not getting much benefit from filtering
> out manifests that aren’t helpful. We see a lot of benefit from it. You
> might try sorting the data files by partition before adding them to the
> table. That will cluster data files in the same partition so you can read
> fewer manifests.
>
> On Thu, May 2, 2019 at 12:09 PM Gautam  wrote:
>
>> Hey Anton,
>> Sorry bout the delay on this. Been caught up with some other
>> things. Thanks for raising issue#173 .
>>
>> So the root cause is indeed the density and size of the schema. While I
>> agree the option to configure stats for columns is good (although i'm not
>> fully convinced that this is purely due to lower/upper bounds). For
>> instance, maybe it's just taking a while to iterate over manifest rows and
>> deserialize the DataFile stats in each read?  The solution i'm using right
>> now is to parallelize the manifest reading in split planning. We
>> regenerated the Iceberg table with more manifests. Now the code enables the
>> ParallelIterator which uses a worker pool of threads (1 thread per cpu by
>> default, configurable using 'iceberg.worker.num-threads' ) to read
>> manifests.
>>
>> On that note, the ability to parallelize is limited to how many manifests
>> are in the table. So as a test, for a table with 4000 files we created one
>> manifest per file (think of one file as a single batch commit in this
>> case). So I was hoping to get a parallelism factor of 4000. But Iceberg
>> summarizes manifests into fewer manifests with each commit so we instead
>> ended up with 16 manifests. So now split planning is limited to reading at
>> most 16 units of parallelism. Is this grouping of manifests into fewer
>> configurable? if not should we allow making this configurable?
>>
>> Sorry if this is forking a different conversation. If so, I can start a
>> separate conversation thread on this.
>>
>>
>>
>>
>>
>>
>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi 
>> wrote:
>>
>>> Hey Gautam,
>>>
>>> Out of my curiosity, did you manage to confirm the root cause of the
>>> issue?
>>>
>>> P.S. I created [1] so 

Re: Reading dataset with stats making lots of network traffic..

2019-05-02 Thread Gautam
Hey Anton,
Sorry about the delay on this. Been caught up with some other
things. Thanks for raising issue #173.

So the root cause is indeed the density and size of the schema. I agree the
option to configure stats per column is good (although I'm not
fully convinced that this is purely due to lower/upper bounds; for
instance, maybe it's just taking a while to iterate over manifest rows and
deserialize the DataFile stats on each read). The solution I'm using right
now is to parallelize the manifest reading in split planning. We
regenerated the Iceberg table with more manifests. Now the code enables the
ParallelIterator, which uses a worker pool of threads (1 thread per CPU by
default, configurable using 'iceberg.worker.num-threads') to read
manifests.

On that note, the ability to parallelize is limited by how many manifests
are in the table. So as a test, for a table with 4000 files we created one
manifest per file (think of one file as a single batch commit in this
case). So I was hoping to get a parallelism factor of 4000. But Iceberg
summarizes manifests into fewer manifests with each commit, so we instead
ended up with 16 manifests. So now split planning is limited to at
most 16 units of parallelism. Is this grouping of manifests into fewer
manifests configurable? If not, should we make it configurable?

Sorry if this is forking a different conversation. If so, I can start a
separate thread on this.
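
For reference, a sketch of setting that knob; the assumption here is that it is
read as a JVM system property when the shared worker pool is created, so it has
to be set before the first scan is planned:

```
// Size the worker pool used by ParallelIterator for manifest reads
// (default: one thread per CPU). Set before any table scan is planned.
System.setProperty("iceberg.worker.num-threads", "32")
```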






On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi 
wrote:

> Hey Gautam,
>
> Out of my curiosity, did you manage to confirm the root cause of the issue?
>
> P.S. I created [1] so that we can make collection of lower/upper bounds
> configurable.
>
> Thanks,
> Anton
>
> [1] - https://github.com/apache/incubator-iceberg/issues/173
>
> On 22 Apr 2019, at 09:15, Gautam  wrote:
>
> Thanks guys for the insights ..
>
> > I like Anton's idea to have an optional list of columns for which we
> keep stats. That would allow us to avoid storing stats for thousands of
> columns that won't ever be used. Another option here is to add a flag to
> keep stats only for top-level columns. That's much less configuration for
> users and probably does the right thing in many cases. Simpler to use but
> not as fast in all cases is sometimes a good compromise.
>
> This makes sense to me. It adds a variable that data pipelines can tweak
> on to improve performance. I will add an issue on Github to add a stats
> config/flag. Although, having said that, I would try to optimize around
> this coz read patterns are hardly ever known a priori and adding a column
> to this list means having to re-write the entire data again. So i'l try the
> other suggestion which is parallelizing on multiple manifests.
>
> >  To clarify my comment on changing the storage: the idea is to use
> separate columns instead of a map and then use a columnar storage format so
> we can project those columns independently. Avro can't project columns
> independently. This wouldn't help on the write side and may just cause a
> lot of seeking on the read side that diminishes the benefits.
>
> Gotcha.
>
> > Also, now that we have more details, I think there is a second problem.
> Because we expect several manifests in a table, we parallelize split
> planning on manifests instead of splits of manifest files. This planning
> operation is happening in a single thread instead of in parallel. I think
> if you split the write across several manifests, you'd improve wall time.
>
> This might actually be the issue here, this was a test bench dataset so
> the writer job created a single manifest for all the data in the dataset
> which isn't really how we will do things in prod. I'l try and create the
> metadata based on productions expected commit pattern.
>
>
> Regarding Iceberg not truncating large bounded column values
> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't
> consider this with our dataset. The current evidence is leading towards the
> number of columns and the sheer number of files that the manifest is
> maintaining but this is a good thing to look into.
>
> Thanks again guys.
>
> -Gautam.
>
>
>
>
>
>
>
> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue  wrote:
>
>> I like Anton's idea to have an optional list of columns for which we keep
>> stats. That would allow us to avoid storing stats for thousands of columns
>> that won't ever be used. Another option here is to add a flag to keep stats
>> only for top-level columns. That's much less configuration for users and
>> probably does the right thing in many cases. Simpler to use but not as fast
>> in all cases is sometimes a goo

Re: Reading dataset with stats making lots of network traffic..

2019-04-22 Thread Gautam
Thanks guys for the insights ..

> I like Anton's idea to have an optional list of columns for which we keep
stats. That would allow us to avoid storing stats for thousands of columns
that won't ever be used. Another option here is to add a flag to keep stats
only for top-level columns. That's much less configuration for users and
probably does the right thing in many cases. Simpler to use but not as fast
in all cases is sometimes a good compromise.

This makes sense to me. It adds a variable that data pipelines can tweak
to improve performance. I will add an issue on GitHub to add a stats
config/flag. Having said that, I would try to optimize around
this, because read patterns are hardly ever known a priori and adding a column
to this list means having to rewrite the entire data again. So I'll try the
other suggestion, which is parallelizing over multiple manifests.

>  To clarify my comment on changing the storage: the idea is to use
separate columns instead of a map and then use a columnar storage format so
we can project those columns independently. Avro can't project columns
independently. This wouldn't help on the write side and may just cause a
lot of seeking on the read side that diminishes the benefits.

Gotcha.

> Also, now that we have more details, I think there is a second problem.
Because we expect several manifests in a table, we parallelize split
planning on manifests instead of splits of manifest files. This planning
operation is happening in a single thread instead of in parallel. I think
if you split the write across several manifests, you'd improve wall time.

This might actually be the issue here: this was a test-bench dataset, so the
writer job created a single manifest for all the data in the dataset, which
isn't really how we will do things in prod. I'll try and create the metadata
based on production's expected commit pattern.


Regarding Iceberg not truncating large bound column values
(https://github.com/apache/incubator-iceberg/issues/113): I didn't consider
this with our dataset. The current evidence points towards the number
of columns and the sheer number of files that the manifest is maintaining,
but this is a good thing to look into.

Thanks again guys.

-Gautam.







On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue  wrote:

> I like Anton's idea to have an optional list of columns for which we keep
> stats. That would allow us to avoid storing stats for thousands of columns
> that won't ever be used. Another option here is to add a flag to keep stats
> only for top-level columns. That's much less configuration for users and
> probably does the right thing in many cases. Simpler to use but not as fast
> in all cases is sometimes a good compromise.
>
> To clarify my comment on changing the storage: the idea is to use separate
> columns instead of a map and then use a columnar storage format so we can
> project those columns independently. Avro can't project columns
> independently. This wouldn't help on the write side and may just cause a
> lot of seeking on the read side that diminishes the benefits.
>
> Also, now that we have more details, I think there is a second problem.
> Because we expect several manifests in a table, we parallelize split
> planning on manifests instead of splits of manifest files. This planning
> operation is happening in a single thread instead of in parallel. I think
> if you split the write across several manifests, you'd improve wall time.
>
> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi 
> wrote:
>
>> No, we haven’t experienced it yet. The manifest size is huge in your
>> case. To me, Ryan is correct: it might be either big lower/upper bounds
>> (then truncation will help) or a big number columns (then collecting
>> lower/upper bounds only for specific columns will help). I think both
>> optimizations are needed and will reduce the manifest size.
>>
>> Since you mentioned you have a lot of columns and we collect bounds for
>> nested struct fields, I am wondering if you could revert [1] locally and
>> compare the manifest size.
>>
>> [1] -
>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f
>>
>> On 19 Apr 2019, at 15:42, Gautam  wrote:
>>
>> Thanks for responding Anton! Do we think the delay is mainly due to
>> lower/upper bound filtering? have you faced this? I haven't exactly found
>> where the slowness is yet. It's generally due to the stats filtering but
>> what part of it is causing this much network traffic. There's
>> CloseableIteratable  that takes a ton of time on the next() and hasNext()
>> calls. My guess is the expression evaluation on each manifest entry is
>> what's doing it.

Re: Reading dataset with stats making lots of network traffic..

2019-04-19 Thread Gautam
Thanks for responding, Anton! Do we think the delay is mainly due to
lower/upper bound filtering? Have you faced this? I haven't exactly found
where the slowness is yet. It's generally due to the stats filtering, but
I'm not sure which part of it is causing this much network traffic. There's a
CloseableIterable that takes a ton of time in the next() and hasNext()
calls. My guess is that the expression evaluation on each manifest entry is
what's doing it.

On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi 
wrote:

> I think we need to have a list of columns for which we want to collect
> stats and that should be configurable by the user. Maybe, this config
> should be applicable only to lower/upper bounds. As we now collect stats
> even for nested struct fields, this might generate a lot of data. In most
> cases, users cluster/sort their data by a subset of data columns to have
> fast queries with predicates on those columns. So, being able to configure
> columns for which to collect lower/upper bounds seems reasonable.
>
> On 19 Apr 2019, at 08:03, Gautam  wrote:
>
> >  The length in bytes of the schema is 109M as compared to 687K of the
> non-stats dataset.
>
> Typo, length in bytes of *manifest*. schema is the same.
>
> On Fri, Apr 19, 2019 at 12:16 PM Gautam  wrote:
>
>> Correction, partition count = 4308.
>>
>> > Re: Changing the way we keep stats. Avro is a block splittable format
>> and is friendly with parallel compute frameworks like Spark.
>>
>> Here I am trying to say that we don't need to change the format to
>> columnar right? The current format is already friendly for parallelization.
>>
>> thanks.
>>
>>
>>
>>
>>
>> On Fri, Apr 19, 2019 at 12:12 PM Gautam  wrote:
>>
>>> Ah, my bad. I missed adding in the schema details .. Here are some
>>> details on the dataset with stats :
>>>
>>>  Iceberg Schema Columns : 20
>>>  Spark Schema fields : 20
>>>  Snapshot Summary :{added-data-files=4308, added-records=11494037,
>>> changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>>  Manifest files :1
>>>  Manifest details:
>>>  => manifest file path:
>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>  => manifest file length: 109,028,885
>>>  => existing files count: 0
>>>  => added files count: 4308
>>>  => deleted files count: 0
>>>  => partitions count: 4
>>>  => partition fields count: 4
>>>
>>> Re: Num data files. It has a single manifest keep track of 4308 files.
>>> Total record count is 11.4 Million.
>>>
>>> Re: Columns. You are right that this table has many columns.. although
>>> it has only 20 top-level columns,  num leaf columns are in order of
>>> thousands. This Schema is heavy on structs (in the thousands) and has deep
>>> levels of nesting.  I know Iceberg keeps
>>> *column_sizes, value_counts, null_value_counts* for all leaf fields and
>>> additionally *lower-bounds, upper-bounds* for native, struct types (not
>>> yet for map KVs and arrays).  The length in bytes of the schema is 109M as
>>> compared to 687K of the non-stats dataset.
>>>
>>> Re: Turning off stats. I am looking to leverage stats coz for our
>>> datasets with much larger number of data files we want to leverage
>>> iceberg's ability to skip entire files based on these stats. This is one of
>>> the big incentives for us to use Iceberg.
>>>
>>> Re: Changing the way we keep stats. Avro is a block splittable format
>>> and is friendly with parallel compute frameworks like Spark. So would it
>>> make sense for instance to have add an option to have Spark job / Futures
>>> handle split planning?   In a larger context, 109M is not that much
>>> metadata given that Iceberg is meant for datasets where the metadata itself
>>> is Bigdata scale.  I'm curious on how folks with larger sized metadata (in
>>> GB) are optimizing this today.
>>>
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue 
>>> wrote:
>>>
>>>> Thanks for bringing this up! My initial theory is that this table has a
>>>> ton of stats data that you have to read. That could happen in a couple of
>>>> cases.
>>>>
>>>> First, you might have large values in some columns. Parquet will
>>>> suppress its stats if values are

Re: Reading dataset with stats making lots of network traffic..

2019-04-19 Thread Gautam
>  The length in bytes of the schema is 109M as compared to 687K of the
non-stats dataset.

Typo: length in bytes of the *manifest*. The schema is the same.

On Fri, Apr 19, 2019 at 12:16 PM Gautam  wrote:

> Correction, partition count = 4308.
>
> > Re: Changing the way we keep stats. Avro is a block splittable format
> and is friendly with parallel compute frameworks like Spark.
>
> Here I am trying to say that we don't need to change the format to
> columnar right? The current format is already friendly for parallelization.
>
> thanks.
>
>
>
>
>
> On Fri, Apr 19, 2019 at 12:12 PM Gautam  wrote:
>
>> Ah, my bad. I missed adding in the schema details .. Here are some
>> details on the dataset with stats :
>>
>>  Iceberg Schema Columns : 20
>>  Spark Schema fields : 20
>>  Snapshot Summary :{added-data-files=4308, added-records=11494037,
>> changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>  Manifest files :1
>>  Manifest details:
>>  => manifest file path:
>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>  => manifest file length: 109,028,885
>>  => existing files count: 0
>>  => added files count: 4308
>>  => deleted files count: 0
>>  => partitions count: 4
>>  => partition fields count: 4
>>
>> Re: Num data files. It has a single manifest keep track of 4308 files.
>> Total record count is 11.4 Million.
>>
>> Re: Columns. You are right that this table has many columns.. although it
>> has only 20 top-level columns,  num leaf columns are in order of thousands.
>> This Schema is heavy on structs (in the thousands) and has deep levels of
>> nesting.  I know Iceberg keeps
>> *column_sizes, value_counts, null_value_counts* for all leaf fields and
>> additionally *lower-bounds, upper-bounds* for native, struct types (not
>> yet for map KVs and arrays).  The length in bytes of the schema is 109M as
>> compared to 687K of the non-stats dataset.
>>
>> Re: Turning off stats. I am looking to leverage stats coz for our
>> datasets with much larger number of data files we want to leverage
>> iceberg's ability to skip entire files based on these stats. This is one of
>> the big incentives for us to use Iceberg.
>>
>> Re: Changing the way we keep stats. Avro is a block splittable format and
>> is friendly with parallel compute frameworks like Spark. So would it make
>> sense for instance to have add an option to have Spark job / Futures
>> handle split planning?   In a larger context, 109M is not that much
>> metadata given that Iceberg is meant for datasets where the metadata itself
>> is Bigdata scale.  I'm curious on how folks with larger sized metadata (in
>> GB) are optimizing this today.
>>
>>
>> Cheers,
>> -Gautam.
>>
>>
>>
>>
>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue 
>> wrote:
>>
>>> Thanks for bringing this up! My initial theory is that this table has a
>>> ton of stats data that you have to read. That could happen in a couple of
>>> cases.
>>>
>>> First, you might have large values in some columns. Parquet will
>>> suppress its stats if values are larger than 4k and those are what Iceberg
>>> uses. But that could still cause you to store two 1k+ objects for each
>>> large column (lower and upper bounds). With a lot of data files, that could
>>> add up quickly. The solution here is to implement #113
>>> <https://github.com/apache/incubator-iceberg/issues/113> so that we
>>> don't store the actual min and max for string or binary columns, but
>>> instead a truncated value that is just above or just below.
>>>
>>> The second case is when you have a lot of columns. Each column stores
>>> both a lower and upper bound, so 1,000 columns could easily take 8k per
>>> file. If this is the problem, then maybe we want to have a way to turn off
>>> column stats. We could also think of ways to change the way stats are
>>> stored in the manifest files, but that only helps if we move to a columnar
>>> format to store manifests, so this is probably not a short-term fix.
>>>
>>> If you can share a bit more information about this table, we can
>>> probably tell which one is the problem. I'm guessing it is the large values
>>> problem.
>>>
>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam  wrote:
>>>
>>>> Hello folks,
>>>>
>>>> I have been testing Iceberg read

Re: Reading dataset with stats making lots of network traffic..

2019-04-18 Thread Gautam
Ah, my bad. I missed adding in the schema details. Here are some details
on the dataset with stats:

 Iceberg Schema Columns : 20
 Spark Schema fields : 20
 Snapshot Summary :{added-data-files=4308, added-records=11494037,
changed-partition-count=4308, total-records=11494037, total-data-files=4308}
 Manifest files :1
 Manifest details:
 => manifest file path:
adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
 => manifest file length: 109,028,885
 => existing files count: 0
 => added files count: 4308
 => deleted files count: 0
 => partitions count: 4
 => partition fields count: 4
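
For anyone wanting to reproduce this kind of listing, a sketch of pulling it
from the table metadata (placeholder table location; method names per the
Iceberg API of that time):

```
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.hadoop.HadoopTables
import scala.collection.JavaConverters._

// Placeholder path to the table with stats.
val table = new HadoopTables(new Configuration()).load("adl://[dataset_base_path]")
val snapshot = table.currentSnapshot()

println(s"Snapshot summary: ${snapshot.summary()}")
snapshot.manifests().asScala.foreach { m =>
  println(s"manifest=${m.path()} length=${m.length()} added=${m.addedFilesCount()}")
}
```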

Re: Num data files. It has a single manifest keeping track of 4308 files.
The total record count is 11.4 million.

Re: Columns. You are right that this table has many columns: although it
has only 20 top-level columns, the number of leaf columns is in the order of
thousands. This schema is heavy on structs (in the thousands) and has deep
levels of nesting. I know Iceberg keeps
*column_sizes, value_counts, null_value_counts* for all leaf fields and
additionally *lower-bounds, upper-bounds* for native and struct types (not yet
for map KVs and arrays). The length in bytes of the schema is 109M as
compared to 687K for the non-stats dataset.

Re: Turning off stats. I am looking to leverage stats because, for our datasets
with a much larger number of data files, we want Iceberg's ability
to skip entire files based on these stats. This is one of the big
incentives for us to use Iceberg.

Re: Changing the way we keep stats. Avro is a block-splittable format and
is friendly with parallel compute frameworks like Spark. So would it make
sense, for instance, to add an option to have a Spark job / Futures
handle split planning? In a larger context, 109M is not that much
metadata, given that Iceberg is meant for datasets where the metadata itself
is big-data scale. I'm curious how folks with larger metadata (in the
GBs) are optimizing this today.


Cheers,
-Gautam.




On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue 
wrote:

> Thanks for bringing this up! My initial theory is that this table has a
> ton of stats data that you have to read. That could happen in a couple of
> cases.
>
> First, you might have large values in some columns. Parquet will suppress
> its stats if values are larger than 4k and those are what Iceberg uses. But
> that could still cause you to store two 1k+ objects for each large column
> (lower and upper bounds). With a lot of data files, that could add up
> quickly. The solution here is to implement #113
> <https://github.com/apache/incubator-iceberg/issues/113> so that we don't
> store the actual min and max for string or binary columns, but instead a
> truncated value that is just above or just below.
>
> The second case is when you have a lot of columns. Each column stores both
> a lower and upper bound, so 1,000 columns could easily take 8k per file. If
> this is the problem, then maybe we want to have a way to turn off column
> stats. We could also think of ways to change the way stats are stored in
> the manifest files, but that only helps if we move to a columnar format to
> store manifests, so this is probably not a short-term fix.
>
> If you can share a bit more information about this table, we can probably
> tell which one is the problem. I'm guessing it is the large values problem.
>
> On Thu, Apr 18, 2019 at 11:52 AM Gautam  wrote:
>
>> Hello folks,
>>
>> I have been testing Iceberg reading with and without stats built into
>> Iceberg dataset manifest and found that there's a huge jump in network
>> traffic with the latter..
>>
>>
>> In my test I am comparing two Iceberg datasets, both written in Iceberg
>> format. One with and the other without stats collected in Iceberg
>> manifests. In particular the difference between the writers used for the
>> two datasets is this PR:
>> https://github.com/apache/incubator-iceberg/pull/63/files which uses
>> Iceberg's writers for writing Parquet data. I captured tcpdump from query
>> scans run on these two datasets.  The partition being scanned contains 1
>> manifest, 1 parquet data file and ~3700 rows in both datasets. There's a
>> 30x jump in network traffic to the remote filesystem (ADLS) when i switch
>> to stats based Iceberg dataset. Both queries used the same Iceberg reader
>> code to access both datasets.
>>
>> ```
>> root@d69e104e7d40:/usr/local/spark#  tcpdump -r
>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>> reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap,
>> link-type EN10MB (Ethernet)
>>
>> *88

Re: Reading dataset with stats making lots of network traffic..

2019-04-18 Thread Gautam
Correction, partition count = 4308.

> Re: Changing the way we keep stats. Avro is a block splittable format and
is friendly with parallel compute frameworks like Spark.

Here I am trying to say that we don't need to change the format to columnar,
right? The current format is already friendly to parallelization.

thanks.





On Fri, Apr 19, 2019 at 12:12 PM Gautam  wrote:

> Ah, my bad. I missed adding in the schema details .. Here are some details
> on the dataset with stats :
>
>  Iceberg Schema Columns : 20
>  Spark Schema fields : 20
>  Snapshot Summary :{added-data-files=4308, added-records=11494037,
> changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>  Manifest files :1
>  Manifest details:
>  => manifest file path:
> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>  => manifest file length: 109,028,885
>  => existing files count: 0
>  => added files count: 4308
>  => deleted files count: 0
>  => partitions count: 4
>  => partition fields count: 4
>
> Re: Num data files. It has a single manifest keep track of 4308 files.
> Total record count is 11.4 Million.
>
> Re: Columns. You are right that this table has many columns.. although it
> has only 20 top-level columns,  num leaf columns are in order of thousands.
> This Schema is heavy on structs (in the thousands) and has deep levels of
> nesting.  I know Iceberg keeps
> *column_sizes, value_counts, null_value_counts* for all leaf fields and
> additionally *lower-bounds, upper-bounds* for native, struct types (not
> yet for map KVs and arrays).  The length in bytes of the schema is 109M as
> compared to 687K of the non-stats dataset.
>
> Re: Turning off stats. I am looking to leverage stats coz for our datasets
> with much larger number of data files we want to leverage iceberg's ability
> to skip entire files based on these stats. This is one of the big
> incentives for us to use Iceberg.
>
> Re: Changing the way we keep stats. Avro is a block splittable format and
> is friendly with parallel compute frameworks like Spark. So would it make
> sense for instance to have add an option to have Spark job / Futures
> handle split planning?   In a larger context, 109M is not that much
> metadata given that Iceberg is meant for datasets where the metadata itself
> is Bigdata scale.  I'm curious on how folks with larger sized metadata (in
> GB) are optimizing this today.
>
>
> Cheers,
> -Gautam.
>
>
>
>
> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue 
> wrote:
>
>> Thanks for bringing this up! My initial theory is that this table has a
>> ton of stats data that you have to read. That could happen in a couple of
>> cases.
>>
>> First, you might have large values in some columns. Parquet will suppress
>> its stats if values are larger than 4k and those are what Iceberg uses. But
>> that could still cause you to store two 1k+ objects for each large column
>> (lower and upper bounds). With a lot of data files, that could add up
>> quickly. The solution here is to implement #113
>> <https://github.com/apache/incubator-iceberg/issues/113> so that we
>> don't store the actual min and max for string or binary columns, but
>> instead a truncated value that is just above or just below.
>>
>> The second case is when you have a lot of columns. Each column stores
>> both a lower and upper bound, so 1,000 columns could easily take 8k per
>> file. If this is the problem, then maybe we want to have a way to turn off
>> column stats. We could also think of ways to change the way stats are
>> stored in the manifest files, but that only helps if we move to a columnar
>> format to store manifests, so this is probably not a short-term fix.
>>
>> If you can share a bit more information about this table, we can probably
>> tell which one is the problem. I'm guessing it is the large values problem.
>>
>> On Thu, Apr 18, 2019 at 11:52 AM Gautam  wrote:
>>
>>> Hello folks,
>>>
>>> I have been testing Iceberg reading with and without stats built into
>>> Iceberg dataset manifest and found that there's a huge jump in network
>>> traffic with the latter..
>>>
>>>
>>> In my test I am comparing two Iceberg datasets, both written in Iceberg
>>> format. One with and the other without stats collected in Iceberg
>>> manifests. In particular the difference between the writers used for the
>>> two datasets is this PR:
>>> https://github.com/apache/incubator-iceberg/pull/63/files which uses
>>

Reading dataset with stats making lots of network traffic..

2019-04-18 Thread Gautam
Hello folks,

I have been testing Iceberg reading with and without stats built into the
Iceberg dataset manifest and found that there's a huge jump in network
traffic with the latter.


In my test I am comparing two Iceberg datasets, both written in Iceberg
format: one with and the other without stats collected in the Iceberg
manifests. In particular, the difference between the writers used for the
two datasets is this PR:
https://github.com/apache/incubator-iceberg/pull/63/files which uses
Iceberg's writers for writing Parquet data. I captured tcpdump from query
scans run on these two datasets. The partition being scanned contains 1
manifest, 1 Parquet data file and ~3700 rows in both datasets. There's a
30x jump in network traffic to the remote filesystem (ADLS) when I switch
to the stats-based Iceberg dataset. Both queries used the same Iceberg reader
code to access both datasets.

```
root@d69e104e7d40:/usr/local/spark#  tcpdump -r
iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep
perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap,
link-type EN10MB (Ethernet)

*8844*


root@d69e104e7d40:/usr/local/spark# tcpdump -r
iceberg_scratch_pad_demo_11_batch_query.pcap | grep
perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, link-type
EN10MB (Ethernet)

*269708*

```

As a consequence, the query response times are affected drastically
(illustrated below). I must confess that I am on a slow internet connection
via VPN to the remote FS, but the dataset without stats took
just 1m 49s while the dataset with stats took 26m 48s to read the same
amount of data. Most of that time in the latter dataset was spent in split
planning, i.e. manifest reading and stats evaluation.

```
all=> select count(*)  from iceberg_geo1_metrixx_qc_postvalues where
batchId = '4a6f95abac924159bb3d7075373395c9';
 count(1)
--
 3627
(1 row)
*Time: 109673.202 ms (01:49.673)*

all=>  select count(*) from iceberg_scratch_pad_demo_11  where
_ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId =
'6d50eeb3e7d74b4f99eea91a27fc8f15';
 count(1)
--
 3808
(1 row)
*Time: 1608058.616 ms (26:48.059)*

```

Has anyone faced this? I'm wondering if there's some caching or parallelism
option here that can be leveraged. I would appreciate some guidance. If
there isn't a straightforward fix and others feel this is a problem, I can
raise an issue and look into it further.


Cheers,
-Gautam.


Re: Option to disable rewrites of IN predicates

2019-03-06 Thread Gautam Kowshik
+1 to implementing the IN feature instead. We also have IN / NOT-IN
cases where the inclusion/exclusion set is very large.


-Gautam

Sent from my iPhone
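
For context, the rewrite Anton describes turns an IN over k values into a chain
of OR'd equality predicates; a sketch with the Iceberg Expressions API
(illustrative values only):

```
import org.apache.iceberg.expressions.Expressions._

// batchId IN ('a', 'b', 'c')  becomes  batchId = 'a' OR batchId = 'b' OR batchId = 'c'
val rewritten = or(or(equal("batchId", "a"), equal("batchId", "b")), equal("batchId", "c"))
```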

> On Mar 6, 2019, at 5:38 PM, Anton Okolnychyi  
> wrote:
> 
> For some reason, I thought there was a blocker there. As Iceberg is not using 
> org.apache.parquet.filter2.predicate.FilterApi in its Parquet reader then 
> makes sense to fix, of course.
> 
>> On 5 Mar 2019, at 18:38, Ryan Blue  wrote:
>> 
>> Would it make sense to add support for IN expressions instead? I'd rather 
>> get that done than build work-arounds.
>> 
>> On Tue, Mar 5, 2019 at 10:33 AM Anton Okolnychyi 
>>  wrote:
>>> Hey,
>>> 
>>> Iceberg Spark data source rewrites IN predicates as a mix of OR/EQ. I am 
>>> wondering if it makes sense to introduce a threshold when this rewrite 
>>> happens until [1] is resolved. We can have something similar to 
>>> “spark.sql.parquet.pushdown.inFilterThreshold” in Spark.
>>> 
>>> We have experienced a performance degradation on a few queries. One of the 
>>> queries had 5 predicates and 2 of them were IN. In this specific case, IN 
>>> predicates didn’t help to filter out files and just made the overall row 
>>> filter more complicated.
>>> 
>>> Thanks,
>>> Anton
>>> 
>>> 
>>> [1] - https://github.com/apache/incubator-iceberg/issues/39
>>> 
>> 
>> 
>> -- 
>> Ryan Blue
>> Software Engineer
>> Netflix
> 


Re: Iceberg scans not keeping or using important file/column statistics in manifests ..

2019-03-06 Thread Gautam
Raised https://github.com/apache/incubator-iceberg/issues/122 for the
filtering support.

On Wed, Mar 6, 2019 at 1:34 AM Anton Okolnychyi
 wrote:

> Sounds good, Gautam.
>
> Our intention was to be able to filter out files using predicates on
> nested fields. For now, file skipping works only with predicates that
> involve top level attributes.
>
>
> On 5 Mar 2019, at 17:47, Gautam  wrote:
>
> Hey Anton,
>I'm curious how you are using the Struct metrics in your company,
> are you planning to use it for predicate pushdowns or something else
> entirely?
>
> Regarding timeline, that's fine, we can wait a week or two for your
> changes on collecting metrics. If I can assume that your changes will add
> the struct metrics, I could open a separate Iceberg issue about the struct
> expression handling. If Ryan and you agree on allowing struct based
> filtering in Iceberg as long as we avoid mixed filtering (map>
> , array> , etc.) I can go ahead and work on it.
>
> Cheers,
> -Gautam.
>
>
>
> On Tue, Mar 5, 2019 at 10:30 PM Anton Okolnychyi <
> aokolnyc...@apple.com.invalid> wrote:
>
>> Sorry for my late reply and thanks for testing Gautam!
>>
>> I had a local prototype that only collected metrics for nested structs
>> and stored them. I haven’t checked if Iceberg can make use of that right
>> now. As I understand Ryan’s comment and Gautam’s observations, we will need
>> changes to make it work even if we have proper min/max statistics. So, we
>> have two independent issues then. I was planning to add tests and submit
>> the collection upstream. However, open source approval within my company
>> might easily take another week or more. So, if we need this change earlier,
>> someone can implement it. Just let me know, I can help to review then.
>>
>> Thanks,
>> Anton
>>
>>
>> On 5 Mar 2019, at 09:51, Gautam  wrote:
>>
>> Thanks for the response Ryan, comments in line ...
>>
>> > Iceberg doesn't support binding expressions in sub-structs yet. So the
>> fix on the Iceberg side requires a few steps. First, collecting the metrics
>> from Parquet with Anton's PR, and second, updating expression binding to
>> work with structs.
>>
>> I don't think there is a PR up yet on collecting metrics on struct
>> fields, I could work on one if Anton isn't already on it (thanks for
>> calling it out in the issue Anton!).
>>
>> > The reason why binding doesn't work with structs yet it that we don't
>> want to bind structs that are within maps or arrays because those will
>> change the semantics of the expression. For example, a.b = 5 can be run on
>> a: struct but can't be run on a: list>.
>>
>> From the discussion on said issue [1] seems like we are ok with structs
>> being filtered on. About structs inside maps or arrays, can we not reject
>> the invalid cases in the expression evaluation?  As in, detect of what
>> nested type field 'a' is and allow or disallow appropriately? Having
>> support for just structs is a good incremental feature methinks. Especially
>> coz, as Anton pointed out,  Spark has a PR up [2] on pushing down
>> struct-based filters which one can cherry pick locally.
>>
>> > Also, the Avro problem wasn't because the manifests are stored as Avro.
>> Avro doesn't collect metrics about the data that is stored, but the
>> manifests have the metrics that were added with each file, so the problem
>> is not adding the metrics when you added the files. I think you've solved
>> the problem and correctly built your table metadata using the metrics from
>> the Parquet footers, but I still want to note the distinction: Avro
>> manifests store metrics correctly. Avro data files don't generate metrics.
>>
>> Gotcha!
>>
>> Cheers,
>> -Gautam.
>>
>> [1] - https://github.com/apache/incubator-iceberg/issues/78
>> [2] - https://github.com/apache/spark/pull/22573
>>
>>
>> On Sat, Mar 2, 2019 at 6:47 AM Ryan Blue 
>> wrote:
>>
>>> Iceberg doesn't support binding expressions in sub-structs yet. So the
>>> fix on the Iceberg side requires a few steps. First, collecting the metrics
>>> from Parquet with Anton's PR, and second, updating expression binding to
>>> work with structs.
>>>
>>> The reason why binding doesn't work with structs yet it that we don't
>>> want to bind structs that are within maps or arrays because those will
>>> change the semantics of the expression. For example, a.b = 

Re: [VOTE] Add the python implementation

2019-03-05 Thread Gautam Kowshik
+1

Sent from my iPhone

> On Mar 6, 2019, at 6:56 AM, RD  wrote:
> 
> +1
> 
>> On Tue, Mar 5, 2019 at 5:01 PM John Zhuge  wrote:
>> +1
>> 
>>> On Tue, Mar 5, 2019 at 4:59 PM Xabriel Collazo Mojica 
>>>  wrote:
>>> +1
>>> 
>>>  
>>> 
>>> Xabriel J Collazo Mojica  |  Senior Software Engineer  |  Adobe  |  
>>> xcoll...@adobe.com
>>> 
>>>  
>>> 
>>> From: Ted Gooch 
>>> Reply-To: "dev@iceberg.apache.org" 
>>> Date: Tuesday, March 5, 2019 at 4:21 PM
>>> To: "dev@iceberg.apache.org" 
>>> Cc: Ryan Blue 
>>> Subject: Re: [VOTE] Add the python implementation
>>> 
>>>  
>>> 
>>> +1 
>>> 
>>>  
>>> 
>>> On Tue, Mar 5, 2019 at 4:19 PM Ryan Blue  wrote:
>>> 
>>> +1
>>> 
>>>  
>>> 
>>> On Tue, Mar 5, 2019 at 4:18 PM Ryan Blue  wrote:
>>> 
>>> Hi everyone,
>>> 
>>>  
>>> 
>>> I'd like to propose accepting the current Python PR, #54. It has been 
>>> through a round of reviews and while there is certainly more to do, it is 
>>> going to be much easier to improve the implementation with smaller pull 
>>> requests than to keep updating one giant PR.
>>> 
>>>  
>>> 
>>> As noted in the discussion thread, this will not be included in the initial 
>>> Java release. Also, the code would live in the existing repository where we 
>>> already have visibility, until we have a reason to move it elsewhere.
>>> 
>>>  
>>> 
>>> Please vote in the next 72 hours:
>>> 
>>>  
>>> 
>>> [ ] +1: Commit the current Python PR implementation
>>> 
>>> [ ] +0: . . .
>>> 
>>> [ ] -1: Do not add the current implementation because . . .
>>> 
>>>  
>>> 
>>> Thanks!
>>> 
>>>  
>>> 
>>> rb
>>> 
>>>  
>>> 
>>> --
>>> 
>>> Ryan Blue
>>> 
>>> 
>>> 
>>>  
>>> 
>>> --
>>> 
>>> Ryan Blue
>>> 
>> 
>> 
>> -- 
>> John Zhuge


Re: Iceberg scans not keeping or using important file/column statistics in manifests ..

2019-03-05 Thread Gautam
Hey Anton,
   I'm curious how you are using the struct metrics in your company;
are you planning to use them for predicate pushdowns or something else
entirely?

Regarding timeline, that's fine, we can wait a week or two for your changes
on collecting metrics. If I can assume that your changes will add the
struct metrics, I could open a separate Iceberg issue about the struct
expression handling. If Ryan and you agree on allowing struct-based
filtering in Iceberg as long as we avoid mixed filtering (map<struct<...>>,
array<struct<...>>, etc.) I can go ahead and work on it.
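
To make the "avoid mixed filtering" part concrete, the check I have in mind is
roughly the following (purely hypothetical code, not anything that exists in
Iceberg today; it assumes the Type/StructType accessors in
com.netflix.iceberg.types behave as I expect):

import com.netflix.iceberg.Schema
import com.netflix.iceberg.types.Type

// Allow binding a dotted reference like "location.lat" only if every step of
// the path goes through a struct; a map or list anywhere in the path changes
// the predicate's semantics, so binding should be rejected there.
def isPureStructPath(schema: Schema, dotted: String): Boolean = {
  var current: Type = schema.asStruct()
  var ok = true
  for (name <- dotted.split("\\.") if ok) {
    if (!current.isStructType()) {
      ok = false                                  // hit a map/list/primitive mid-path
    } else {
      val field = current.asStructType().field(name)
      if (field == null) ok = false               // unknown field name
      else current = field.`type`()
    }
  }
  ok
}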

Cheers,
-Gautam.



On Tue, Mar 5, 2019 at 10:30 PM Anton Okolnychyi
 wrote:

> Sorry for my late reply and thanks for testing Gautam!
>
> I had a local prototype that only collected metrics for nested structs and
> stored them. I haven’t checked if Iceberg can make use of that right now.
> As I understand Ryan’s comment and Gautam’s observations, we will need
> changes to make it work even if we have proper min/max statistics. So, we
> have two independent issues then. I was planning to add tests and submit
> the collection upstream. However, open source approval within my company
> might easily take another week or more. So, if we need this change earlier,
> someone can implement it. Just let me know, I can help to review then.
>
> Thanks,
> Anton
>
>
> On 5 Mar 2019, at 09:51, Gautam  wrote:
>
> Thanks for the response Ryan, comments in line ...
>
> > Iceberg doesn't support binding expressions in sub-structs yet. So the
> fix on the Iceberg side requires a few steps. First, collecting the metrics
> from Parquet with Anton's PR, and second, updating expression binding to
> work with structs.
>
> I don't think there is a PR up yet on collecting metrics on struct fields,
> I could work on one if Anton isn't already on it (thanks for calling it out
> in the issue Anton!).
>
> > The reason why binding doesn't work with structs yet is that we don't
> want to bind structs that are within maps or arrays because those will
> change the semantics of the expression. For example, a.b = 5 can be run on
> a: struct<b: int> but can't be run on a: list<struct<b: int>>.
>
> From the discussion on said issue [1] it seems like we are ok with structs
> being filtered on. About structs inside maps or arrays, can we not reject
> the invalid cases in the expression evaluation?  As in, detect what
> nested type field 'a' is and allow or disallow appropriately? Having
> support for just structs is a good incremental feature methinks. Especially
> coz, as Anton pointed out,  Spark has a PR up [2] on pushing down
> struct-based filters which one can cherry pick locally.
>
> > Also, the Avro problem wasn't because the manifests are stored as Avro.
> Avro doesn't collect metrics about the data that is stored, but the
> manifests have the metrics that were added with each file, so the problem
> is not adding the metrics when you added the files. I think you've solved
> the problem and correctly built your table metadata using the metrics from
> the Parquet footers, but I still want to note the distinction: Avro
> manifests store metrics correctly. Avro data files don't generate metrics.
>
> Gotcha!
>
> Cheers,
> -Gautam.
>
> [1] - https://github.com/apache/incubator-iceberg/issues/78
> [2] - https://github.com/apache/spark/pull/22573
>
>
> On Sat, Mar 2, 2019 at 6:47 AM Ryan Blue 
> wrote:
>
>> Iceberg doesn't support binding expressions in sub-structs yet. So the
>> fix on the Iceberg side requires a few steps. First, collecting the metrics
>> from Parquet with Anton's PR, and second, updating expression binding to
>> work with structs.
>>
>> The reason why binding doesn't work with structs yet is that we don't
>> want to bind structs that are within maps or arrays because those will
>> change the semantics of the expression. For example, a.b = 5 can be run on
>> a: struct<b: int> but can't be run on a: list<struct<b: int>>.
>>
>> Also, the Avro problem wasn't because the manifests are stored as Avro.
>> Avro doesn't collect metrics about the data that is stored, but the
>> manifests have the metrics that were added with each file, so the problem
>> is not adding the metrics when you added the files. I think you've solved
>> the problem and correctly built your table metadata using the metrics from
>> the Parquet footers, but I still want to note the distinction: Avro
>> manifests store metrics correctly. Avro data files don't generate metrics.
>>
>> On Thu, Feb 28, 2019 at 1:32 AM Gautam  wrote:
>>
>>> Hey Anton,
>>>   Wanted to circle back on the Spark PR [1] to a

Re: Iceberg scans not keeping or using important file/column statistics in manifests ..

2019-03-05 Thread Gautam
Thanks for the response Ryan, comments in line ...

> Iceberg doesn't support binding expressions in sub-structs yet. So the
fix on the Iceberg side requires a few steps. First, collecting the metrics
from Parquet with Anton's PR, and second, updating expression binding to
work with structs.

I don't think there is a PR up yet on collecting metrics on struct fields,
I could work on one if Anton isn't already on it (thanks for calling it out
in the issue Anton!).

> The reason why binding doesn't work with structs yet is that we don't
want to bind structs that are within maps or arrays because those will
change the semantics of the expression. For example, a.b = 5 can be run on
a: struct<b: int> but can't be run on a: list<struct<b: int>>.

From the discussion on said issue [1] it seems like we are ok with structs
being filtered on. About structs inside maps or arrays, can we not reject
the invalid cases in the expression evaluation?  As in, detect what
nested type field 'a' is and allow or disallow appropriately? Having
support for just structs is a good incremental feature methinks. Especially
coz, as Anton pointed out,  Spark has a PR up [2] on pushing down
struct-based filters which one can cherry pick locally.
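
(If it helps anyone reproduce, this is the quick check I run in spark-shell to
see whether the struct predicate reaches the source. A rough sketch only: the
table path is illustrative and assumes the iceberg table with the location
struct used elsewhere in this thread.)

import org.apache.spark.sql.functions.col

val df = spark.read.format("iceberg").load("iceberg-people-complex2")
// Without the pushdown change in PR [2], the nested predicate stays in the
// post-scan Filter node and PushedFilters stays empty for location.lat.
df.filter(col("location.lat") > 0.0).explain(true)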

> Also, the Avro problem wasn't because the manifests are stored as Avro.
Avro doesn't collect metrics about the data that is stored, but the
manifests have the metrics that were added with each file, so the problem
is not adding the metrics when you added the files. I think you've solved
the problem and correctly built your table metadata using the metrics from
the Parquet footers, but I still want to note the distinction: Avro
manifests store metrics correctly. Avro data files don't generate metrics.

Gotcha!

Cheers,
-Gautam.

[1] - https://github.com/apache/incubator-iceberg/issues/78
[2] - https://github.com/apache/spark/pull/22573


On Sat, Mar 2, 2019 at 6:47 AM Ryan Blue  wrote:

> Iceberg doesn't support binding expressions in sub-structs yet. So the fix
> on the Iceberg side requires a few steps. First, collecting the metrics
> from Parquet with Anton's PR, and second, updating expression binding to
> work with structs.
>
> The reason why binding doesn't work with structs yet is that we don't want
> to bind structs that are within maps or arrays because those will change
> the semantics of the expression. For example, a.b = 5 can be run on a:
> struct<b: int> but can't be run on a: list<struct<b: int>>.
>
> Also, the Avro problem wasn't because the manifests are stored as Avro.
> Avro doesn't collect metrics about the data that is stored, but the
> manifests have the metrics that were added with each file, so the problem
> is not adding the metrics when you added the files. I think you've solved
> the problem and correctly built your table metadata using the metrics from
> the Parquet footers, but I still want to note the distinction: Avro
> manifests store metrics correctly. Avro data files don't generate metrics.
>
> On Thu, Feb 28, 2019 at 1:32 AM Gautam  wrote:
>
>> Hey Anton,
>>   Wanted to circle back on the Spark PR [1] to add support for nested
>> fields .. I tried applying it, tested it. With this change Spark pushes
>> filters on structs down to Iceberg, but Iceberg expression handling seems
>> to fail in validation ..
>>
>>
>> Caused by: com.netflix.iceberg.exceptions.ValidationException: Cannot
>> find field 'location.lat' in struct: struct<1: age: optional int, 2: name:
>> optional string, 3: friends: optional map, 4: location:
>> optional struct<7: lat: optional double, 8: lon: optional double>>
>>   at
>> com.netflix.iceberg.exceptions.ValidationException.check(ValidationException.java:42)
>>   at
>> com.netflix.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:76)
>>   at
>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:138)
>>   at
>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:94)
>>   at
>> com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:147)
>>   at
>> com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:160)
>>   at
>> com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.project(Projections.java:108)
>>   at
>> com.netflix.iceberg.expressions.InclusiveManifestEvaluator.(InclusiveManifestEvaluator.java:57)
>>   at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:153)
>>   at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:149)
>>   at
>> com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(Loc

Re: [VOTE] Community code reviews

2019-02-28 Thread Gautam Kowshik
+1

Sent from my iPhone

> On Feb 28, 2019, at 10:09 PM, Daniel Weeks  wrote:
> 
> +1 (binding)
> 
> On 2019/02/27 21:11:01, Ryan Blue  wrote: 
> > This is a follow-up to the discussion thread, where we seem to have
> > consensus around the proposal to allow committers to commit their own pull
> > requests if a contributor has reviewed it and given a +1 and the PR has
> > been open for at least a couple of days to give time for other comments.
> > This is also only effective for 2019. After that, we should rely on a
> > larger base of committers.
> > 
> > Please vote on this proposal in the next 3 days.
> > 
> > [ ] +1, accept contributor reviews for committer pull requests
> > [ ] +0, . . .
> > [ ] -1, I think this is a bad idea because . . .
> > 
> > 
> > rb
> > 
> > -- 
> > Ryan Blue
> > Software Engineer
> > Netflix
> >


Re: Iceberg scans not keeping or using important file/column statistics in manifests ..

2019-02-28 Thread Gautam
Hey Anton,
  Wanted to circle back on the Spark PR [1] to add support for nested
fields .. I tried applying it, tested it. With this change Spark pushes
filters on structs down to Iceberg, but Iceberg expression handling seems
to fail in validation ..


Caused by: com.netflix.iceberg.exceptions.ValidationException: Cannot find
field 'location.lat' in struct: struct<1: age: optional int, 2: name:
optional string, 3: friends: optional map, 4: location:
optional struct<7: lat: optional double, 8: lon: optional double>>
  at
com.netflix.iceberg.exceptions.ValidationException.check(ValidationException.java:42)
  at
com.netflix.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:76)
  at
com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:138)
  at
com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:94)
  at
com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:147)
  at
com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:160)
  at
com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.project(Projections.java:108)
  at
com.netflix.iceberg.expressions.InclusiveManifestEvaluator.(InclusiveManifestEvaluator.java:57)
  at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:153)
  at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:149)
  at
com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
  at
com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
  at
com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)


I think this should be handled in Iceberg, as struct filters like a.b.c =
"blah" are a legit way to query in SQL. If you feel this is a valid
assumption I can work on a fix. Thoughts?


*Test Table Schema:*
scala> iceDf.printSchema
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- friends: map (nullable = true)
 ||-- key: string
 ||-- value: integer (valueContainsNull = true)
 |-- location: struct (nullable = true)
 ||-- lat: double (nullable = true)
 ||-- lon: double (nullable = true)


*Gist to recreate issue:*
https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac
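
The same failure can be hit without Spark, directly through the Iceberg API (a
minimal sketch, assuming a table handle named iceTable for the schema above;
the predicate value is arbitrary):

import com.netflix.iceberg.expressions.Expressions
import scala.collection.JavaConverters._

val nestedFilter = Expressions.greaterThan("location.lat", Double.box(0.0))
// Planning the scan builds an InclusiveManifestEvaluator, which binds the
// predicate against the top-level struct and throws the ValidationException
// shown above.
iceTable.newScan().filter(nestedFilter).planFiles().asScala.foreach(println)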


Cheers,
-Gautam.



[1] -  https://github.com/apache/spark/pull/22573

On Tue, Feb 26, 2019 at 10:35 PM Anton Okolnychyi 
wrote:

> Unfortunately, Spark doesn’t push down filters for nested columns. I
> remember an effort to implement it [1]. However, it is not merged.
> So, even if we have proper statistics in Iceberg, we cannot leverage it
> from Spark.
>
> [1] - https://github.com/apache/spark/pull/22573
>
>
> On 26 Feb 2019, at 16:52, Gautam  wrote:
>
> Thanks Anton, this is very helpful!  I will apply the patch from pull#63
> and give it a shot.
>
> Re: Collecting min/max stats on nested structures
> (https://github.com/apache/incubator-iceberg/issues/78) ...
>
> We have the exact same use case for skipping files on nested field
> filters. I was intrigued by your comment on enabling stats on nested
> structures by replacing `fileSchema.asStruct().field(fieldId)` with `
> fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata` .. Have
> you had success with this? If so, I can try it out on our data as well.
>
>
>
>
>
> On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi 
> wrote:
>
>> Hi Gautam,
>>
>> I believe you see this behaviour because SparkAppenderFactory is
>> configured to use ParquetWriteAdapter. It only tracks the number of records
>> and uses ParquetWriteSupport from Spark. This means that the statistics is
>> not collected on writes and cannot be used on reads.
>>
>> Once [1] is merged, proper statistics will be fetched from the footer and
>> persisted in the manifests. The statistics is collected when writing data
>> files not manifests. See [2] for more info. Also, [3] contains an example
>> that filters out files (it requires [1] to be cherry-picked locally).
>>
>> Hope that helps,
>> Anton
>>
>> [1] - https://github.com/apache/incubator-iceberg/pull/63
>> [2] -
>> https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
>> [3] - https://github.com/apache/incubator-iceberg/pull/105
>>
>>
>> On 26 Feb 2019, at 13:58, Gautam  wrote:
>>
>> .. Just to be clear my concern is around Iceberg not skipping files.
>> Iceberg does skip rowGroups when scanning files as
>> *iceberg.parquet.ParquetReader* uses the pa

Re: Iceberg scans not keeping or using important file/column statistics in manifests ..

2019-02-26 Thread Gautam
Thanks Anton, this is very helpful!  I will apply the patch from pull#63
and give it a shot.

Re: Collecting min/max stats on nested structures
(https://github.com/apache/incubator-iceberg/issues/78) ...

We have the exact same use case for skipping files on nested field filters.
I was intrigued by your comment on enabling stats on nested structures by
replacing `fileSchema.asStruct().field(fieldId)` with `
fileSchema.findField(fieldId)` in `ParquetMetrics$fromMetadata` .. Have you
had success with this? If so, I can try it out on our data as well.
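
For my own notes, this is how I read the suggested substitution (a rough
sketch; the method names come from your comment, but the surrounding
ParquetMetrics code is paraphrased, not quoted):

import com.netflix.iceberg.Schema
import com.netflix.iceberg.types.Types

// asStruct().field(fieldId) only resolves top-level columns, so min/max for
// fields nested inside structs get dropped; findField(fieldId) walks the whole
// schema, so nested struct fields keep their metrics.
def resolveField(fileSchema: Schema, fieldId: Int): Types.NestedField =
  fileSchema.findField(fieldId)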





On Tue, Feb 26, 2019 at 8:24 PM Anton Okolnychyi 
wrote:

> Hi Gautam,
>
> I believe you see this behaviour because SparkAppenderFactory is
> configured to use ParquetWriteAdapter. It only tracks the number of records
> and uses ParquetWriteSupport from Spark. This means that the statistics is
> not collected on writes and cannot be used on reads.
>
> Once [1] is merged, proper statistics will be fetched from the footer and
> persisted in the manifests. The statistics is collected when writing data
> files not manifests. See [2] for more info. Also, [3] contains an example
> that filters out files (it requires [1] to be cherry-picked locally).
>
> Hope that helps,
> Anton
>
> [1] - https://github.com/apache/incubator-iceberg/pull/63
> [2] -
> https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java
> [3] - https://github.com/apache/incubator-iceberg/pull/105
>
>
> On 26 Feb 2019, at 13:58, Gautam  wrote:
>
>> .. Just to be clear, my concern is about Iceberg not skipping files.
>> Iceberg does skip row groups when scanning files, since
>> *iceberg.parquet.ParquetReader* uses the Parquet stats underneath while
>> skipping, albeit none of these stats come from the manifests.
>
> On Tue, Feb 26, 2019 at 7:24 PM Gautam  wrote:
>
>> Hello Devs,
>>I am looking into leveraging Iceberg to speed up split
>> generation and to minimize file scans. My understanding was that Iceberg
>> keeps key statistics as listed under Metrics.java [1] viz. column
>> lower/upper bounds, nullValues, distinct value counts, etc. and that table
>> scanning leverages these to skip partitions, files & row-groups (in the
>> Parquet context).
>>
>> What I found is files aren't skipped when a predicate applies only to a
>> subset of the table's files. Within a partition it will scan all files as
>> manifests only keep record counts but the rest of the metrics (lower,
>> upper, distinct value counts, null values) are null / empty. This is coz
>> AvroFileAppender only keeps `recordCounts` as metrics [2].. And currently
>> that is the only appender supported for writing manifest files.
>>
>>
>> *Example :*
>>
>> In following example iceTable was generated by iteratively adding two
>> files so it has two separate parquet files under it ..
>>
>> scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))
>>
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>> partition_data=PartitionData{}, residual=true}
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
>> partition_data=PartitionData{}, residual=true}
>>
>>
>> *Only one file contains row with age = null .. *
>>
>> scala> iceDf.show()
>> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a task
>> of very large size (113 KB). The maximum recommended task size is 100 KB.
>> +----+-------+--------------------+
>> | age|   name|             friends|
>> +----+-------+--------------------+
>> |  60| Kannan|      [Justin -> 19]|
>> |  75| Sharon|[Michael -> 30, J...|
>> |null|Michael|                null|
>> |  30|   Andy|[Josh -> 10, Bisw...|
>> |  19| Justin|[Kannan -> 75, Sa...|
>> +----+-------+--------------------+
>>
>>
>>
>> *Running filter on isNull(age) scans both files .. *
>>
>> val isNullExp = Expressions.isNull("age")
>> val isNullScan = iceTable.newScan().filter(isNullExp)
>>
>> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>>
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
>> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-0-82ae5672-20bf-4e76-bf76-13062360

Re: Iceberg scans not keeping or using important file/column statistics in manifests ..

2019-02-26 Thread Gautam
.. Just to be clear, my concern is about Iceberg not skipping files.
Iceberg does skip row groups when scanning files, since
*iceberg.parquet.ParquetReader* uses the Parquet stats underneath while
skipping, albeit none of these stats come from the manifests.

On Tue, Feb 26, 2019 at 7:24 PM Gautam  wrote:

> Hello Devs,
>I am looking into leveraging Iceberg to speed up split
> generation and to minimize file scans. My understanding was that Iceberg
> keeps key statistics as listed under Metrics.java [1] viz. column
> lower/upper bounds, nullValues, distinct value counts, etc. and that table
> scanning leverages these to skip partitions, files & row-groups (in the
> Parquet context).
>
> What I found is files aren't skipped when a predicate applies only to a
> subset of the table's files. Within a partition it will scan all files as
> manifests only keep record counts but the rest of the metrics (lower,
> upper, distinct value counts, null values) are null / empty. This is coz
> AvroFileAppender only keeps `recordCounts` as metrics [2].. And currently
> that is the only appender supported for writing manifest files.
>
>
> *Example :*
>
> In following example iceTable was generated by iteratively adding two
> files so it has two separate parquet files under it ..
>
> scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))
>
> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
> partition_data=PartitionData{}, residual=true}
> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
> partition_data=PartitionData{}, residual=true}
>
>
> *Only one file contains row with age = null .. *
>
> scala> iceDf.show()
> 19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a task
> of very large size (113 KB). The maximum recommended task size is 100 KB.
> +----+-------+--------------------+
> | age|   name|             friends|
> +----+-------+--------------------+
> |  60| Kannan|      [Justin -> 19]|
> |  75| Sharon|[Michael -> 30, J...|
> |null|Michael|                null|
> |  30|   Andy|[Josh -> 10, Bisw...|
> |  19| Justin|[Kannan -> 75, Sa...|
> +----+-------+--------------------+
>
>
>
> *Running filter on isNull(age) scans both files .. *
>
> val isNullExp = Expressions.isNull("age")
> val isNullScan = iceTable.newScan().filter(isNullExp)
>
> scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))
>
> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
> BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
> partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
>
>
>
> I would expect only one file to be scanned as Iceberg should track
> nullValueCounts as per Metrics.java [1] .. The same issue holds for integer
> comparison filters scanning too many files.
>
> When I looked through the code, there is provision for using Parquet file
> footer stats to populate Manifest Metrics [3] but this is never used as
> Iceberg currently only allows AvroFileAppender for creating manifest files.
>
> What's the plan around using Parquet footer stats in manifests which can
> be very useful during split generation? I saw some discussions around this
> in the Iceberg Spec document [4] but couldn't glean if any of those are
> actually implemented yet.
>
> I can work on a proposal PR for adding these in but wanted to  know the
> current thoughts around this.
>
>
> *Gist for above example *:
> https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7
>
>
> Looking forward to your feedback,
>
> Cheers,
> -Gautam.
>
>
>
>
>
> [1] -
> https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
> [2] -
> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
> [3] -
> https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
> [4] -
> https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#
>
>


Iceberg scans not keeping or using important file/column statistics in manifests ..

2019-02-26 Thread Gautam
Hello Devs,
   I am looking into leveraging Iceberg to speed up split
generation and to minimize file scans. My understanding was that Iceberg
keeps key statistics as listed under Metrics.java [1] viz. column
lower/upper bounds, nullValues, distinct value counts, etc. and that table
scanning leverages these to skip partitions, files & row-groups (in the
Parquet context).

What I found is files aren't skipped when a predicate applies only to a
subset of the table's files. Within a partition it will scan all files as
manifests only keep record counts but the rest of the metrics (lower,
upper, distinct value counts, null values) are null / empty. This is coz
AvroFileAppender only keeps `recordCounts` as metrics [2].. And currently
that is the only appender supported for writing manifest files.


*Example :*

In following example iceTable was generated by iteratively adding two files
so it has two separate parquet files under it ..

scala> iceTable.newScan().planFiles.asScala.foreach(fl => println(fl))

BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
partition_data=PartitionData{}, residual=true}
BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
partition_data=PartitionData{}, residual=true}


*Only one file contains row with age = null .. *

scala> iceDf.show()
19/02/26 13:30:46 WARN scheduler.TaskSetManager: Stage 3 contains a task of
very large size (113 KB). The maximum recommended task size is 100 KB.
+----+-------+--------------------+
| age|   name|             friends|
+----+-------+--------------------+
|  60| Kannan|      [Justin -> 19]|
|  75| Sharon|[Michael -> 30, J...|
|null|Michael|                null|
|  30|   Andy|[Josh -> 10, Bisw...|
|  19| Justin|[Kannan -> 75, Sa...|
+----+-------+--------------------+



*Running filter on isNull(age) scans both files .. *

val isNullExp = Expressions.isNull("age")
val isNullScan = iceTable.newScan().filter(isNullExp)

scala> isNullScan.planFiles.asScala.foreach(fl => println(fl))

BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-1-8d8c9ecf-e1fa-4bdb-bdb4-1e9b5f4b71dc.parquet,
partition_data=PartitionData{}, residual=is_null(ref(name="age"))}
BaseFileScanTask{file=file:/usr/local/spark/test/iceberg-people/data/0-0-82ae5672-20bf-4e76-bf76-130623606a72.parquet,
partition_data=PartitionData{}, residual=is_null(ref(name="age"))}



I would expect only one file to be scanned as Iceberg should track
nullValueCounts as per Metrics.java [1] .. The same issue holds for integer
comparison filters scanning too many files.

When I looked through the code, there is provision for using Parquet file
footer stats to populate Manifest Metrics [3] but this is never used as
Iceberg currently only allows AvroFileAppender for creating manifest files.

What's the plan around using Parquet footer stats in manifests which can be
very useful during split generation? I saw some discussions around this in
the Iceberg Spec document [4] but couldn't glean if any of those are
actually implemented yet.

I can work on a proposal PR for adding these in but wanted to  know the
current thoughts around this.


*Gist for above example *:
https://gist.github.com/prodeezy/fe1b447c78c0bc9dc3be66272341d1a7
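
To make the ask concrete, this is roughly what I'd like to be able to do when
adding files, so that manifest entries carry footer-derived stats. A hedged
sketch only: it assumes the DataFiles builder exposes withMetrics() and that a
ParquetMetrics-style helper can build Metrics from a footer; exact names and
signatures may differ.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import com.netflix.iceberg.DataFiles
import com.netflix.iceberg.parquet.ParquetMetrics

val conf   = new Configuration()
val path   = new Path("file:/usr/local/spark/test/iceberg-people/data/some-file.parquet") // illustrative
val footer = ParquetFileReader.readFooter(conf, path)

// Pull lower/upper bounds, null counts, etc. out of the Parquet footer and
// attach them to the DataFile entry, so the manifest gets more than a record count.
val dataFile = DataFiles.builder(iceTable.spec())
  .withPath(path.toString)
  .withFileSizeInBytes(path.getFileSystem(conf).getFileStatus(path).getLen)
  .withMetrics(ParquetMetrics.fromMetadata(footer))
  .build()

iceTable.newAppend().appendFile(dataFile).commit()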


Looking forward to your feedback,

Cheers,
-Gautam.





[1] -
https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Metrics.java
[2] -
https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/com/netflix/iceberg/avro/AvroFileAppender.java#L56
[3] -
https://github.com/apache/incubator-iceberg/blob/1bec13a954c29f8cd09719a0362c0b2829635c77/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetWriter.java#L118
[4] -
https://docs.google.com/document/d/1Q-zL5lSCle6NEEdyfiYsXYzX_Q8Qf0ctMyGBKslOswA/edit#


Re: Iceberg fails to return results when filtered on complex columns ..

2019-02-21 Thread Gautam
Hey Ryan,

I found the root cause of the post scan filter not working over Iceberg
format.

*The short explanation:* the Iceberg Parquet reader fails to return rows when
a complex-column filter is involved, so the post-scan filter has no rows left
to inspect.

*More Detail:*
Although the complex filter isn't pushed down by Spark to the Iceberg scan, it
does push down an implicit isNotNull(mapCol) filter. Before scanning begins, row
groups are evaluated to check if they can be skipped *[1]*. While doing so,
ParquetMetricsRowGroupFilter rejects row groups on evaluation of this
isNotNull(mapCol) filter. ParquetMetricsRowGroupFilter implements a
BoundExpressionVisitor whose notNull() expression evaluation method doesn't
recognize complex types as being "present" *[2]*, leading the reader to
believe that the column is not present and is all nulls.

*In the map filter case:*
The PMRGFilter keeps a `valueCounts` metric, which keeps a count statistic
by column id. This doesn't contain counts for the map column itself but instead
has value counts for the map keys and map values (which have different unique
ids). So a lookup for the map column id fails to return any counts.

Proposed fix options:
1 - Reject handling the implicit isNotNull(mapCol) check in the Parquet reader
for nested types, as we know nested-type filters are not pushed down.
2 - Skip the stats-based check for nested types altogether, as we know they
need to be re-evaluated by post-scan filters anyway.
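
For option 2, the logic would be roughly the following (purely illustrative,
not the actual ParquetMetricsRowGroupFilter code; the method name and signature
are made up):

import com.netflix.iceberg.types.Type

// If the referenced column is a nested type (struct/map/list), don't try to
// prove the row group can be skipped from the value/null counts; keep the row
// group and let the post-scan filter decide.
def notNullMightMatch(fieldType: Type, valueCount: Option[Long]): Boolean =
  if (!fieldType.isPrimitiveType()) true
  else valueCount.forall(_ > 0)  // rough stand-in for the existing primitive-column check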

Let me know what you think,

Cheers,
-Gautam.

[1] -
https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetReader.java#L103-L112
[2] -
https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/com/netflix/iceberg/parquet/ParquetMetricsRowGroupFilter.java#L159-L163


On Wed, Feb 20, 2019 at 12:25 AM Ryan Blue 
wrote:

> Hi Gautam,
>
> Thanks for reporting this. I'll look into why Spark is filtering out all
> of the Iceberg records. It should use the same filter in both cases, so I'm
> surprised that this is happening.
>
> The problem with the complex predicate is that it is inside a map. Parquet
> doesn't support push-down filters for nested columns.
>
> On Mon, Feb 18, 2019 at 11:03 PM Gautam  wrote:
>
>> Hello Iceberg Devs,
>>I'v been tracking an issue with predicate pushdowns in
>> Iceberg on complex types. I have compared vanilla Spark reader over Parquet
>> vs. Iceberg format reader.  I have an example detailing it here:
>> https://github.com/apache/incubator-iceberg/issues/99
>>
>> *Vanilla Spark Parquet reader plan*
>> == Physical Plan ==
>> *(1) Project [age#428, name#429, friends#430, location#431]
>> +- *(1) Filter (isnotnull(friends#430) && (friends#430[Josh] = 10))
>>+- *(1) FileScan parquet [age#428,name#429,friends#430,location#431]
>> Batched: false, Format: Parquet, Location:
>> InMemoryFileIndex[file:/usr/local/spark/test/parquet-people-complex],
>> PartitionFilters: [], PushedFilters: [IsNotNull(friends)], ReadSchema:
>> struct,location:struct>
>>
>> * Iceberg Plan*
>> == Physical Plan ==
>> *(1) Project [age#33, name#34, friends#35]
>> +- *(1) Filter ((friends#35[Josh] = 10) && isnotnull(friends#35))
>>+- *(1) ScanV2 iceberg[age#33, name#34, friends#35] (Filters:
>> [isnotnull(friends#35)], Options: [path=iceberg-people-complex2,paths=[]])
>>
>>
>> *Couple of points :*
>> 1)  Complex predicate is not passed down to the Scan level in both plans.
>> The complex predicate is termed "non-translateable" by
>> *DataSourceStrategy.translateFilter()  *[1] when trying to convert
>> Catalyst expression to data source filter. Ryan & Xabriel had a discussion
>> earlier on this list about Spark not passing expressions to data source (in
>> certain cases). This might be related to that. Maybe a path forward is to
>> fix that translation in Spark so that Iceberg Filter conversion has a
>> chance to handle complex type. Currently Iceberg Reader code is unaware of
>> that filter.
>>
>> 2) Although both vanilla Spark and Iceberg handle complex type predicates
>> post scan, this regression is caused by post scan filtering not returning
>> results in the Iceberg case. I think post scan filtering is unable to
>> handle Iceberg format. So if 1) is not the way forward then the alternative
>> way is to fix this in the post scan filtering.
>>
>>
>> Looking forward to your guidance on the way forward.
>>
>> Cheers,
>> -Gautam.
>>
>> [1] -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L450
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Iceberg fails to return results when filtered on complex columns ..

2019-02-18 Thread Gautam
Hello Iceberg Devs,
   I'v been tracking an issue with predicate pushdowns in
Iceberg on complex types. I have compared vanilla Spark reader over Parquet
vs. Iceberg format reader.  I have an example detailing it here:
https://github.com/apache/incubator-iceberg/issues/99

*Vanilla Spark Parquet reader plan*
== Physical Plan ==
*(1) Project [age#428, name#429, friends#430, location#431]
+- *(1) Filter (isnotnull(friends#430) && (friends#430[Josh] = 10))
   +- *(1) FileScan parquet [age#428,name#429,friends#430,location#431]
Batched: false, Format: Parquet, Location:
InMemoryFileIndex[file:/usr/local/spark/test/parquet-people-complex],
PartitionFilters: [], PushedFilters: [IsNotNull(friends)], ReadSchema:
struct,location:struct>

* Iceberg Plan*
== Physical Plan ==
*(1) Project [age#33, name#34, friends#35]
+- *(1) Filter ((friends#35[Josh] = 10) && isnotnull(friends#35))
   +- *(1) ScanV2 iceberg[age#33, name#34, friends#35] (Filters:
[isnotnull(friends#35)], Options: [path=iceberg-people-complex2,paths=[]])


*A couple of points:*
1)  The complex predicate is not passed down to the scan level in either plan.
The complex predicate is termed "non-translatable" by
*DataSourceStrategy.translateFilter()* [1] when trying to convert the Catalyst
expression to a data source filter. Ryan & Xabriel had a discussion earlier
on this list about Spark not passing expressions to the data source (in certain
cases). This might be related to that. Maybe a path forward is to fix that
translation in Spark so that Iceberg filter conversion has a chance to
handle the complex type. Currently the Iceberg reader code is unaware of that
filter.

2) Although both vanilla Spark and Iceberg handle complex-type predicates
post scan, this regression is caused by post-scan filtering not returning
results in the Iceberg case. I think the post-scan filtering is unable to
handle the Iceberg format. So if 1) is not the way forward, then the
alternative is to fix this in the post-scan filtering.
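
For completeness, the quickest side-by-side reproduction I have (a rough sketch
for spark-shell; the paths are the ones shown in the plans above):

val vanillaDf = spark.read.parquet("file:/usr/local/spark/test/parquet-people-complex")
val icebergDf = spark.read.format("iceberg").load("iceberg-people-complex2")

// Same predicate on the map column: the vanilla Parquet reader returns the
// matching row, while the Iceberg reader currently returns no rows at all.
vanillaDf.filter("friends['Josh'] = 10").show()
icebergDf.filter("friends['Josh'] = 10").show()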


Looking forward to your guidance on the way forward.

Cheers,
-Gautam.

[1] -
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L450