Let me start an initial discussion thread at dev@flink. I'd like to gauge
interest from the community (including Hudi and Delta Lake) first, before
spending time on writing up a big FLIP.

On Fri, Jan 27, 2023 at 10:45 PM Jark Wu <imj...@gmail.com> wrote:

> Thanks, Steven, for the explanation.
>
> It sounds good to me to implement the shuffle operator in the Iceberg
> project first.
> We can contribute it to Flink DataStream in the future if other
> projects/connectors also need it.
>
> Best,
> Jark
>
>
> On Wed, 18 Jan 2023 at 02:11, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Jark,
>>
>> We were planning to discard the proposal due to some valid concerns
>> raised in the thread. Also, this proposal itself didn't really save too
>> much code duplication (maybe 100 lines or so).
>>
>> I also thought that the shuffle operator for DataStream can be useful for
>> other connectors too. The shuffling part (based on traffic statistics) can
>> be generic for other connectors. There will be some small integration part
>> unique to Iceberg, which can stay in Iceberg. If we go with this new
>> direction, we would need a new FLIP.
>>
>> Thanks,
>> Steven
>>
>>
>>
>> On Mon, Jan 16, 2023 at 12:30 AM Jark Wu <imj...@gmail.com> wrote:
>>
>>> What's the status and conclusion of this discussion?
>>>
>>> I see the value of exposing OperatorCoordinator because of its powerful
>>> RPC calls, and some projects, such as Hudi[1], are already using it. But I
>>> agree this is a large topic that requires another FLIP.
>>>
>>> I am also concerned that a Public base class extracted without
>>> implementations and clear usage is easy to break in the future. However, I
>>> think the shuffling operator can be a generic component used by other
>>> connectors and DataStream jobs.
>>>
>>> Have you considered contributing the ShuffleOperator to the Flink main
>>> repository as a part of the DataStream API (e.g.,
>>> DataStream#dynamicShuffle)? It's easy to extract the common part between
>>> SourceCoordinatorContext and ShuffleCoordinatorContext in a single
>>> repository as an internal implementation.
>>>
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://github.com/apache/hudi/blob/a80bb4f717ad8a89770176a1238c4b08874044e8/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
>>>
>>> On Thu, 3 Nov 2022 at 22:36, Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Ohhh, I was confused. I thought that the proposal is to make
>>>> `CoordinatorContextBase` part of the public API.
>>>>
>>>> However, I'm also against extracting `CoordinatorContextBase` as an
>>>> `@Internal` class.
>>>>
>>>> 1. Connectors shouldn't reuse internal classes. Using the `@Internal`
>>>> CoordinatedOperatorFactory would already be quite bad, but at least this
>>>> is a relatively stable internal API. Using an `@Internal`
>>>> `CoordinatorContextBase`, and refactoring out this base class just for
>>>> the sake of reusing it in a connector, is IMO even worse.
>>>> 2. Doubly so if they are in a separate repository (as the Iceberg
>>>> connector will be/is, right?). There would be no way to prevent breaking
>>>> changes between repositories.
>>>>
>>>> If that's only intended as a stop-gap solution until we properly expose
>>>> coordinators, the lesser evil would IMO be to copy/paste/modify
>>>> SourceCoordinatorContext into the flink-connector-iceberg repository.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> On Thu, Nov 3, 2022 at 12:51, Maximilian Michels <m...@apache.org>
>>>> wrote:
>>>>
>>>> > +1 If we wanted to expose the OperatorCoordinator API, we should
>>>> provide
>>>> > an adequate interface. The FLIP partially addresses this by trying to
>>>> > factor out RPC code which other coordinators might make use of, but
>>>> there
>>>> > is additional design necessary to realize a public operator API.
>>>> >
>>>> > Just to be clear, I'm not opposed to any of the changes in the FLIP. I
>>>> > think they make sense in the context of an Iceberg ShuffleCoordinator
>>>> in
>>>> > Flink. If we were to add such a new coordinator, feel free to make the
>>>> > proposed code refactoring as part of a pull request. A FLIP isn't
>>>> strictly
>>>> > necessary here because this is a purely internal change which does not
>>>> > alter public APIs, nor does it alter the internal architecture, apart
>>>> from
>>>> > reusing a bit of existing code. I'm sorry if we consumed some of your
>>>> time
>>>> > revising the document but I think we had a healthy discussion here.
>>>> And
>>>> > we're definitely looking forward to seeing some of these code changes!
>>>> >
>>>> > -Max
>>>> >
>>>> > On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski <pnowoj...@apache.org>
>>>> > wrote:
>>>> >
>>>> >> Hi,
>>>> >>
>>>> >> Sorry for the delay, but I've given this more thought. First, I share
>>>> >> Maximilian's view that this FLIP is incomplete. As I understand it, you
>>>> >> are trying to hack existing code to expose small bits of internal
>>>> >> functionality as part of the public API without solving many of the
>>>> >> underlying issues.
>>>> >>
>>>> >> For example, what's the point of exposing `CoordinatorContextBase` as a
>>>> >> public API if users cannot use it? After all, the `OperatorCoordinator`
>>>> >> and `CoordinatedOperatorFactory` would remain internal. At the same
>>>> >> time, this FLIP would officially force us to support and maintain this
>>>> >> CoordinatorContextBase, while I feel strongly that we don't want to do
>>>> >> that in the long term. I think we would need to take a big step back and
>>>> >> first discuss how we would like to expose the coordinators and agree on
>>>> >> how to deal with the issues.
>>>> >>
>>>> >> The first big issue that I see is that I would be very worried about
>>>> >> exposing the coordinator API without at least designing/planning how to
>>>> >> deal with checkpointing their state. Without that, I'm afraid we might
>>>> >> end up in a situation where we need to break the API in order to
>>>> >> properly support stateful coordinators. And at the moment I don't see a
>>>> >> good and easy solution to this problem.
>>>> >>
>>>> >> The second issue is the shape of the exposed public API. Exposing
>>>> >> `OperatorCoordinator` or `CoordinatorContextBase` looks to me like a bad
>>>> >> design that would expose way too many things to the users, making future
>>>> >> development more complicated for us and making implementation of those
>>>> >> interfaces by the user unnecessarily difficult. I see this as a similar
>>>> >> issue to the low-level `StreamOperator` API vs the higher-level
>>>> >> `org.apache.flink.api.common.functions.Function` API (instead of
>>>> >> exposing `StreamOperator`, `AbstractStreamOperatorV2` etc., we should
>>>> >> beef up the `ProcessFunction` to expose all of the remaining
>>>> >> functionality in a user-friendly way). In the context of the
>>>> >> coordinators, I would say that we should expose as the public API not
>>>> >> the `OperatorCoordinator`, but for example some kind of an
>>>> >> `EventProcessFunction` that would have a simple interface like:
>>>> >> ```
>>>> >> interface EventProcessFunction {
>>>> >>   void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
>>>> >> }
>>>> >> ```
>>>> >> + maybe some features like processing time timers/mailbox style async
>>>> >> actions.
>>>> >> (or maybe that could have been just a regular `ProcessFunction` but with
>>>> >> `OperatorEvent` with `int subtask` as input/output).
>>>> >>
>>>> >> Best,
>>>> >> Piotrek
>>>> >>
>>>> >> On Wed, Nov 2, 2022 at 19:40, gang ye <yegang...@gmail.com> wrote:
>>>> >>
>>>> >>> Hi Max and Qingsheng,
>>>> >>>
>>>> >>> Thanks for the feedback. The initial motivation for this proposal is to
>>>> >>> reduce duplicated code, since ShuffleCoordinator would need
>>>> >>> communication logic similar to SourceCoordinator's to talk with
>>>> >>> operators. I understand your concern that OperatorCoordinator is an
>>>> >>> internal class and that, for now, nothing other than SourceCoordinator
>>>> >>> uses it.
>>>> >>> How about we do what Qingsheng suggested? I can go ahead with the
>>>> >>> ShufflingCoordinator implementation without the extraction. Then we will
>>>> >>> have an intuitive sense of how much code is copied and could be reused.
>>>> >>> If we feel that there is still a need to extract, we can revisit the
>>>> >>> discussion.
>>>> >>>
>>>> >>> Thanks
>>>> >>> Gang
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org>
>>>> wrote:
>>>> >>>
>>>> >>>> Thanks Gang and Steven for the FLIP. Actually I share the same concern
>>>> >>>> as Piotr and Maximilian.
>>>> >>>>
>>>> >>>> OperatorCoordinator is marked as @Internal intentionally, considering
>>>> >>>> some existing issues, like consistency between non-source operator and
>>>> >>>> coordinator on checkpoint. I'm wondering if it is useful to expose a
>>>> >>>> public context to developers while keeping the OperatorCoordinator as an
>>>> >>>> internal API. If we finally close all issues and decide to expose the
>>>> >>>> operator coordinator API, that would be a better time to include the
>>>> >>>> base context as a part of it.
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Qingsheng
>>>> >>>>
>>>> >>>> On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <m...@apache.org>
>>>> >>>> wrote:
>>>> >>>>
>>>> >>>>> Thanks Steven! My confusion stemmed from the lack of context in
>>>> the
>>>> >>>>> FLIP.
>>>> >>>>> The first version did not lay out how the refactoring would be
>>>> used
>>>> >>>>> down
>>>> >>>>> the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator
>>>> API
>>>> >>>>> is a
>>>> >>>>> non-public API and before reading the code, I wasn't even aware
>>>> how
>>>> >>>>> exactly
>>>> >>>>> it worked and whether it would be available to regular operators
>>>> (it
>>>> >>>>> was
>>>> >>>>> originally intended for sources only).
>>>> >>>>>
>>>> >>>>> I might seem pedantic here but I believe the purpose of a FLIP should be
>>>> >>>>> to describe the *why* behind the changes, not only the changes
>>>> >>>>> themselves. A FLIP is not a formality but a tool to communicate and
>>>> >>>>> discuss changes. I think we still haven't laid out the exact reasons why
>>>> >>>>> we are factoring out the base. As far as I understand now, we need the
>>>> >>>>> base class to deal with concurrent updates in the custom Coordinator
>>>> >>>>> from the runtime (sub)tasks. Effectively, we are enforcing an actor
>>>> >>>>> model for the processing of the incoming messages such that the
>>>> >>>>> OperatorCoordinator can cleanly update its state. However, if there are
>>>> >>>>> no actual implementations that make use of the refactoring in Flink
>>>> >>>>> itself, I wonder if it would make sense to copy this code to the
>>>> >>>>> downstream implementation, e.g. the ShuffleCoordinator. As soon as it is
>>>> >>>>> part of Flink, we could of course try to consolidate this code.
>>>> >>>>> Considering the *how* of this, there appear to be both methods
>>>> from
>>>> >>>>> SourceCoordinator (e.g. runInEventLoop) as well as
>>>> >>>>> SourceCoordinatorContext
>>>> >>>>> listed in the FLIP, as well as methods which do not appear
>>>> anywhere in
>>>> >>>>> Flink code, e.g. subTaskReady / subTaskNotReady /
>>>> sendEventToOperator.
>>>> >>>>> It
>>>> >>>>> appears that some of this has been extracted from a downstream
>>>> >>>>> implementation. It would be great to adjust this, such that it
>>>> >>>>> reflects the
>>>> >>>>> status quo in Flink.
>>>> >>>>>
>>>> >>>>> -Max
>>>> >>>>>
>>>> >>>>> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <stevenz...@gmail.com>
>>>> >>>>> wrote:
>>>> >>>>>
>>>> >>>>> > Max,
>>>> >>>>> >
>>>> >>>>> > Thanks a lot for the comments. We should clarify that the shuffle
>>>> >>>>> > operator/coordinator is not really part of the Flink sink
>>>> >>>>> > function/operator. The shuffle operator is a custom operator that can
>>>> >>>>> > be inserted right before the Iceberg writer operator. The shuffle
>>>> >>>>> > operator calculates the traffic statistics and performs a custom
>>>> >>>>> > partition/shuffle (DataStream#partitionCustom) to cluster the data
>>>> >>>>> > right before it gets to the Iceberg writer operator.
>>>> >>>>> >
>>>> >>>>> > We are not proposing to introduce a sink coordinator for the sink
>>>> >>>>> > interface. The shuffle operator needs the CoordinatorContextBase to
>>>> >>>>> > facilitate the communication between shuffle subtasks and the
>>>> >>>>> > coordinator for traffic statistics aggregation. The communication part
>>>> >>>>> > is already implemented by SourceCoordinatorContext.
>>>> >>>>> >
>>>> >>>>> > Here are some details about the communication needs.
>>>> >>>>> > - subtasks periodically calculate local statistics and send them to
>>>> >>>>> >   the coordinator for global aggregation
>>>> >>>>> > - the coordinator sends the globally aggregated statistics to the
>>>> >>>>> >   subtasks
>>>> >>>>> > - subtasks use the globally aggregated statistics to guide the
>>>> >>>>> >   partition/shuffle decision
>>>> >>>>> >
>>>> >>>>> > Regards,
>>>> >>>>> > Steven
>>>> >>>>> >
>>>> >>>>> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <
>>>> m...@apache.org>
>>>> >>>>> wrote:
>>>> >>>>> >
>>>> >>>>> > > Hi Gang,
>>>> >>>>> > >
>>>> >>>>> > > Looks much better! I've actually gone through the
>>>> >>>>> OperatorCoordinator
>>>> >>>>> > code.
>>>> >>>>> > > It turns out, any operator already has an OperatorCoordinator
>>>> >>>>> assigned.
>>>> >>>>> > > Also, any operator can add custom coordinator code. So it
>>>> looks
>>>> >>>>> like you
>>>> >>>>> > > won't have to implement any additional runtime logic to add a
>>>> >>>>> > > ShuffleCoordinator. However, I'm wondering, why do you
>>>> >>>>> specifically need
>>>> >>>>> > to
>>>> >>>>> > > refactor the SourceCoordinatorContext? You could simply add
>>>> your
>>>> >>>>> own
>>>> >>>>> > > coordinator code. I'm not sure the sink requirements map to
>>>> the
>>>> >>>>> source
>>>> >>>>> > > interface so closely that you can reuse the same logic.
>>>> >>>>> > >
>>>> >>>>> > > If you can refactor SourceCoordinatorContext in a way that
>>>> makes
>>>> >>>>> it fit
>>>> >>>>> > > your use case, I have nothing to object here. By the way,
>>>> another
>>>> >>>>> example
>>>> >>>>> > > of an existing OperatorCoordinator is
>>>> >>>>> CollectSinkOperatorCoordinator
>>>> >>>>> > which
>>>> >>>>> > > is quite trivial but it might be worth evaluating whether you
>>>> need
>>>> >>>>> the
>>>> >>>>> > full
>>>> >>>>> > > power of SourceCoordinatorContext which is why I wanted to
>>>> get more
>>>> >>>>> > > context.
>>>> >>>>> > >
>>>> >>>>> > > -Max
>>>> >>>>> > >
>>>> >>>>> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <yegang...@gmail.com>
>>>> >>>>> wrote:
>>>> >>>>> > >
>>>> >>>>> > > > Hi Max,
>>>> >>>>> > > > I understand your concern. Since shuffling support for the Flink
>>>> >>>>> > > > Iceberg sink is not the main body of the proposal, I have just
>>>> >>>>> > > > added an appendix with more details about how to use
>>>> >>>>> > > > CoordinatorContextBase and how to define ShufflingCoordinator.
>>>> >>>>> > > >
>>>> >>>>> > > > Let me know if that does not address your concern.
>>>> >>>>> > > >
>>>> >>>>> > > > Thanks
>>>> >>>>> > > > Gang
>>>> >>>>> > > >
>>>> >>>>> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <
>>>> >>>>> m...@apache.org>
>>>> >>>>> > > wrote:
>>>> >>>>> > > >
>>>> >>>>> > > >> Hey Gang,
>>>> >>>>> > > >>
>>>> >>>>> > > >> What I'm looking for here is a complete picture of why the
>>>> >>>>> change is
>>>> >>>>> > > >> necessary and what the next steps are. Ultimately,
>>>> refactoring
>>>> >>>>> any
>>>> >>>>> > code
>>>> >>>>> > > >> serves a purpose. Here, we want to refactor the Coordinator
>>>> >>>>> code such
>>>> >>>>> > > that
>>>> >>>>> > > >> we can add a SinkCoordinator, similar to the
>>>> SourceCoordinator.
>>>> >>>>> The
>>>> >>>>> > FLIP
>>>> >>>>> > > >> should address the next steps, i.e. how you plan to add the
>>>> >>>>> > > >> SinkCoordinator, its interfaces, runtime changes. It
>>>> doesn't
>>>> >>>>> have to
>>>> >>>>> > be
>>>> >>>>> > > in
>>>> >>>>> > > >> great detail but without this information, I don't think
>>>> the
>>>> >>>>> FLIP is
>>>> >>>>> > > >> complete.
>>>> >>>>> > > >>
>>>> >>>>> > > >> This feature should be generic enough to be usable by other
>>>> >>>>> sinks than
>>>> >>>>> > > >> the Iceberg sink. Of course Iceberg can still load its own
>>>> >>>>> > > implementation
>>>> >>>>> > > >> which may be outlined in a separate FLIP.
>>>> >>>>> > > >>
>>>> >>>>> > > >> Unless there is a good reason, normal operators should not
>>>> >>>>> support the
>>>> >>>>> > > >> coordinator functionality. At least I'm not convinced it
>>>> would
>>>> >>>>> play
>>>> >>>>> > well
>>>> >>>>> > > >> with Flink's execution model. But I see how it is required
>>>> for
>>>> >>>>> sources
>>>> >>>>> > > and
>>>> >>>>> > > >> sinks.
>>>> >>>>> > > >>
>>>> >>>>> > > >> -Max
>>>> >>>>> > > >>
>>>> >>>>> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <
>>>> yegang...@gmail.com>
>>>> >>>>> wrote:
>>>> >>>>> > > >>
>>>> >>>>> > > >>> Hi Max,
>>>> >>>>> > > >>>
>>>> >>>>> > > >>> Thanks for reviewing.
>>>> >>>>> > > >>>
>>>> >>>>> > > >>> For FLIP-264, yes, we will only focus on abstracting RPC calls
>>>> >>>>> > > >>> between the task and the job manager for communication and won't
>>>> >>>>> > > >>> touch watermark or checkpointing logic.
>>>> >>>>> > > >>> If the coordinator doesn't need RPC calls to talk with subtasks,
>>>> >>>>> > > >>> then it can define its context without extending
>>>> >>>>> > > >>> CoordinatorContextBase (or we can find another class name to
>>>> >>>>> > > >>> limit the scope).
>>>> >>>>> > > >>>
>>>> >>>>> > > >>> Regarding the scope of the code changes: for FLIP-264, we will
>>>> >>>>> > > >>> only do the context extraction. The shuffling coordinator and
>>>> >>>>> > > >>> operator
>>>> >>>>> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>,
>>>> >>>>> > > >>> which will use the context, will come with a separate proposal,
>>>> >>>>> > > >>> so we try to keep FLIP-264 simple to understand. I can add a
>>>> >>>>> > > >>> little bit more about how to use the coordinator context in
>>>> >>>>> > > >>> FLIP-264 if you think that will be helpful.
>>>> >>>>> > > >>>
>>>> >>>>> > > >>> Thanks!
>>>> >>>>> > > >>> Gang
>>>> >>>>> > > >>>
>>>> >>>>> > > >>>
>>>> >>>>> > > >>>
>>>> >>>>> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <
>>>> >>>>> m...@apache.org>
>>>> >>>>> > > >>> wrote:
>>>> >>>>> > > >>>
>>>> >>>>> > > >>>> Thanks for the proposal, Gang! This is indeed somewhat
>>>> of a
>>>> >>>>> bigger
>>>> >>>>> > > >>>> change. The coordinator for sources, as part of FLIP-27,
>>>> was
>>>> >>>>> > > specifically
>>>> >>>>> > > >>>> added to synchronize the global watermark and to assign
>>>> splits
>>>> >>>>> > > dynamically.
>>>> >>>>> > > >>>> However, it practically allows arbitrary RPC calls
>>>> between
>>>> >>>>> the task
>>>> >>>>> > > and the
>>>> >>>>> > > >>>> job manager. I understand that there is concern that
>>>> such a
>>>> >>>>> powerful
>>>> >>>>> > > >>>> mechanism should not be available to all operators.
>>>> >>>>> Nevertheless, I
>>>> >>>>> > > see the
>>>> >>>>> > > >>>> practical use in case of sinks like Iceberg. So I'd
>>>> suggest
>>>> >>>>> limiting
>>>> >>>>> > > this
>>>> >>>>> > > >>>> feature to sinks (and sources) only.
>>>> >>>>> > > >>>>
>>>> >>>>> > > >>>> I'm wondering whether extracting the
>>>> SourceCoordinatorContext
>>>> >>>>> is
>>>> >>>>> > > >>>> enough to achieve what you want. There will be
>>>> additional work
>>>> >>>>> > > necessary,
>>>> >>>>> > > >>>> e.g. create a SinkCoordinator similarly to
>>>> SourceCoordinator
>>>> >>>>> which
>>>> >>>>> > > handles
>>>> >>>>> > > >>>> the RPC calls and the checkpointing. I think it would be
>>>> good
>>>> >>>>> to
>>>> >>>>> > > outline
>>>> >>>>> > > >>>> this in the FLIP.
>>>> >>>>> > > >>>>
>>>> >>>>> > > >>>> -Max
>>>> >>>>> > > >>>>
>>>> >>>>> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <
>>>> >>>>> stevenz...@gmail.com>
>>>> >>>>> > > wrote:
>>>> >>>>> > > >>>>
>>>> >>>>> > > >>>>> Sorry, I sent the incomplete reply by mistake.
>>>> >>>>> > > >>>>>
>>>> >>>>> > > >>>>> If there are any concrete concerns, we can discuss them. In
>>>> >>>>> > > >>>>> FLINK-27405 [1], Avid pointed out some implications regarding
>>>> >>>>> > > >>>>> checkpointing. In this small FLIP, we are not
>>>> >>>>> > > >>>>> exposing/changing any checkpointing logic; we mainly need the
>>>> >>>>> > > >>>>> coordinator context functionality to facilitate the
>>>> >>>>> > > >>>>> communication between coordinator and subtasks.
>>>> >>>>> > > >>>>>
>>>> >>>>> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
>>>> >>>>> > > >>>>>
>>>> >>>>> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <
>>>> >>>>> stevenz...@gmail.com>
>>>> >>>>> > > >>>>> wrote:
>>>> >>>>> > > >>>>>
>>>> >>>>> > > >>>>> > Hang, appreciate your input. Agree that
>>>> >>>>> `CoordinatorContextBase`
>>>> >>>>> > > is a
>>>> >>>>> > > >>>>> > better name considering Flink code convention.
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> > If there are any concrete concerns, we can discuss.
>>>> In the
>>>> >>>>> jira,
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <
>>>> >>>>> > ruanhang1...@gmail.com
>>>> >>>>> > > >
>>>> >>>>> > > >>>>> wrote:
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> >> Hi,
>>>> >>>>> > > >>>>> >>
>>>> >>>>> > > >>>>> >> IMO, I agree with extracting a base class for
>>>> >>>>> > > >>>>> >> SourceCoordinatorContext.
>>>> >>>>> > > >>>>> >> But I prefer the name `OperatorCoordinatorContextBase` or
>>>> >>>>> > > >>>>> >> `CoordinatorContextBase`, following the format of
>>>> >>>>> > > >>>>> >> `SourceReaderBase`.
>>>> >>>>> > > >>>>> >> I also agree with what Piotr said. Maybe more problems will
>>>> >>>>> > > >>>>> >> occur when connectors start to use it.
>>>> >>>>> > > >>>>> >>
>>>> >>>>> > > >>>>> >> Best,
>>>> >>>>> > > >>>>> >> Hang
>>>> >>>>> > > >>>>> >>
>>>> >>>>> > > >>>>> >> On Fri, Oct 14, 2022 at 22:31, Steven Wu <stevenz...@gmail.com> wrote:
>>>> >>>>> > > >>>>> >>
>>>> >>>>> > > >>>>> >> > Piotr,
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> > The proposal is to extract the listed methods from the
>>>> >>>>> > > >>>>> >> > @Internal SourceCoordinatorContext to a @PublicEvolving
>>>> >>>>> > > >>>>> >> > BaseCoordinatorContext.
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> > The motivation is that other operators can leverage the
>>>> >>>>> > > >>>>> >> > communication mechanism between the operator coordinator
>>>> >>>>> > > >>>>> >> > and operator subtasks. For example, in the linked Google
>>>> >>>>> > > >>>>> >> > doc, the shuffle operator (in the Flink Iceberg sink) can
>>>> >>>>> > > >>>>> >> > leverage it for computing traffic distribution statistics.
>>>> >>>>> > > >>>>> >> > * Subtasks calculate local statistics and periodically
>>>> >>>>> > > >>>>> >> > send them to the coordinator for global aggregation.
>>>> >>>>> > > >>>>> >> > * The coordinator can broadcast the globally aggregated
>>>> >>>>> > > >>>>> >> > statistics to subtasks, which can be used to guide the
>>>> >>>>> > > >>>>> >> > shuffling decision (selecting downstream channels).
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> > Thanks,
>>>> >>>>> > > >>>>> >> > Steven
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <
>>>> >>>>> > > >>>>> pnowoj...@apache.org>
>>>> >>>>> > > >>>>> >> > wrote:
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> > > Hi,
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > Could you clarify what proposal you have in mind? From
>>>> >>>>> > > >>>>> >> > > the context I would understand that the newly extracted
>>>> >>>>> > > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as
>>>> >>>>> > > >>>>> >> > > `@PublicEvolving` or `@Experimental`, since otherwise
>>>> >>>>> > > >>>>> >> > > extracting it and keeping it `@Internal` wouldn't change
>>>> >>>>> > > >>>>> >> > > much? Such an `@Internal` base class could be removed at
>>>> >>>>> > > >>>>> >> > > any point in time in the future. Having said that, it
>>>> >>>>> > > >>>>> >> > > sounds to me like your proposal is a bit bigger than it
>>>> >>>>> > > >>>>> >> > > looks at first glance and you actually want to expose
>>>> >>>>> > > >>>>> >> > > the operator coordinator concept to the public API?
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > AFAIK there were some discussions about that, and it was
>>>> >>>>> > > >>>>> >> > > a bit of a conscious decision to NOT do that. I don't
>>>> >>>>> > > >>>>> >> > > know those reasons, however. Only now, I've just heard
>>>> >>>>> > > >>>>> >> > > that there are, for example, some problems with
>>>> >>>>> > > >>>>> >> > > checkpointing of hypothetical non-source operator
>>>> >>>>> > > >>>>> >> > > coordinators. Maybe someone else could shed some light
>>>> >>>>> > > >>>>> >> > > on this?
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > Conceptually I would actually be in favour of exposing
>>>> >>>>> > > >>>>> >> > > operator coordinators if there is a good reason behind
>>>> >>>>> > > >>>>> >> > > that, but it is a more difficult topic and might be a
>>>> >>>>> > > >>>>> >> > > larger effort than it seems at first glance.
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > Best,
>>>> >>>>> > > >>>>> >> > > Piotrek
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > On Tue, Oct 4, 2022 at 19:41, Steven Wu <stevenz...@gmail.com> wrote:
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked
>>>> >>>>> google doc
>>>> >>>>> > is
>>>> >>>>> > > >>>>> not for
>>>> >>>>> > > >>>>> >> > this
>>>> >>>>> > > >>>>> >> > > > FLIP, which is fully documented in the wiki
>>>> page.
>>>> >>>>> The
>>>> >>>>> > linked
>>>> >>>>> > > >>>>> google
>>>> >>>>> > > >>>>> >> doc
>>>> >>>>> > > >>>>> >> > > is
>>>> >>>>> > > >>>>> >> > > > the design doc to introduce shuffling in Flink
>>>> >>>>> Iceberg
>>>> >>>>> > sink,
>>>> >>>>> > > >>>>> which
>>>> >>>>> > > >>>>> >> > > > motivated this FLIP proposal so that the
>>>> shuffle
>>>> >>>>> > coordinator
>>>> >>>>> > > >>>>> can
>>>> >>>>> > > >>>>> >> > leverage
>>>> >>>>> > > >>>>> >> > > > the introduced BaseCoordinatorContext to avoid
>>>> code
>>>> >>>>> > > >>>>> duplication.
>>>> >>>>> > > >>>>> >> > > >
>>>> >>>>> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <
>>>> >>>>> > j...@ververica.com>
>>>> >>>>> > > >>>>> wrote:
>>>> >>>>> > > >>>>> >> > > >
>>>> >>>>> > > >>>>> >> > > > > Thanks for bringing this up. It looks overall
>>>> >>>>> good! One
>>>> >>>>> > > >>>>> small
>>>> >>>>> > > >>>>> >> thing,
>>>> >>>>> > > >>>>> >> > > you
>>>> >>>>> > > >>>>> >> > > > > might want to write all content on the wiki
>>>> page
>>>> >>>>> instead
>>>> >>>>> > > of
>>>> >>>>> > > >>>>> >> linking
>>>> >>>>> > > >>>>> >> > to
>>>> >>>>> > > >>>>> >> > > a
>>>> >>>>> > > >>>>> >> > > > > google doc. The reason is that some people
>>>> might
>>>> >>>>> not be
>>>> >>>>> > > >>>>> able to
>>>> >>>>> > > >>>>> >> > access
>>>> >>>>> > > >>>>> >> > > > the
>>>> >>>>> > > >>>>> >> > > > > google doc.
>>>> >>>>> > > >>>>> >> > > > >
>>>> >>>>> > > >>>>> >> > > > > Best regards,
>>>> >>>>> > > >>>>> >> > > > > Jing
>>>> >>>>> > > >>>>> >> > > > >
>>>> >>>>> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <
>>>> >>>>> > > yegang...@gmail.com
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> >> wrote:
>>>> >>>>> > > >>>>> >> > > > >
>>>> >>>>> > > >>>>> >> > > > >> Hi,
>>>> >>>>> > > >>>>> >> > > > >>
>>>> >>>>> > > >>>>> >> > > > >> We submitted the FLIP proposal
>>>> >>>>> > > >>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
>>>> >>>>> > > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
>>>> >>>>> > > >>>>> >> > > > >> SourceCoordinatorContext so that it can be reused by
>>>> >>>>> > > >>>>> >> > > > >> other coordinators, e.g. in the shuffling support of
>>>> >>>>> > > >>>>> >> > > > >> the Flink Iceberg sink
>>>> >>>>> > > >>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
>>>> >>>>> > > >>>>> >> > > > >>
>>>> >>>>> > > >>>>> >> > > > >> Could you help to take a look?
>>>> >>>>> > > >>>>> >> > > > >> Thanks
>>>> >>>>> > > >>>>> >> > > > >>
>>>> >>>>> > > >>>>> >> > > > >> Gang
>>>> >>>>> > > >>>>> >> > > > >>
>>>> >>>>> > > >>>>> >> > > > >
>>>> >>>>> > > >>>>> >> > > >
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >>
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>>
>>>> >>>>> > > >>>>
>>>> >>>>> > >
>>>> >>>>> >
>>>> >>>>>
>>>> >>>>
>>>>
>>>
