Thanks, Peter, for forwarding this.

Best Regards,
Ahmed Hamdy
On Tue, 9 Apr 2024 at 05:51, Péter Váry <peter.vary.apa...@gmail.com> wrote:
> Forwarding the invite for the discussion we plan to have with the Iceberg folks, as some of you might be interested in this.
>
> ---------- Forwarded message ---------
> From: Brian Olsen <bitsondata...@gmail.com>
> Date: Mon, Apr 8, 2024, 18:29
> Subject: Re: Flink table maintenance
> To: <d...@iceberg.apache.org>
>
> Hey Iceberg nation,
>
> I would like to share the invite for the meeting this Wednesday, where we will further discuss the details of Péter's proposal on Flink Maintenance Tasks.
> Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
>
> List discussion: https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>
> Design Doc: Flink table maintenance <https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing>
>
> On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <owenzhang1...@gmail.com> wrote:
> > Hi Peter,
> >
> >> Are you proposing to create a user facing locking feature in Iceberg, or just something for internal use?
> >
> > Since it's a general issue, I'm proposing to create a general user interface first, while the implementation can be left to users. For example, we use Airflow to schedule maintenance jobs, and we can check in-progress jobs with the Airflow API. A Hive metastore lock might be another option we can implement for users.
> >
> > Thanks,
> > Manu
> >
> > On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >> Hi Ajantha,
> >>
> >> I thought about enabling post-commit topology based compaction for sinks using options, like we use for the parametrization of streaming reads [1].
> >> I think it will be hard to do this in a user-friendly way because of the high number of parameters, but it is a possible solution with sensible defaults.
> >>
> >> There is a batch-like solution for data file compaction already available [2], but I do not see how we could extend Flink SQL to be able to call it.
> >>
> >> Writing to a branch using Flink SQL should be another thread, but at first guess it shouldn't be hard to implement using options, since writing to a branch already works through the Java API [3]. For example:
> >> /*+ OPTIONS('branch'='b1') */
> >>
> >> Thanks, Peter
> >>
> >> 1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
> >> 2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
> >> 3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
> >>
> >> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:
> >>> Thanks for the proposal, Peter.
> >>>
> >>> I just wanted to know: do we have any plans for supporting SQL syntax for table maintenance (like a CALL procedure) for pure Flink SQL users?
> >>> I didn't see any custom SQL parser plugin support in Flink. I also saw that branch writes don't have SQL support (only branch reads use options), so I am not sure about the roadmap of Iceberg SQL support in Flink.
> >>> Was there any discussion before?
> >>>
> >>> - Ajantha
> >>>
> >>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >>>> Hi Manu,
> >>>>
> >>>> Just to clarify:
> >>>> - Are you proposing to create a user facing locking feature in Iceberg, or just something for internal use?
> >>>>
> >>>> I think we shouldn't add locking to Iceberg's user facing scope at this stage.
> >>>> A fully featured locking system has many more features than we need (priorities, fairness, timeouts, etc.). I could be tempted in the context of the REST catalog, but I think that should be further down the road, if ever...
> >>>>
> >>>> About using the tags:
> >>>> - I wholeheartedly agree that using tags is not intuitive, and I see your points in most of your arguments. OTOH, introducing a new requirement (a locking mechanism) seems like the wrong direction to me.
> >>>> - We already defined a requirement (atomic changes on the table) for the Catalog implementations, which could be used to achieve our goal here.
> >>>> - We also already store technical data in snapshot properties in Flink jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table properties are not a big stretch.
> >>>>
> >>>> Or we can look at these tags or metadata as 'technical data' which is internal to Iceberg and shouldn't be expressed in the external API. My concern is:
> >>>> - Would it be used often enough to be worth the additional complexity?
> >>>>
> >>>> Knowing that Spark compaction is struggling with the same issue is a good indicator, but we would probably need more use cases to introduce a new feature of this complexity, or a simpler solution.
> >>>>
> >>>> Thanks, Peter
> >>>>
> >>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
> >>>>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions?
> >>>>>
> >>>>> This issue is not specific to Flink maintenance jobs. We have a service scheduling Spark maintenance jobs by watching table commits. When we don't check for in-progress maintenance jobs on the same table, multiple jobs run concurrently and conflict.
> >>>>>
> >>>>> Basically, I think we need a lock mechanism like the metastore lock <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration> if we want to handle it for users. However, using a tag doesn't look intuitive to me, and we would be mixing user data with system metadata. Maybe we can define some general interfaces and leave the implementation to users in the first version.
> >>>>>
> >>>>> Regards,
> >>>>> Manu
> >>>>>
> >>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >>>>>> What would the community think of exploiting tags for preventing concurrent maintenance loop executions?
> >>>>>>
> >>>>>> The issue:
> >>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We make sure not to run tasks started by a single trigger concurrently by serializing them, but there are no loops in Flink, so we can't synchronize them with tasks started by the next trigger.
> >>>>>>
> >>>>>> In the document, I describe that we need to rely on the user to ensure that the rate limit is high enough to prevent concurrent triggers.
> >>>>>>
> >>>>>> Proposal:
> >>>>>> When firing a trigger, the RateLimiter could check for and create an Iceberg table tag [1] on the current table snapshot, with the name '__flink_maintenance'. When the execution finishes, we remove this tag. The RateLimiter keeps accumulating changes and doesn't fire new triggers as long as it finds this tag on the table.
> >>>>>> The solution relies on Flink 'forceNonParallel' to prevent concurrent execution of placing the tag, and on Iceberg to store it.
> >>>>>>
> >>>>>> This does not use tags as intended, but it seems like a better solution than adding/removing table properties, which would clutter the table history with technical data.
> >>>>>>
> >>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Peter
> >>>>>>
> >>>>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
> >>>>>>
> >>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >>>>>>> Hi Team,
> >>>>>>>
> >>>>>>> As discussed at yesterday's community sync, I am working on adding the possibility to run maintenance tasks on Iceberg tables to the Flink Iceberg connector. This will fix the small files issue and, in the long run, help compact the high number of positional and equality deletes created by Flink tasks writing CDC data to Iceberg tables, without needing Spark in the infrastructure.
> >>>>>>>
> >>>>>>> I did some planning and prototyping, and I am currently trying out the solution on a larger scale.
> >>>>>>>
> >>>>>>> I put together a document describing what my current solution looks like:
> >>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
> >>>>>>>
> >>>>>>> I would love to hear your thoughts and feedback on this to find a good final solution.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Peter
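For illustration, here is a minimal sketch of the tag-based guard from the Mar 29 proposal, placed behind the kind of pluggable interface Manu suggests. Everything in it is hypothetical: `MaintenanceLock`, `InMemoryTableRefs`, `TagBasedLock` and `RateLimiterSketch` are illustration-only names, and an in-memory map stands in for the table's tags. A real implementation would presumably go through Iceberg's `ManageSnapshots` API (`createTag` / `removeTag`) and rely on the catalog's atomic commit to make tag creation exclusive; that mapping is an assumption, not something the thread confirms.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical pluggable lock; the backing mechanism (tag, metastore lock, scheduler API) is left open. */
interface MaintenanceLock {
  /** Returns true if acquired, false if another maintenance run already holds the lock. */
  boolean tryLock();

  /** Releases the lock once the serialized tasks of one trigger have finished. */
  void unlock();

  /** Returns true while a maintenance run holds the lock. */
  boolean isHeld();
}

/**
 * Hypothetical in-memory stand-in for a table's tag references (tag name to snapshot id).
 * putIfAbsent models an atomic "create the tag unless it already exists" commit.
 */
final class InMemoryTableRefs {
  private final Map<String, Long> tags = new ConcurrentHashMap<>();
  private final long currentSnapshotId;

  InMemoryTableRefs(long currentSnapshotId) {
    this.currentSnapshotId = currentSnapshotId;
  }

  long currentSnapshotId() {
    return currentSnapshotId;
  }

  boolean createTagIfAbsent(String name, long snapshotId) {
    return tags.putIfAbsent(name, snapshotId) == null;
  }

  void removeTag(String name) {
    tags.remove(name);
  }

  boolean hasTag(String name) {
    return tags.containsKey(name);
  }
}

/** Placing the tag on the current snapshot acquires the lock; removing it releases the lock. */
final class TagBasedLock implements MaintenanceLock {
  // Tag name taken from the proposal in the thread.
  static final String LOCK_TAG = "__flink_maintenance";
  private final InMemoryTableRefs table;

  TagBasedLock(InMemoryTableRefs table) {
    this.table = table;
  }

  @Override
  public boolean tryLock() {
    return table.createTagIfAbsent(LOCK_TAG, table.currentSnapshotId());
  }

  @Override
  public void unlock() {
    table.removeTag(LOCK_TAG);
  }

  @Override
  public boolean isHeld() {
    return table.hasTag(LOCK_TAG);
  }
}

/** Hypothetical simplified rate limiter: accumulates commits and fires only if the lock is free. */
final class RateLimiterSketch {
  private final MaintenanceLock lock;
  private final int commitsPerTrigger;
  private int pendingCommits = 0;

  RateLimiterSketch(MaintenanceLock lock, int commitsPerTrigger) {
    this.lock = lock;
    this.commitsPerTrigger = commitsPerTrigger;
  }

  /** Called for each new table commit; returns true if a maintenance trigger fires. */
  boolean onCommit() {
    pendingCommits++;
    if (pendingCommits < commitsPerTrigger || !lock.tryLock()) {
      return false; // keep accumulating changes
    }
    pendingCommits = 0;
    return true; // caller runs the tasks serially, then calls lock.unlock()
  }
}
```

With a threshold of two commits, the second commit fires a trigger and places the tag; further commits keep accumulating without firing until `unlock()` removes the tag, and the next commit after that fires again. Because the limiter only sees `MaintenanceLock`, swapping the tag for another mechanism (for example, a check against a scheduler such as Airflow) would leave it unchanged, which is the argument for keeping the interface general.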