Hi Max,

Thanks for reviewing.

Yes, for FLIP-264 we will only focus on abstracting the RPC calls used for
communication between the tasks and the JobManager, and we won't touch the
watermark or checkpointing logic.
If a coordinator doesn't need RPC calls to talk to its subtasks, it can
define its own context without extending CoordinatorContextBase (or we can
find another class name that makes the limited scope clearer).

Regarding the scope of the code changes, FLIP-264 will only do the context
extraction. The shuffling coordinator and operator
<https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
that will use the context will come in a separate proposal, so we are trying
to keep FLIP-264 simple and easy to understand. I can add a bit more to
FLIP-264 about how to use the coordinator context if you think that would be
helpful.
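For illustration, here is a minimal Java sketch of how a shuffle coordinator
could use an extracted context for the statistics flow in the shuffle design
doc: subtasks send local key counts, the coordinator merges them once every
subtask has reported, and then broadcasts the global counts back. All types
here (CoordinatorContextBase, CoordinatorEvent, LocalStatsEvent,
GlobalStatsEvent, StatsCoordinator) are hypothetical stand-ins, not Flink's
actual OperatorCoordinator/OperatorEvent API. The sketch deliberately leaves
out checkpointing, failover and threading, and assumes Java 10+ for
Map.of/Map.copyOf.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Demo: two subtasks report local key counts; after both have reported, the
// coordinator broadcasts the merged counts back to every subtask.
class ShuffleStatsDemo {
    public static void main(String[] args) {
        StatsCoordinatorContext context = new StatsCoordinatorContext();
        List<CoordinatorEvent> inbox0 = new ArrayList<>();
        List<CoordinatorEvent> inbox1 = new ArrayList<>();
        context.registerSubtask(0, inbox0::add);
        context.registerSubtask(1, inbox1::add);

        StatsCoordinator coordinator = new StatsCoordinator(context, 2);
        coordinator.handleEventFromSubtask(0, new LocalStatsEvent(Map.of("a", 3L, "b", 1L)));
        coordinator.handleEventFromSubtask(1, new LocalStatsEvent(Map.of("a", 2L, "c", 4L)));

        // Both inboxes now hold one GlobalStatsEvent with a=5, b=1, c=4.
        System.out.println(((GlobalStatsEvent) inbox0.get(0)).counts);
    }
}

// Hypothetical marker for coordinator/subtask messages (stand-in for Flink's OperatorEvent).
interface CoordinatorEvent {}

// Subtask -> coordinator: statistics computed locally by one subtask.
final class LocalStatsEvent implements CoordinatorEvent {
    final Map<String, Long> counts;
    LocalStatsEvent(Map<String, Long> counts) { this.counts = Map.copyOf(counts); }
}

// Coordinator -> subtasks: globally aggregated statistics.
final class GlobalStatsEvent implements CoordinatorEvent {
    final Map<String, Long> counts;
    GlobalStatsEvent(Map<String, Long> counts) { this.counts = Map.copyOf(counts); }
}

// Hypothetical extracted base context: it only knows how to reach subtasks.
abstract class CoordinatorContextBase {
    private final Map<Integer, Consumer<CoordinatorEvent>> subtasks = new HashMap<>();

    void registerSubtask(int subtaskId, Consumer<CoordinatorEvent> channel) {
        subtasks.put(subtaskId, channel);
    }

    void sendEventToSubtask(int subtaskId, CoordinatorEvent event) {
        Consumer<CoordinatorEvent> channel = subtasks.get(subtaskId);
        if (channel == null) {
            throw new IllegalStateException("Subtask " + subtaskId + " is not registered");
        }
        channel.accept(event);
    }

    void broadcast(CoordinatorEvent event) {
        for (int subtaskId : subtasks.keySet()) {
            sendEventToSubtask(subtaskId, event);
        }
    }
}

// Shuffle-specific context; source-only concerns such as split assignment would
// remain in SourceCoordinatorContext instead of moving into the base class.
final class StatsCoordinatorContext extends CoordinatorContextBase {}

// Hypothetical coordinator: collects one round of local statistics, merges them,
// and broadcasts the result once every subtask has reported.
final class StatsCoordinator {
    private final CoordinatorContextBase context;
    private final int parallelism;
    private final Map<Integer, Map<String, Long>> pending = new HashMap<>();

    StatsCoordinator(CoordinatorContextBase context, int parallelism) {
        this.context = context;
        this.parallelism = parallelism;
    }

    void handleEventFromSubtask(int subtaskId, CoordinatorEvent event) {
        if (!(event instanceof LocalStatsEvent)) {
            throw new IllegalArgumentException("Unexpected event: " + event);
        }
        pending.put(subtaskId, ((LocalStatsEvent) event).counts);
        if (pending.size() < parallelism) {
            return; // still waiting for other subtasks in this round
        }
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : pending.values()) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        pending.clear();
        context.broadcast(new GlobalStatsEvent(global));
    }
}
```

The point of the split is that the base class only handles reaching the
subtasks, so each coordinator keeps its own protocol logic on top of it.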

Thanks!
Gang



On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <m...@apache.org> wrote:

> Thanks for the proposal, Gang! This is indeed somewhat of a bigger change.
> The coordinator for sources, as part of FLIP-27, was specifically added to
> synchronize the global watermark and to assign splits dynamically. However,
> it practically allows arbitrary RPC calls between the task and the job
> manager. I understand that there is concern that such a powerful mechanism
> should not be available to all operators. Nevertheless, I see the practical
> use in case of sinks like Iceberg. So I'd suggest limiting this feature to
> sinks (and sources) only.
>
> I'm wondering whether extracting the SourceCoordinatorContext is enough
> to achieve what you want. There will be additional work necessary, e.g.
> creating a SinkCoordinator, similar to SourceCoordinator, which handles the
> RPC calls and the checkpointing. I think it would be good to outline this
> in the FLIP.
>
> -Max
>
> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>> Sorry, I sent an incomplete reply by mistake.
>>
>> If there are any concrete concerns, we can discuss them. In FLINK-27405 [1],
>> Avid pointed out some implications regarding checkpointing. In this small
>> FLIP, we are not exposing or changing any checkpointing logic; we mainly
>> need the coordinator context functionality to facilitate the communication
>> between the coordinator and its subtasks.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-27405
>>
>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz...@gmail.com> wrote:
>>
>> > Hang, I appreciate your input. I agree that `CoordinatorContextBase` is a
>> > better name, considering the Flink code conventions.
>> >
>> > If there are any concrete concerns, we can discuss. In the jira,
>> >
>> >
>> >
>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> IMO, I agree with extracting a base class from SourceCoordinatorContext.
>> >> But I prefer the name `OperatorCoordinatorContextBase` or
>> >> `CoordinatorContextBase`, following the format of `SourceReaderBase`.
>> >> I also agree with what Piotr said. More problems may come up when
>> >> connectors start to use it.
>> >>
>> >> Best,
>> >> Hang
>> >>
>> >> On Fri, Oct 14, 2022 at 22:31, Steven Wu <stevenz...@gmail.com> wrote:
>> >>
>> >> > Piotr,
>> >> >
>> >> > The proposal is to extract the listed methods from the @Internal
>> >> > SourceCoordinatorContext into a @PublicEvolving BaseCoordinatorContext.
>> >> >
>> >> > The motivation is that other operators can leverage the communication
>> >> > mechanism between the operator coordinator and the operator subtasks.
>> >> > For example, the shuffle operator in the Flink Iceberg sink (described
>> >> > in the linked Google doc) can leverage it for computing traffic
>> >> > distribution statistics:
>> >> > * Subtasks calculate local statistics and periodically send them to the
>> >> >   coordinator for global aggregation.
>> >> > * The coordinator can broadcast the globally aggregated statistics to
>> >> >   the subtasks, which use them to guide the shuffling decision
>> >> >   (selecting downstream channels).
>> >> >
>> >> > Thanks,
>> >> > Steven
>> >> >
>> >> >
>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> >> >
>> >> > > Hi,
>> >> > >
>> >> > > Could you clarify what proposal you have in mind? From the context,
>> >> > > I understand that the newly extracted `BaseCoordinatorContext` would
>> >> > > have to be marked as `@PublicEvolving` or `@Experimental`, since
>> >> > > otherwise extracting it and keeping it `@Internal` wouldn't change
>> >> > > much: such an `@Internal` base class could be removed at any point in
>> >> > > the future. Having said that, it sounds to me like your proposal is a
>> >> > > bit bigger than it looks at first glance, and you actually want to
>> >> > > expose the operator coordinator concept in the public API?
>> >> > >
>> >> > > AFAIK there were some discussions about that, and it was a somewhat
>> >> > > conscious decision NOT to do that. However, I don't know the reasons.
>> >> > > Only now have I heard that there are, for example, some problems with
>> >> > > checkpointing of hypothetical non-source operator coordinators. Maybe
>> >> > > someone else could shed some light on this?
>> >> > >
>> >> > > Conceptually, I would actually be in favour of exposing operator
>> >> > > coordinators if there is a good reason behind it, but it is a more
>> >> > > difficult topic and might be a larger effort than it seems at first
>> >> > > glance.
>> >> > >
>> >> > > Best,
>> >> > > Piotrek
>> >> > >
>> >> > > On Tue, Oct 4, 2022 at 19:41, Steven Wu <stevenz...@gmail.com> wrote:
>> >> > >
>> >> > > > Jing, thanks a lot for your reply. The linked Google doc is not for
>> >> > > > this FLIP, which is fully documented on the wiki page. It is the
>> >> > > > design doc for introducing shuffling in the Flink Iceberg sink, which
>> >> > > > motivated this FLIP so that the shuffle coordinator can leverage the
>> >> > > > introduced BaseCoordinatorContext to avoid code duplication.
>> >> > > >
>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <j...@ververica.com> wrote:
>> >> > > >
>> >> > > > > Thanks for bringing this up. It looks good overall! One small
>> >> > > > > thing: you might want to write all the content on the wiki page
>> >> > > > > instead of linking to a Google doc, because some people might not
>> >> > > > > be able to access the Google doc.
>> >> > > > >
>> >> > > > > Best regards,
>> >> > > > > Jing
>> >> > > > >
>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegang...@gmail.com> wrote:
>> >> > > > >
>> >> > > > >> Hi,
>> >> > > > >>
>> >> > > > >> We submitted the FLIP proposal
>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
>> >> > > > >> SourceCoordinatorContext so that it can be reused by other
>> >> > > > >> coordinators, e.g. for the shuffling support in the Flink Iceberg sink
>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
>> >> > > > >>
>> >> > > > >> Could you help take a look?
>> >> > > > >> Thanks
>> >> > > > >>
>> >> > > > >> Gang
>> >> > > > >>
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> >
>>
>
