Let me start an initial discussion thread on dev@flink. I'd like to gauge interest from the community (including Hudi and Delta Lake) before spending time writing up a big FLIP.
On Fri, Jan 27, 2023 at 10:45 PM Jark Wu <imj...@gmail.com> wrote:

Thanks, Steven, for the explanation.

It sounds good to me to implement the shuffle operator in the Iceberg project first. We can contribute it to Flink DataStream in the future if other projects/connectors also need it.

Best,
Jark

On Wed, 18 Jan 2023 at 02:11, Steven Wu <stevenz...@gmail.com> wrote:

Jark,

We were planning to discard the proposal due to some valid concerns raised in the thread. Also, the proposal itself didn't save much code duplication (maybe 100 lines or so).

I also thought that the shuffle operator for DataStream could be useful for other connectors too. The shuffling part (based on traffic statistics) can be generic across connectors. There will be a small integration part unique to Iceberg, which can stay in Iceberg. If we go in this new direction, we would need a new FLIP.

Thanks,
Steven

On Mon, Jan 16, 2023 at 12:30 AM Jark Wu <imj...@gmail.com> wrote:

What's the status and conclusion of this discussion?

I see the value of exposing OperatorCoordinator because of its powerful RPC calls; some projects, such as Hudi [1], already use it. But I agree this is a large topic and requires another FLIP.

I am also concerned about extracting a public base class without implementations or clear usage; such a class is easy to break in the future. However, I think the shuffling operator can be a generic component used by other connectors and DataStream jobs.

Have you considered contributing the ShuffleOperator to the Flink main repository as part of the DataStream API (e.g., DataStream#dynamicShuffle)? In a single repository, it's easy to extract the common part of SourceCoordinatorContext and ShuffleCoordinatorContext as an internal implementation.
Best,
Jark

[1]: https://github.com/apache/hudi/blob/a80bb4f717ad8a89770176a1238c4b08874044e8/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java

On Thu, 3 Nov 2022 at 22:36, Piotr Nowojski <pnowoj...@apache.org> wrote:

Ohhh, I was confused. I thought the proposal was to make `CoordinatorContextBase` part of the public API.

However, I'm also against extracting `CoordinatorContextBase` as an `@Internal` class.

1. Connectors shouldn't reuse internal classes. Using the `@Internal` `CoordinatedOperatorFactory` would already be quite bad, but at least it is a relatively stable internal API. Using an `@Internal` `CoordinatorContextBase`, and refactoring out this base class just for the sake of reusing it in a connector, is IMO even worse.
2. Doubly so if they are in a separate repository (as the Iceberg connector will be/is, right?). There would be no way to prevent breaking changes between repositories.

If that's only intended as a stop-gap solution until we properly expose coordinators, the lesser evil would IMO be to copy/paste/modify SourceCoordinatorContext into the flink-connector-iceberg repository.

Best,
Piotrek

On Thu, Nov 3, 2022 at 12:51 PM Maximilian Michels <m...@apache.org> wrote:

+1. If we wanted to expose the OperatorCoordinator API, we should provide an adequate interface. The FLIP partially addresses this by trying to factor out RPC code which other coordinators might make use of, but additional design is necessary to realize a public operator API.

Just to be clear, I'm not opposed to any of the changes in the FLIP. I think they make sense in the context of an Iceberg ShuffleCoordinator in Flink.
If we were to add such a new coordinator, feel free to make the proposed code refactoring part of a pull request. A FLIP isn't strictly necessary here because this is a purely internal change which alters neither public APIs nor the internal architecture, apart from reusing a bit of existing code. I'm sorry if we consumed some of your time revising the document, but I think we had a healthy discussion here. And we're definitely looking forward to seeing some of these code changes!

-Max

On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi,

Sorry for the delay, but I've given this more thought. First, I share Maximilian's view that this FLIP is incomplete. As I understand it, you are trying to hack existing code to expose small bits of internal functionality as part of the public API without solving many of the underlying issues.

For example, what's the point of exposing `CoordinatorContextBase` as a public API if users cannot use it? After all, `OperatorCoordinator` and `CoordinatedOperatorFactory` would remain internal. At the same time, this FLIP would officially force us to support and maintain `CoordinatorContextBase`, while I have a strong feeling that we don't want to do that in the long term. I think we need to take a big step back and first discuss how we would like to expose the coordinators and agree on how to deal with the issues.

The first big issue I see is that I would be very worried about exposing a coordinator API without at least designing/planning how to deal with checkpointing their state.
Without that, I'm afraid we might end up in a situation where we need to break the API in order to properly support stateful coordinators. And at the moment I don't see a good and easy solution to this problem.

The second issue is the shape of the exposed public API. Exposing `OperatorCoordinator` or `CoordinatorContextBase` looks to me like a bad design that would expose way too many things to users, making future development more complicated for us and making implementation of those interfaces by users unnecessarily difficult. I see this as a similar issue to the low-level `StreamOperator` API vs. the higher-level `org.apache.flink.api.common.functions.Function` API (instead of exposing `StreamOperator`, `AbstractStreamOperatorV2`, etc., we should beef up `ProcessFunction` to expose all of the remaining functionality in a user-friendly way). In the context of coordinators, I would say that we should expose as the public API not `OperatorCoordinator` but, for example, some kind of `EventProcessFunction` with a simple interface like:

```
interface EventProcessFunction {
    void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
}
```

+ maybe some features like processing-time timers/mailbox-style async actions.
(Or maybe that could just be a regular `ProcessFunction`, but with `OperatorEvent` and `int subtask` as input/output.)

Best,
Piotrek

On Wed, Nov 2, 2022 at 7:40 PM gang ye <yegang...@gmail.com> wrote:

Hi Max and Qingsheng,

Thanks for the feedback.
The initial motivation for proposing this was to reduce duplicated code, since ShuffleCoordinator needs communication logic similar to SourceCoordinator's to talk with operators. I understand your concern that OperatorCoordinator is an internal class and that, apart from SourceCoordinator, nothing else uses it for now.
How about we do what Qingsheng suggested? I can go ahead with the ShufflingCoordinator implementation without the extraction. Then we will have an intuitive sense of how much code is copied and could be reused. If we feel there is still a need to extract, we can revisit the discussion.

Thanks
Gang

On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org> wrote:

Thanks Gang and Steven for the FLIP. Actually, I share the same concern as Piotr and Maximilian.

OperatorCoordinator is intentionally marked as @Internal because of some existing issues, like consistency between a non-source operator and its coordinator on checkpoint. I'm wondering whether it is useful to expose a public context to developers while OperatorCoordinator remains an internal API. If we eventually close all the issues and decide to expose the operator coordinator API, that would be a better opportunity to include the base context as part of it.

Best,
Qingsheng

On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <m...@apache.org> wrote:

Thanks Steven! My confusion stemmed from the lack of context in the FLIP. The first version did not lay out how the refactoring would be used down the line, e.g. by the ShuffleCoordinator.
The OperatorCoordinator API is a non-public API, and before reading the code I wasn't even aware of how exactly it worked or whether it would be available to regular operators (it was originally intended for sources only).

I might seem pedantic here, but I believe the purpose of a FLIP should be to describe the *why* behind the changes, not only the changes themselves. A FLIP is not a formality but a tool to communicate and discuss changes. I think we still haven't laid out the exact reasons why we are factoring out the base. As far as I understand now, we need the base class to deal with concurrent updates to the custom Coordinator from the runtime (sub)tasks. Effectively, we are enforcing an actor model for processing incoming messages so that the OperatorCoordinator can cleanly update its state. However, if there are no actual implementations in Flink itself that make use of the refactoring, I wonder whether it would make sense to copy this code to the downstream implementation, e.g. the ShuffleCoordinator. As soon as that is part of Flink, we could of course try to consolidate this code.

Considering the *how*, the FLIP appears to list methods from both SourceCoordinator (e.g. runInEventLoop) and SourceCoordinatorContext, as well as methods which do not appear anywhere in the Flink code, e.g. subTaskReady / subTaskNotReady / sendEventToOperator. It appears that some of this has been extracted from a downstream implementation.
It would be great to adjust this so that it reflects the status quo in Flink.

-Max

On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <stevenz...@gmail.com> wrote:

Max,

Thanks a lot for the comments. We should clarify that the shuffle operator/coordinator is not really part of the Flink sink function/operator. The shuffle operator is a custom operator that can be inserted right before the Iceberg writer operator. It calculates the traffic statistics and performs a custom partition/shuffle (DataStream#partitionCustom) to cluster the data right before it reaches the Iceberg writer operator.

We are not proposing to introduce a sink coordinator for the sink interface. The shuffle operator needs the CoordinatorContextBase to facilitate the communication between shuffle subtasks and the coordinator for traffic statistics aggregation. The communication part is already implemented by SourceCoordinatorContext.

Here are some details about the communication needs:
- subtasks periodically calculate local statistics and send them to the coordinator for global aggregation
- the coordinator sends the globally aggregated statistics to the subtasks
- subtasks use the globally aggregated statistics to guide the partition/shuffle decision

Regards,
Steven

On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <m...@apache.org> wrote:

Hi Gang,

Looks much better!
I've actually gone through the OperatorCoordinator code. It turns out any operator already has an OperatorCoordinator assigned. Also, any operator can add custom coordinator code. So it looks like you won't have to implement any additional runtime logic to add a ShuffleCoordinator. However, I'm wondering why you specifically need to refactor the SourceCoordinatorContext. You could simply add your own coordinator code. I'm not sure the sink requirements map so closely to the source interface that you can reuse the same logic.

If you can refactor SourceCoordinatorContext in a way that makes it fit your use case, I have no objection. By the way, another example of an existing OperatorCoordinator is CollectSinkOperatorCoordinator, which is quite trivial. It might be worth evaluating whether you need the full power of SourceCoordinatorContext, which is why I wanted to get more context.

-Max

On Thu, Oct 27, 2022 at 4:15 PM gang ye <yegang...@gmail.com> wrote:

Hi Max,
I understand your concern. Since shuffling support for the Flink Iceberg sink is not the main body of the proposal, I just added another appendix with more details about how to use CoordinatorContextBase and how to define a ShufflingCoordinator.

Let me know if that doesn't address your concern.
Thanks
Gang

On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <m...@apache.org> wrote:

Hey Gang,

What I'm looking for here is a complete picture of why the change is necessary and what the next steps are. Ultimately, refactoring any code serves a purpose. Here, we want to refactor the Coordinator code so that we can add a SinkCoordinator, similar to the SourceCoordinator. The FLIP should address the next steps, i.e. how you plan to add the SinkCoordinator, its interfaces, and the runtime changes. It doesn't have to be in great detail, but without this information I don't think the FLIP is complete.

This feature should be generic enough to be usable by sinks other than the Iceberg sink. Of course, Iceberg can still load its own implementation, which may be outlined in a separate FLIP.

Unless there is a good reason, normal operators should not support the coordinator functionality. At least I'm not convinced it would play well with Flink's execution model. But I see how it is required for sources and sinks.
-Max

On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegang...@gmail.com> wrote:

Hi Max,

Thanks for reviewing.

For FLIP-264, yes, we will only focus on abstracting the RPC calls between the task and the job manager for communication, and won't touch watermark or checkpoint logic.
If a coordinator doesn't need RPC calls to talk with its subtasks, it can define its context without extending CoordinatorContextBase (or we can find another class name to limit the scope).

Regarding the scope of code changes, FLIP-264 will only do the context extraction. The shuffling coordinator and operator <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo> that will use the context will come in a separate proposal, so we are trying to keep FLIP-264 simple to understand. I can add a little more about how to use the coordinator context in FLIP-264 if you think that would be helpful.

Thanks!
Gang

On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <m...@apache.org> wrote:

Thanks for the proposal, Gang!
This is indeed somewhat of a bigger change. The coordinator for sources, as part of FLIP-27, was specifically added to synchronize the global watermark and to assign splits dynamically. However, it practically allows arbitrary RPC calls between the task and the job manager. I understand the concern that such a powerful mechanism should not be available to all operators. Nevertheless, I see the practical use in the case of sinks like Iceberg. So I'd suggest limiting this feature to sinks (and sources) only.

I'm wondering whether extracting the SourceCoordinatorContext is enough to achieve what you want. Additional work will be necessary, e.g. creating a SinkCoordinator, similar to SourceCoordinator, which handles the RPC calls and the checkpointing. I think it would be good to outline this in the FLIP.

-Max

On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz...@gmail.com> wrote:

Sorry, I sent the incomplete reply by mistake.

If there are any concrete concerns, we can discuss them. In FLINK-27405 [1], Avid pointed out some implications regarding checkpointing.
In this small FLIP, we are not exposing or changing any checkpointing logic; we mainly need the coordinator context functionality to facilitate the communication between the coordinator and subtasks.

[1] https://issues.apache.org/jira/browse/FLINK-27405

On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz...@gmail.com> wrote:

Hang, I appreciate your input. Agreed that `CoordinatorContextBase` is a better name, considering Flink's code conventions.

If there are any concrete concerns, we can discuss them. In the jira,

On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1...@gmail.com> wrote:

Hi,

IMO, I agree with extracting a base class for SourceCoordinatorContext. But I prefer the name `OperatorCoordinatorContextBase` or `CoordinatorContextBase`, following the format of `SourceReaderBase`.
I also agree with what Piotr said. Maybe more problems will occur when connectors start to use it.
Best,
Hang

On Fri, Oct 14, 2022 at 10:31 PM Steven Wu <stevenz...@gmail.com> wrote:

Piotr,

The proposal is to extract the listed methods from the @Internal SourceCoordinatorContext into a @PublicEvolving BaseCoordinatorContext.

The motivation is that other operators can leverage the communication mechanism between the operator coordinator and the operator subtasks. For example, the shuffle operator (in the Flink Iceberg sink) described in the linked Google doc can leverage it to compute traffic distribution statistics:
* Subtasks calculate local statistics and periodically send them to the coordinator for global aggregation.
* The coordinator can broadcast the globally aggregated statistics to the subtasks, which can use them to guide the shuffling decision (selecting downstream channels).
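The statistics exchange described above can be sketched in plain Java. This is a minimal, self-contained sketch under stated assumptions, not the FLIP's or the Iceberg connector's implementation: `LocalStatsEvent`, `StatsAggregator`, and `StatsGuidedPartitioner` are hypothetical names; statistics are assumed to be per-key record counts; the coordinator is assumed to keep only each subtask's latest report; and keys are assumed to be split into contiguous ranges by cumulative traffic. In a real job the events would travel over the coordinator RPC channel discussed in this thread, and the channel choice would sit behind a custom partitioner such as the one passed to DataStream#partitionCustom.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical event a shuffle subtask sends to its coordinator: local per-key record counts. */
final class LocalStatsEvent {
    final int subtask;
    final Map<String, Long> keyCounts;

    LocalStatsEvent(int subtask, Map<String, Long> keyCounts) {
        this.subtask = subtask;
        this.keyCounts = keyCounts;
    }
}

/** Hypothetical coordinator-side aggregation of the subtasks' local statistics. */
final class StatsAggregator {
    private final int parallelism;
    // Latest report per subtask (assumption): a newer report replaces the older one.
    private final Map<Integer, Map<String, Long>> latestBySubtask = new HashMap<>();

    StatsAggregator(int parallelism) {
        this.parallelism = parallelism;
    }

    void onLocalStats(LocalStatsEvent event) {
        latestBySubtask.put(event.subtask, event.keyCounts);
    }

    /** True once every subtask has reported at least once. */
    boolean hasAllReports() {
        return latestBySubtask.size() == parallelism;
    }

    /** Global per-key counts, sorted by key; this is what would be broadcast to subtasks. */
    TreeMap<String, Long> globalCounts() {
        TreeMap<String, Long> global = new TreeMap<>();
        for (Map<String, Long> local : latestBySubtask.values()) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }
}

/** Hypothetical subtask-side channel selection guided by the broadcast global statistics. */
final class StatsGuidedPartitioner {
    private final Map<String, Integer> channelByKey = new HashMap<>();
    private final int numChannels;

    StatsGuidedPartitioner(TreeMap<String, Long> globalCounts, int numChannels) {
        this.numChannels = numChannels;
        long total = 0;
        for (long count : globalCounts.values()) {
            total += count;
        }
        long before = 0;
        for (Map.Entry<String, Long> entry : globalCounts.entrySet()) {
            // A key goes to the channel whose 1/numChannels share of cumulative traffic
            // contains the key's starting point; a single hot key can still exceed one share.
            int channel = total == 0 ? 0 : (int) Math.min(numChannels - 1, before * numChannels / total);
            channelByKey.put(entry.getKey(), channel);
            before += entry.getValue();
        }
    }

    int selectChannel(String key) {
        Integer channel = channelByKey.get(key);
        // Keys absent from the statistics fall back to plain hash partitioning.
        return channel != null ? channel : Math.floorMod(key.hashCode(), numChannels);
    }
}
```

For example, with two subtasks reporting {a=50, b=10} and {a=30, c=10}, the global counts are {a=80, b=10, c=10}; with two downstream channels, key `a` goes to channel 0 and keys `b` and `c` go to channel 1. Range assignment keeps neighbouring keys together, which is one way to "cluster the data" before the writer; a hash-based assignment would balance differently and is equally plausible.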
Thanks,
Steven

On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi,

Could you clarify what proposal you have in mind? From the context, I would understand that the newly extracted `BaseCoordinatorContext` would have to be marked as `@PublicEvolving` or `@Experimental`, since otherwise extracting it and keeping it `@Internal` wouldn't change much? Such an `@Internal` base class could be removed at any point in the future. Having said that, it sounds to me like your proposal is a bit bigger than it looks at first glance, and that you actually want to expose the operator coordinator concept in the public API?

AFAIK there were some discussions about that, and it was a somewhat conscious decision NOT to do that. I don't know the reasons, however.
Only now have I heard that there are, for example, some problems with checkpointing hypothetical non-source operator coordinators. Maybe someone else could shed some light on this?

Conceptually, I would actually be in favour of exposing operator coordinators if there is a good reason for it, but it is a more difficult topic and might be a larger effort than it seems at first glance.

Best,
Piotrek

On Tue, Oct 4, 2022 at 7:41 PM Steven Wu <stevenz...@gmail.com> wrote:

Jing, thanks a lot for your reply. The linked Google doc is not for this FLIP, which is fully documented on the wiki page. The linked Google doc is the design doc for introducing shuffling in the Flink Iceberg sink, which motivated this FLIP proposal so that the shuffle coordinator can leverage the introduced BaseCoordinatorContext to avoid code duplication.
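The reuse this FLIP aims for, one base class holding the coordinator-to-subtask plumbing that both a source and a shuffle coordinator context could extend, can be sketched in plain Java. Everything here is hypothetical: the method names `runInEventLoop`, `subTaskReady`, `subTaskNotReady`, and `sendEventToOperator` are borrowed from this thread, but their signatures, the `SubtaskGateway` interface, and the `ShuffleCoordinatorContext` subclass are illustrative assumptions, not Flink's internal API. The one design point the sketch tries to capture is the actor-style processing mentioned in the thread: every state change and every outgoing event runs on a single event-loop thread, so the coordinator's bookkeeping needs no locks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical handle a coordinator uses to reach one running subtask. */
interface SubtaskGateway {
    void sendEvent(Object event);
}

/** Hypothetical shared base for coordinator contexts; not Flink's actual API. */
abstract class CoordinatorContextBase implements AutoCloseable {
    private final ExecutorService eventLoop;
    // Only touched from the event-loop thread, so a plain HashMap is sufficient.
    private final Map<Integer, SubtaskGateway> readySubtasks = new HashMap<>();

    protected CoordinatorContextBase(String name) {
        this.eventLoop = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name + "-event-loop");
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Runs the action on the single event-loop thread, one message at a time (actor style). */
    CompletableFuture<Void> runInEventLoop(Runnable action) {
        return CompletableFuture.runAsync(action, eventLoop);
    }

    CompletableFuture<Void> subTaskReady(int subtask, SubtaskGateway gateway) {
        return runInEventLoop(() -> readySubtasks.put(subtask, gateway));
    }

    CompletableFuture<Void> subTaskNotReady(int subtask) {
        return runInEventLoop(() -> readySubtasks.remove(subtask));
    }

    /** Delivers an event to a ready subtask; the returned future fails if it is not ready. */
    CompletableFuture<Void> sendEventToOperator(int subtask, Object event) {
        return runInEventLoop(() -> {
            SubtaskGateway gateway = readySubtasks.get(subtask);
            if (gateway == null) {
                throw new IllegalStateException("subtask " + subtask + " is not ready");
            }
            gateway.sendEvent(event);
        });
    }

    @Override
    public void close() {
        eventLoop.shutdown();
    }
}

/** Hypothetical shuffle coordinator context reusing the shared plumbing. */
final class ShuffleCoordinatorContext extends CoordinatorContextBase {
    ShuffleCoordinatorContext() {
        super("shuffle-coordinator");
    }

    /** Broadcasts the globally aggregated statistics to subtasks 0..parallelism-1. */
    CompletableFuture<Void> broadcastStatistics(Object globalStats, int parallelism) {
        CompletableFuture<?>[] sends = new CompletableFuture<?>[parallelism];
        for (int i = 0; i < parallelism; i++) {
            sends[i] = sendEventToOperator(i, globalStats);
        }
        return CompletableFuture.allOf(sends);
    }
}
```

Because the executor is single-threaded and FIFO, a `subTaskReady` call followed by a `sendEventToOperator` call from the same thread is processed in that order. This sketch deliberately ignores checkpointing of coordinator state, which is exactly the open question raised elsewhere in this thread.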
On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <j...@ververica.com> wrote:

Thanks for bringing this up. It looks good overall! One small thing: you might want to put all the content on the wiki page instead of linking to a Google doc, because some people might not be able to access the Google doc.

Best regards,
Jing

On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegang...@gmail.com> wrote:

Hi,

We submitted the FLIP proposal <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext> on Confluence to extract BaseCoordinatorContext from SourceCoordinatorContext so that it can be reused by other coordinators, e.g. in the shuffling support of the Flink Iceberg sink <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.

Could you help take a look?
Thanks

Gang