Thanks Steven! My confusion stemmed from the lack of context in the FLIP.
The first version did not lay out how the refactoring would be used down
the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a
non-public API and before reading the code, I wasn't even aware how exactly
it worked and whether it would be available to regular operators (it was
originally intended for sources only).

I might seem pedantic here, but I believe the purpose of a FLIP should be to
describe the *why* behind the changes, not only the changes themselves. A FLIP
is not a formality but a tool to communicate and discuss changes. I
think we still haven't laid out the exact reasons why we are factoring out
the base. As far as I understand now, we need the base class to deal with
concurrent updates in the custom Coordinator from the runtime (sub)tasks.
Effectively, we are enforcing an actor model for the processing of the
incoming messages such that the OperatorCoordinator can cleanly update its
state. However, if there are no actual implementations that make use of the
refactoring in Flink itself, I wonder if it would make sense to copy this
code to the downstream implementation, e.g. the ShuffleCoordinator. As soon
as it is part of Flink, we could of course try to consolidate this code.
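
To illustrate what I mean by an actor model, here is a minimal sketch. It
uses only java.util.concurrent and no Flink classes; the class and method
names (EventLoopCoordinatorSketch, handleEventFromSubtask, globalCount) are
made up for this mail and are not taken from the FLIP or from Flink. The
idea is that every message from a subtask is handed to one single-threaded
executor, so the coordinator state is only ever touched by that thread and
needs no locking.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical coordinator; a sketch of the pattern, not Flink code. */
class EventLoopCoordinatorSketch implements AutoCloseable {

    // All state changes run on this one thread (the "event loop").
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // Plain HashMap is safe here because only the event loop thread touches it.
    private final Map<Integer, Long> localCounts = new HashMap<>();

    /** May be called from any thread, e.g. one RPC thread per subtask. */
    void handleEventFromSubtask(int subtaskId, long count) {
        eventLoop.execute(() -> localCounts.put(subtaskId, count));
    }

    /** Reads also go through the event loop, so they see a consistent state. */
    long globalCount() {
        Callable<Long> sum = () -> {
            long total = 0L;
            for (long v : localCounts.values()) {
                total += v;
            }
            return total;
        };
        try {
            return eventLoop.submit(sum).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the event loop", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    @Override
    public void close() {
        eventLoop.shutdown();
        try {
            eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        try (EventLoopCoordinatorSketch coordinator = new EventLoopCoordinatorSketch()) {
            coordinator.handleEventFromSubtask(0, 5L);
            coordinator.handleEventFromSubtask(1, 7L);
            System.out.println(coordinator.globalCount());
        }
    }
}
```

If this is roughly all the base class provides, it is little code to carry
in a downstream implementation until there is a second user inside Flink.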

Regarding the *how*, the FLIP lists methods from both SourceCoordinator
(e.g. runInEventLoop) and SourceCoordinatorContext, as well as methods
which do not appear anywhere in Flink code, e.g. subTaskReady /
subTaskNotReady / sendEventToOperator. It appears that some of this has
been extracted from a downstream implementation. It would be great to
adjust the FLIP so that it reflects the status quo in Flink.

-Max

On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <stevenz...@gmail.com> wrote:

> Max,
>
> Thanks a lot for the comments. We should clarify that the shuffle
> operator/coordinator is not really part of the Flink sink
> function/operator. shuffle operator is a custom operator that can be
> inserted right before the Iceberg writer operator. Shuffle operator
> calculates the traffic statistics and performs a custom partition/shuffle
> (DataStream#partitionCustom) to cluster the data right before they get to
> the Iceberg writer operator.
>
> We are not proposing to introduce a sink coordinator for the sink
> interface. Shuffle operator needs the CoordinatorContextBase to
> facilitate the communication btw shuffle subtasks and coordinator for
> traffic statistics aggregation. The communication part is already
> implemented by SourceCoordinatorContext.
>
> Here are some details about the communication needs.
> - subtasks periodically calculate local statistics and send to the
> coordinator for global aggregation
> - the coordinator sends the globally aggregated statistics to the subtasks
> - subtasks use the globally aggregated statistics to guide the
> partition/shuffle decision
>
> Regards,
> Steven
>
> On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <m...@apache.org> wrote:
>
> > Hi Gang,
> >
> > Looks much better! I've actually gone through the OperatorCoordinator
> code.
> > It turns out, any operator already has an OperatorCoordinator assigned.
> > Also, any operator can add custom coordinator code. So it looks like you
> > won't have to implement any additional runtime logic to add a
> > ShuffleCoordinator. However, I'm wondering, why do you specifically need
> to
> > refactor the SourceCoordinatorContext? You could simply add your own
> > coordinator code. I'm not sure the sink requirements map to the source
> > interface so closely that you can reuse the same logic.
> >
> > If you can refactor SourceCoordinatorContext in a way that makes it fit
> > your use case, I have nothing to object here. By the way, another example
> > of an existing OperatorCoordinator is CollectSinkOperatorCoordinator
> which
> > is quite trivial but it might be worth evaluating whether you need the
> full
> > power of SourceCoordinatorContext which is why I wanted to get more
> > context.
> >
> > -Max
> >
> > On Thu, Oct 27, 2022 at 4:15 PM gang ye <yegang...@gmail.com> wrote:
> >
> > > Hi Max,
> > > I got your concern. Since shuffling support for Flink Iceberg sink is
> not
> > > the main body of the proposal, I add another appendix part just now
> with
> > > more details about how to use CoordinatorContextBase and how to define
> > > ShufflingCoordinator.
> > >
> > > Let me know if that cannot solve your concern.
> > >
> > > Thanks
> > > Gang
> > >
> > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <m...@apache.org>
> > wrote:
> > >
> > >> Hey Gang,
> > >>
> > >> What I'm looking for here is a complete picture of why the change is
> > >> necessary and what the next steps are. Ultimately, refactoring any
> code
> > >> serves a purpose. Here, we want to refactor the Coordinator code such
> > that
> > >> we can add a SinkCoordinator, similar to the SourceCoordinator. The
> FLIP
> > >> should address the next steps, i.e. how you plan to add the
> > >> SinkCoordinator, its interfaces, runtime changes. It doesn't have to
> be
> > in
> > >> great detail but without this information, I don't think the FLIP is
> > >> complete.
> > >>
> > >> This feature should be generic enough to be usable by other sinks than
> > >> the Iceberg sink. Of course Iceberg can still load its own
> > implementation
> > >> which may be outlined in a separate FLIP.
> > >>
> > >> Unless there is a good reason, normal operators should not support the
> > >> coordinator functionality. At least I'm not convinced it would play
> well
> > >> with Flink's execution model. But I see how it is required for sources
> > and
> > >> sinks.
> > >>
> > >> -Max
> > >>
> > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegang...@gmail.com> wrote:
> > >>
> > >>> Hi Max,
> > >>>
> > >>> Thanks for reviewing.
> > >>>
> > >>> For this Flip 264, yes, we will only focus on abstracting RPC calls
> > >>> between the task and the job manager for communications and won't
> touch
> > >>> watermark checkpoint.
> > >>> If the coordinator doesn't need RPC calls to talk with subtasks, then
> > it
> > >>> can define context without extending from the
> CoordinatorContextBase(or
> > >>> find another class name to limit the scope).
> > >>>
> > >>> Regarding the code-changing scope, for this Flip 264, we will only do
> > >>> context extraction. The shuffling coordinator and operator
> > >>> <
> >
> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo
> > >
> > >>> which will use the context will come with a separate proposal, thus
> we
> > try
> > >>> to keep it simple in Flip 264 to understand. I can add a little bit
> > more
> > >>> about how to use the coordinator context in Flip 264 if you think
> that
> > will
> > >>> be helpful.
> > >>>
> > >>> Thanks!
> > >>> Gang
> > >>>
> > >>>
> > >>>
> > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <m...@apache.org>
> > >>> wrote:
> > >>>
> > >>>> Thanks for the proposal, Gang! This is indeed somewhat of a bigger
> > >>>> change. The coordinator for sources, as part of FLIP-27, was
> > specifically
> > >>>> added to synchronize the global watermark and to assign splits
> > dynamically.
> > >>>> However, it practically allows arbitrary RPC calls between the task
> > and the
> > >>>> job manager. I understand that there is concern that such a powerful
> > >>>> mechanism should not be available to all operators. Nevertheless, I
> > see the
> > >>>> practical use in case of sinks like Iceberg. So I'd suggest limiting
> > this
> > >>>> feature to sinks (and sources) only.
> > >>>>
> > >>>> I'm wondering whether extracting the SourceCoordinatorContext is
> > >>>> enough to achieve what you want. There will be additional work
> > necessary,
> > >>>> e.g. create a SinkCoordinator similarly to SourceCoordinator which
> > handles
> > >>>> the RPC calls and the checkpointing. I think it would be good to
> > outline
> > >>>> this in the FLIP.
> > >>>>
> > >>>> -Max
> > >>>>
> > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz...@gmail.com>
> > wrote:
> > >>>>
> > >>>>> sorry. sent the incomplete reply by mistake.
> > >>>>>
> > >>>>> If there are any concrete concerns, we can discuss. In the
> > FLINK-27405
> > >>>>> [1],
> > >>>>> Avid pointed out some implications regarding checkpointing. In this
> > >>>>> small
> > >>>>> FLIP, we are not exposing/changing any checkpointing logic, we
> mainly
> > >>>>> need
> > >>>>> the coordinator context functionality to facilitate the
> communication
> > >>>>> between coordinator and subtasks.
> > >>>>>
> > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
> > >>>>>
> > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz...@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>> > Hang, appreciate your input. Agree that `CoordinatorContextBase`
> > is a
> > >>>>> > better name considering Flink code convention.
> > >>>>> >
> > >>>>> > If there are any concrete concerns, we can discuss. In the jira,
> > >>>>> >
> > >>>>> >
> > >>>>> >
> > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <
> ruanhang1...@gmail.com
> > >
> > >>>>> wrote:
> > >>>>> >
> > >>>>> >> Hi,
> > >>>>> >>
> > >>>>> >> IMO, I agree to extract a base class for
> SourceCoordinatorContext.
> > >>>>> >> But I prefer to use the name `OperatorCoordinatorContextBase` or
> > >>>>> >> `CoordinatorContextBase` as the format like `SourceReaderBase`.
> > >>>>> >> I also agree to what Piotr said. Maybe more problems will occur
> > when
> > >>>>> >> connectors start to use it.
> > >>>>> >>
> > >>>>> >> Best,
> > >>>>> >> Hang
> > >>>>> >>
> > >>>>> >> On Fri, Oct 14, 2022 at 22:31 Steven Wu <stevenz...@gmail.com> wrote:
> > >>>>> >>
> > >>>>> >> > Piotr,
> > >>>>> >> >
> > >>>>> >> > The proposal is to extract the listed methods from @Internal
> > >>>>> >> > SourceCoordinatorContext to a @PublicEvolving
> > >>>>> BaseCoordinatorContext.
> > >>>>> >> >
> > >>>>> >> > The motivation is that other operators can leverage the
> > >>>>> communication
> > >>>>> >> > mechanism btw operator coordinator and operator subtasks. For
> > >>>>> example,
> > >>>>> >> in
> > >>>>> >> > the linked google doc shuffle operator (in Flink Iceberg sink)
> > can
> > >>>>> >> leverage
> > >>>>> >> > it for computing traffic distribution statistics.
> > >>>>> >> > * subtasks calculate local statistics and periodically send
> them
> > >>>>> to the
> > >>>>> >> > coordinator for global aggregation.
> > >>>>> >> > * The coordinator can broadcast the globally aggregated
> > >>>>> statistics to
> > >>>>> >> > subtasks, which can be used to guide the shuffling decision
> > >>>>> (selecting
> > >>>>> >> > downstream channels).
> > >>>>> >> >
> > >>>>> >> > Thanks,
> > >>>>> >> > Steven
> > >>>>> >> >
> > >>>>> >> >
> > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <
> > >>>>> pnowoj...@apache.org>
> > >>>>> >> > wrote:
> > >>>>> >> >
> > >>>>> >> > > Hi,
> > >>>>> >> > >
> > >>>>> >> > > Could you clarify what's the proposal that you have in mind?
> > >>>>> From the
> > >>>>> >> > > context I would understand that the newly extracted
> > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as
> > >>>>> `@PublicEvolving`
> > >>>>> >> or
> > >>>>> >> > > `@Experimental`, since otherwise extracting it and keeping
> > >>>>> `@Internal`
> > >>>>> >> > > wouldn't change much? Such `@Internal` base class could have
> > >>>>> been
> > >>>>> >> removed
> > >>>>> >> > > at any point of time in the future. Having said that, it
> > sounds
> > >>>>> to me
> > >>>>> >> > like
> > >>>>> >> > > your proposal is a bit bigger than it looks at the first
> > glance
> > >>>>> and
> > >>>>> >> you
> > >>>>> >> > > actually want to expose the operator coordinator concept to
> > the
> > >>>>> public
> > >>>>> >> > API?
> > >>>>> >> > >
> > >>>>> >> > > AFAIK there were some discussions about that, and it was a
> bit
> > >>>>> of a
> > >>>>> >> > > conscious decision to NOT do that. I don't know those
> reasons
> > >>>>> however.
> > >>>>> >> > Only
> > >>>>> >> > > now, I've just heard that there are for example some
> problems
> > >>>>> with
> > >>>>> >> > > checkpointing of hypothetical non source operator
> > coordinators.
> > >>>>> Maybe
> > >>>>> >> > > someone else could shed some light on this?
> > >>>>> >> > >
> > >>>>> >> > > Conceptually I would be actually in favour of exposing
> > operator
> > >>>>> >> > > coordinators if there is a good reason behind that, but it
> is
> > a
> > >>>>> more
> > >>>>> >> > > difficult topic and might be a larger effort than it seems
> at
> > >>>>> the
> > >>>>> >> first
> > >>>>> >> > > glance.
> > >>>>> >> > >
> > >>>>> >> > > Best,
> > >>>>> >> > > Piotrek
> > >>>>> >> > >
> > >>>>> >> > > On Tue, Oct 4, 2022 at 19:41 Steven Wu <stevenz...@gmail.com>
> > >>>>> wrote:
> > >>>>> >> > >
> > >>>>> >> > > > Jing, thanks a lot for your reply. The linked google doc
> is
> > >>>>> not for
> > >>>>> >> > this
> > >>>>> >> > > > FLIP, which is fully documented in the wiki page. The
> linked
> > >>>>> google
> > >>>>> >> doc
> > >>>>> >> > > is
> > >>>>> >> > > > the design doc to introduce shuffling in Flink Iceberg
> sink,
> > >>>>> which
> > >>>>> >> > > > motivated this FLIP proposal so that the shuffle
> coordinator
> > >>>>> can
> > >>>>> >> > leverage
> > >>>>> >> > > > the introduced BaseCoordinatorContext to avoid code
> > >>>>> duplication.
> > >>>>> >> > > >
> > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <
> j...@ververica.com>
> > >>>>> wrote:
> > >>>>> >> > > >
> > >>>>> >> > > > > Thanks for bringing this up. It looks overall good! One
> > >>>>> small
> > >>>>> >> thing,
> > >>>>> >> > > you
> > >>>>> >> > > > > might want to write all content on the wiki page instead
> > of
> > >>>>> >> linking
> > >>>>> >> > to
> > >>>>> >> > > a
> > >>>>> >> > > > > google doc. The reason is that some people might not be
> > >>>>> able to
> > >>>>> >> > access
> > >>>>> >> > > > the
> > >>>>> >> > > > > google doc.
> > >>>>> >> > > > >
> > >>>>> >> > > > > Best regards,
> > >>>>> >> > > > > Jing
> > >>>>> >> > > > >
> > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <
> > yegang...@gmail.com
> > >>>>> >
> > >>>>> >> wrote:
> > >>>>> >> > > > >
> > >>>>> >> > > > >> Hi,
> > >>>>> >> > > > >>
> > >>>>> >> > > > >> We submit the Flip proposal
> > >>>>> >> > > > >> <
> > >>>>> >> > > > >>
> > >>>>> >> > > >
> > >>>>> >> > >
> > >>>>> >> >
> > >>>>> >>
> > >>>>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext
> > >>>>> >> > > > >> >
> > >>>>> >> > > > >> at Confluence to extract BaseCoordinatorContext from
> > >>>>> >> > > > >> SourceCoordinatorContext to reuse it for other
> > >>>>> coordinators E.g.
> > >>>>> >> in
> > >>>>> >> > > the
> > >>>>> >> > > > >> shuffling support of Flink Iceberg sink
> > >>>>> >> > > > >> <
> > >>>>> >> > > > >>
> > >>>>> >> > > >
> > >>>>> >> > >
> > >>>>> >> >
> > >>>>> >>
> > >>>>>
> >
> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo
> > >>>>> >> > > > >> >
> > >>>>> >> > > > >>
> > >>>>> >> > > > >> Could you help to take a look?
> > >>>>> >> > > > >> Thanks
> > >>>>> >> > > > >>
> > >>>>> >> > > > >> Gang
> > >>>>> >> > > > >>
> > >>>>> >> > > > >
> > >>>>> >> > > >
> > >>>>> >> > >
> > >>>>> >> >
> > >>>>> >>
> > >>>>> >
> > >>>>>
> > >>>>
> >
>
