Thanks Peter for forwarding this.
Best Regards
Ahmed Hamdy

On Tue, 9 Apr 2024 at 05:51, Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Forwarding the invite for the discussion we plan to do with the Iceberg
> folks, as some of you might be interested in this.
>
> ---------- Forwarded message ---------
> From: Brian Olsen <bitsondata...@gmail.com>
> Date: Mon, Apr 8, 2024, 18:29
> Subject: Re: Flink table maintenance
> To: <d...@iceberg.apache.org>
>
>
> Hey Iceberg nation,
>
> I would like to share the details of the meeting this Wednesday, where we
> will further discuss Péter's proposal on Flink Maintenance Tasks.
> Calendar Link: https://calendar.app.google/83HGYWXoQJ8zXuVCA
>
> List discussion:
> https://lists.apache.org/thread/10mdf9zo6pn0dfq791nf4w1m7jh9k3sl
>
> Design Doc: Flink table maintenance
> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>
>
>
> On Mon, Apr 1, 2024 at 8:52 PM Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> > Hi Peter,
> >
> >> Are you proposing to create a user facing locking feature in Iceberg, or
> >> just something for internal use?
> >>
> >
> > Since it's a general issue, I'm proposing to create a general user
> > interface first, while the implementation can be left to users. For
> > example, we use Airflow to schedule maintenance jobs and we can check
> > in-progress jobs with the Airflow API. Hive metastore lock might be
> > another option we can implement for users.
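
A minimal sketch of what such a general interface could look like, with the implementation left to users. All names here (MaintenanceLock, InMemoryLock, run_maintenance) are hypothetical and are not part of Iceberg; a real implementation might instead query the Airflow API or take a Hive metastore lock.

```python
from abc import ABC, abstractmethod


class MaintenanceLock(ABC):
    """Hypothetical user-implemented interface; not an Iceberg API."""

    @abstractmethod
    def try_lock(self, table_name: str) -> bool:
        """Return True if the lock for the table was acquired."""

    @abstractmethod
    def unlock(self, table_name: str) -> None:
        """Release the lock for the table."""


class InMemoryLock(MaintenanceLock):
    """Toy single-process implementation, for illustration only."""

    def __init__(self):
        self._held = set()

    def try_lock(self, table_name):
        if table_name in self._held:
            return False
        self._held.add(table_name)
        return True

    def unlock(self, table_name):
        self._held.discard(table_name)


def run_maintenance(lock, table_name, job):
    """Run job only if no other maintenance job holds the table's lock."""
    if not lock.try_lock(table_name):
        return False  # another job is in progress; skip this run
    try:
        job()
        return True
    finally:
        lock.unlock(table_name)
```

With this shape, a scheduler only depends on try_lock/unlock, and each deployment can plug in whatever coordination mechanism it already has.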
> >
> > Thanks,
> > Manu
> >
> > On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com>
> > wrote:
> >
> >> Hi Ajantha,
> >>
> >> I thought about enabling post-commit, topology-based compaction for sinks
> >> using options, like the ones we use to parametrize streaming reads [1].
> >> Because of the high number of parameters, I think it will be hard to do
> >> this in a user-friendly way, but I think it is a possible solution with
> >> sensible defaults.
> >>
> >> There is a batch-like solution for data file compaction already
> >> available [2], but I do not see how we could extend Flink SQL to be able
> >> to call it.
> >>
> >> Writing to a branch using Flink SQL should be another thread, but my
> >> first guess is that it shouldn't be hard to implement using options, like:
> >> /*+ OPTIONS('branch'='b1') */
> >> since writing to a branch is already working through the Java API [3].
> >>
> >> Thanks, Peter
> >>
> >> 1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
> >> 2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
> >> 3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
> >>
> >> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:
> >>
> >>> Thanks for the proposal Peter.
> >>>
> >>> I just wanted to know whether we have any plans to support SQL syntax
> >>> for table maintenance (like a CALL procedure) for pure Flink SQL users.
> >>> I didn't see any custom SQL parser plugin support in Flink. I also saw
> >>> that branch writes don't have SQL support (only branch reads use options),
> >>> so I am not sure about the roadmap of Iceberg SQL support in Flink.
> >>> Was there any discussion before?
> >>>
> >>> - Ajantha
> >>>
> >>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Manu,
> >>>>
> >>>> Just to clarify:
> >>>> - Are you proposing to create a user-facing locking feature in Iceberg,
> >>>> or just something for internal use?
> >>>>
> >>>> I think we shouldn't add locking to Iceberg's user-facing scope at this
> >>>> stage. A fully featured locking system has many more features that we
> >>>> need (priorities, fairness, timeouts, etc.). I could be tempted when we
> >>>> are talking about the REST catalog, but I think that should be further
> >>>> down the road, if ever...
> >>>>
> >>>> About using the tags:
> >>>> - I wholeheartedly agree that using tags is not intuitive, and I see
> >>>> your point in most of your arguments. OTOH, introducing a new requirement
> >>>> (a locking mechanism) seems like the wrong direction to me.
> >>>> - We already defined a requirement (atomic changes on the table) for
> >>>> the Catalog implementations which could be used to achieve our goal here.
> >>>> - We also already store technical data in snapshot properties in Flink
> >>>> jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table
> >>>> properties are not a big stretch.
> >>>>
> >>>> Or we can look at these tags or metadata as 'technical data' which is
> >>>> internal to Iceberg, and shouldn't be exposed on the external API. My
> >>>> concern is:
> >>>> - Would it be used often enough to be worth the additional complexity?
> >>>>
> >>>> Knowing that Spark compaction is struggling with the same issue is a
> >>>> good indicator, but we would probably need more use cases before
> >>>> introducing a new feature with this complexity, or a simpler solution.
> >>>>
> >>>> Thanks, Peter
> >>>>
> >>>>
> >>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
> >>>>
> >>>>>> What would the community think of exploiting tags for preventing
> >>>>>> concurrent maintenance loop executions.
> >>>>>
> >>>>>
> >>>>> This issue is not specific to Flink maintenance jobs. We have a
> >>>>> service scheduling Spark maintenance jobs by watching table commits.
> >>>>> When we don't check for in-progress maintenance jobs on the same table,
> >>>>> multiple jobs run concurrently and conflict.
> >>>>>
> >>>>> Basically, I think we need a lock mechanism like the metastore lock
> >>>>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration>
> >>>>> if we want to handle it for users. However, using a tag doesn't look
> >>>>> intuitive to me. It would also mix user data with system metadata.
> >>>>> Maybe we can define some general interfaces and leave the
> >>>>> implementation to users in the first version.
> >>>>>
> >>>>> Regards,
> >>>>> Manu
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> What would the community think of using tags to prevent
> >>>>>> concurrent maintenance loop executions?
> >>>>>>
> >>>>>> The issue:
> >>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles
> >>>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We
> >>>>>> make sure not to run tasks started by a single trigger concurrently by
> >>>>>> serializing them, but there are no loops in Flink, so we can't
> >>>>>> synchronize tasks started by the next trigger.
> >>>>>>
> >>>>>> In the document, I describe that we need to rely on the user to
> >>>>>> ensure that the rate limit is high enough to prevent concurrent
> >>>>>> triggers.
> >>>>>>
> >>>>>> Proposal:
> >>>>>> When firing a trigger, RateLimiter could check for and create an Iceberg
> >>>>>> table tag [1] for the current table snapshot, with the name:
> >>>>>> '__flink_maitenance'. When the execution finishes, we remove this tag.
> >>>>>> The RateLimiter keeps accumulating changes, and doesn't fire new
> >>>>>> triggers while it finds this tag on the table.
> >>>>>> The solution relies on Flink 'forceNonParallel' to prevent concurrent
> >>>>>> execution of placing the tag, and on Iceberg to store it.
> >>>>>>
> >>>>>> This does not use tags as intended, but seems like a better solution
> >>>>>> than adding/removing table properties, which would clutter the table
> >>>>>> history with technical data.
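
The proposed tag-based guard can be sketched roughly as follows. This is an in-memory simulation, not the actual Flink or Iceberg code: FakeTable and this RateLimiter class are hypothetical stand-ins, the threshold parameter is an assumption, and the check-then-create step is only safe here because a single instance does it (the role 'forceNonParallel' plays in the proposal).

```python
from dataclasses import dataclass, field

LOCK_TAG = "__flink_maitenance"  # tag name as spelled in the proposal


@dataclass
class FakeTable:
    """Hypothetical stand-in for an Iceberg table; only tracks tags."""
    current_snapshot_id: int = 1
    tags: dict = field(default_factory=dict)  # tag name -> snapshot id


@dataclass
class RateLimiter:
    """Accumulates changes and fires only when no lock tag is present."""
    table: FakeTable
    threshold: int = 3  # assumed: number of changes needed to fire
    pending: int = 0

    def on_change(self, count: int = 1) -> bool:
        """Record changes; return True if a maintenance trigger fired."""
        self.pending += count
        if self.pending < self.threshold:
            return False
        if LOCK_TAG in self.table.tags:
            # Previous maintenance run still in progress: keep accumulating.
            return False
        # Place the tag on the current snapshot, then fire the trigger.
        self.table.tags[LOCK_TAG] = self.table.current_snapshot_id
        self.pending = 0
        return True

    def on_maintenance_finished(self) -> None:
        """Remove the tag so that the next trigger is allowed to fire."""
        self.table.tags.pop(LOCK_TAG, None)
```

In a real table, creating and removing the tag would be catalog commits, so the guard would depend on the catalog's atomic-change guarantee rather than on a local dictionary.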
> >>>>>>
> >>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Peter
> >>>>>>
> >>>>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
> >>>>>>
> >>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Team,
> >>>>>>>
> >>>>>>> As discussed on yesterday's community sync, I am working on adding
> >>>>>>> the ability to run maintenance tasks on Iceberg tables to the Flink
> >>>>>>> Iceberg connector. This will fix the small files issue and, in the
> >>>>>>> long run, help compact the high number of positional and equality
> >>>>>>> deletes created by Flink tasks writing CDC data to Iceberg tables,
> >>>>>>> without the need for Spark in the infrastructure.
> >>>>>>>
> >>>>>>> I did some planning and prototyping, and I am currently trying out the
> >>>>>>> solution on a larger scale.
> >>>>>>>
> >>>>>>> I put together a document describing what my current solution looks like:
> >>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
> >>>>>>>
> >>>>>>> I would love to hear your thoughts and feedback on this to find a
> >>>>>>> good final solution.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Peter
> >>>>>>>
> >>>>>>
>
