Hi Max,
I understand your concern. Since shuffling support for the Flink Iceberg sink
is not the main body of the proposal, I have just added another appendix with
more details about how to use CoordinatorContextBase and how to define a
ShufflingCoordinator.

Let me know if that does not address your concern.

Thanks
Gang

On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <m...@apache.org> wrote:

> Hey Gang,
>
> What I'm looking for here is a complete picture of why the change is
> necessary and what the next steps are. Ultimately, refactoring any code
> serves a purpose. Here, we want to refactor the Coordinator code such that
> we can add a SinkCoordinator, similar to the SourceCoordinator. The FLIP
> should address the next steps, i.e. how you plan to add the
> SinkCoordinator, its interfaces, and the runtime changes. It doesn't have to
> be in great detail, but without this information, I don't think the FLIP is
> complete.
>
> This feature should be generic enough to be usable by sinks other than the
> Iceberg sink. Of course, Iceberg can still load its own implementation, which
> may be outlined in a separate FLIP.
>
> Unless there is a good reason, normal operators should not support the
> coordinator functionality. At least I'm not convinced it would play well
> with Flink's execution model. But I see how it is required for sources and
> sinks.
>
> -Max
>
> On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegang...@gmail.com> wrote:
>
>> Hi Max,
>>
>> Thanks for reviewing.
>>
>> For FLIP-264, yes, we will focus only on abstracting the RPC calls
>> between the task and the job manager for communication, and won't touch
>> watermarks or checkpointing.
>> If a coordinator doesn't need RPC calls to talk to its subtasks, it can
>> define its context without extending CoordinatorContextBase (or we can
>> find another class name to limit the scope).
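To make the proposed split concrete, the following is a minimal, self-contained Java sketch rather than Flink's actual API: `CoordinatorEvent`, `SubtaskChannel`, `CoordinatorContextBase` and `ShufflingCoordinatorContext` are hypothetical stand-ins. It assumes the shared part of the context is the subtask-communication plumbing (tracking which subtasks are ready, sending an event to one subtask, broadcasting to all), which a shuffling coordinator's context inherits instead of duplicating.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for an event exchanged between a coordinator and its subtasks. */
interface CoordinatorEvent {}

/** Hypothetical stand-in for the RPC channel to a single subtask. */
interface SubtaskChannel {
    void send(CoordinatorEvent event);
}

/**
 * Hypothetical base class holding the subtask-communication plumbing that a
 * SourceCoordinatorContext-like class and other coordinator contexts could share.
 */
abstract class CoordinatorContextBase {
    // subtask index -> channel; an entry exists only while the subtask is ready
    private final Map<Integer, SubtaskChannel> channels = new ConcurrentHashMap<>();

    /** Registers the channel of a subtask that has become ready to receive events. */
    public void subtaskReady(int subtask, SubtaskChannel channel) {
        channels.put(subtask, channel);
    }

    /** Forgets a failed subtask until it reports ready again. */
    public void subtaskFailed(int subtask) {
        channels.remove(subtask);
    }

    /** Sends an event to one subtask; returns false if that subtask is not ready. */
    public boolean sendEventToSubtask(int subtask, CoordinatorEvent event) {
        SubtaskChannel channel = channels.get(subtask);
        if (channel == null) {
            return false;
        }
        channel.send(event);
        return true;
    }

    /** Sends an event to every ready subtask and returns how many received it. */
    public int broadcast(CoordinatorEvent event) {
        int sent = 0;
        for (SubtaskChannel channel : channels.values()) {
            channel.send(event);
            sent++;
        }
        return sent;
    }
}

/** Hypothetical shuffling-coordinator context: reuses the plumbing, adds its own state. */
class ShufflingCoordinatorContext extends CoordinatorContextBase {
    private long statisticsVersion; // example of coordinator-specific state

    long nextStatisticsVersion() {
        return ++statisticsVersion;
    }
}
```

A coordinator that never needs to talk to its subtasks would define its own context class and simply not extend the base class.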
>>
>> Regarding the scope of code changes, FLIP-264 will only do the context
>> extraction. The shuffling coordinator and operator
>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>,
>> which will use the context, will come in a separate proposal; we are trying
>> to keep FLIP-264 simple and easy to understand. I can add a bit more about
>> how to use the coordinator context to FLIP-264 if you think that would be
>> helpful.
>>
>> Thanks!
>> Gang
>>
>>
>>
>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Thanks for the proposal, Gang! This is indeed somewhat of a bigger
>>> change. The coordinator for sources, as part of FLIP-27, was specifically
>>> added to synchronize the global watermark and to assign splits dynamically.
>>> However, it practically allows arbitrary RPC calls between the task and the
>>> job manager. I understand that there is a concern that such a powerful
>>> mechanism should not be available to all operators. Nevertheless, I see its
>>> practical use in the case of sinks like Iceberg. So I'd suggest limiting this
>>> feature to sinks (and sources) only.
>>>
>>> I'm wondering whether extracting the SourceCoordinatorContext is enough
>>> to achieve what you want. Additional work will be necessary, e.g.
>>> creating a SinkCoordinator, similar to the SourceCoordinator, which handles
>>> the RPC calls and the checkpointing. I think it would be good to outline
>>> this in the FLIP.
>>>
>>> -Max
>>>
>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> Sorry, I sent an incomplete reply by mistake.
>>>>
>>>> If there are any concrete concerns, we can discuss them. In FLINK-27405
>>>> [1], Arvid pointed out some implications regarding checkpointing. In this
>>>> small FLIP, we are not exposing or changing any checkpointing logic; we
>>>> mainly need the coordinator context functionality to facilitate the
>>>> communication between the coordinator and its subtasks.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
>>>>
>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>
>>>> > Hang, I appreciate your input. I agree that `CoordinatorContextBase` is a
>>>> > better name, considering Flink's naming conventions.
>>>> >
>>>> > If there are any concrete concerns, we can discuss. In the jira,
>>>> >
>>>> >
>>>> >
>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>>> >
>>>> >> Hi,
>>>> >>
>>>> >> IMO, it makes sense to extract a base class for SourceCoordinatorContext.
>>>> >> But I prefer the name `OperatorCoordinatorContextBase` or
>>>> >> `CoordinatorContextBase`, following the format of `SourceReaderBase`.
>>>> >> I also agree with what Piotr said: more problems may occur once
>>>> >> connectors start to use it.
>>>> >>
>>>> >> Best,
>>>> >> Hang
>>>> >>
>>>> >> On Fri, Oct 14, 2022 at 22:31 Steven Wu <stevenz...@gmail.com> wrote:
>>>> >>
>>>> >> > Piotr,
>>>> >> >
>>>> >> > The proposal is to extract the listed methods from the @Internal
>>>> >> > SourceCoordinatorContext into a @PublicEvolving BaseCoordinatorContext.
>>>> >> >
>>>> >> > The motivation is that other operators can leverage the communication
>>>> >> > mechanism between the operator coordinator and the operator subtasks.
>>>> >> > For example, the shuffle operator (in the Flink Iceberg sink) described
>>>> >> > in the linked Google doc can leverage it to compute traffic
>>>> >> > distribution statistics:
>>>> >> > * Subtasks calculate local statistics and periodically send them to the
>>>> >> >   coordinator for global aggregation.
>>>> >> > * The coordinator can broadcast the globally aggregated statistics to
>>>> >> >   the subtasks, which can use them to guide the shuffling decision
>>>> >> >   (selecting downstream channels).
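The statistics flow described above can be sketched in plain Java as follows. This is a hypothetical illustration, not code from the FLIP or the Iceberg design doc: the class names, the key -> record-count form of the statistics, and the greedy channel assignment (heaviest key first, onto the currently least-loaded downstream channel) are all assumptions, and the RPC transport between subtasks and coordinator is omitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical coordinator-side aggregator of the subtasks' local statistics. */
class TrafficStatsAggregator {
    // Latest report per subtask: subtask index -> (key -> record count).
    private final Map<Integer, Map<String, Long>> latestBySubtask = new HashMap<>();

    /** Handles a periodic report from one subtask, replacing its previous report. */
    void onLocalStats(int subtask, Map<String, Long> localCounts) {
        latestBySubtask.put(subtask, new HashMap<>(localCounts));
    }

    /** Merges the latest reports into global counts, i.e. what would be broadcast. */
    Map<String, Long> globalStats() {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : latestBySubtask.values()) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }
}

/** Hypothetical subtask-side use of the broadcast statistics to pick channels. */
final class ChannelAssignment {
    /** Greedy balancing: heaviest key first, onto the least-loaded channel. */
    static Map<String, Integer> assign(Map<String, Long> globalCounts, int numChannels) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(globalCounts.entrySet());
        // Sort by count descending; break ties by key so the result is deterministic.
        entries.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        long[] load = new long[numChannels];
        Map<String, Integer> assignment = new HashMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            int best = 0; // lowest-index channel wins ties
            for (int c = 1; c < numChannels; c++) {
                if (load[c] < load[best]) {
                    best = c;
                }
            }
            load[best] += entry.getValue();
            assignment.put(entry.getKey(), best);
        }
        return assignment;
    }
}
```

Keeping only each subtask's latest report, rather than summing every report, avoids double-counting under the assumption that each report carries that subtask's cumulative counts.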
>>>> >> >
>>>> >> > Thanks,
>>>> >> > Steven
>>>> >> >
>>>> >> >
>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>> >> >
>>>> >> > > Hi,
>>>> >> > >
>>>> >> > > Could you clarify what proposal you have in mind? From the context, I
>>>> >> > > understand that the newly extracted `BaseCoordinatorContext` would have
>>>> >> > > to be marked as `@PublicEvolving` or `@Experimental`, since otherwise
>>>> >> > > extracting it and keeping it `@Internal` wouldn't change much? Such an
>>>> >> > > `@Internal` base class could be removed at any point in the future.
>>>> >> > > Having said that, it sounds to me like your proposal is a bit bigger
>>>> >> > > than it looks at first glance, and you actually want to expose the
>>>> >> > > operator coordinator concept in the public API?
>>>> >> > >
>>>> >> > > AFAIK there were some discussions about that, and it was a somewhat
>>>> >> > > conscious decision NOT to do that. I don't know the reasons, however.
>>>> >> > > I've only just heard that there are, for example, some problems with
>>>> >> > > checkpointing hypothetical non-source operator coordinators. Maybe
>>>> >> > > someone else could shed some light on this?
>>>> >> > >
>>>> >> > > Conceptually, I would actually be in favour of exposing operator
>>>> >> > > coordinators if there is a good reason behind it, but it is a more
>>>> >> > > difficult topic and might be a larger effort than it seems at first
>>>> >> > > glance.
>>>> >> > >
>>>> >> > > Best,
>>>> >> > > Piotrek
>>>> >> > >
>>>> >> > > On Tue, Oct 4, 2022 at 19:41 Steven Wu <stevenz...@gmail.com> wrote:
>>>> >> > >
>>>> >> > > > Jing, thanks a lot for your reply. The linked Google doc is not for
>>>> >> > > > this FLIP, which is fully documented on the wiki page. The linked
>>>> >> > > > Google doc is the design doc for introducing shuffling in the Flink
>>>> >> > > > Iceberg sink. It motivated this FLIP so that the shuffle coordinator
>>>> >> > > > can leverage the introduced BaseCoordinatorContext to avoid code
>>>> >> > > > duplication.
>>>> >> > > >
>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <j...@ververica.com> wrote:
>>>> >> > > >
>>>> >> > > > > Thanks for bringing this up. It looks good overall! One small
>>>> >> > > > > thing: you might want to put all the content on the wiki page
>>>> >> > > > > instead of linking to a Google doc, because some people might not
>>>> >> > > > > be able to access the Google doc.
>>>> >> > > > >
>>>> >> > > > > Best regards,
>>>> >> > > > > Jing
>>>> >> > > > >
>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegang...@gmail.com> wrote:
>>>> >> > > > >
>>>> >> > > > >> Hi,
>>>> >> > > > >>
>>>> >> > > > >> We have submitted the FLIP proposal
>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
>>>> >> > > > >> SourceCoordinatorContext so that it can be reused by other
>>>> >> > > > >> coordinators, e.g. in the shuffling support of the Flink Iceberg
>>>> >> > > > >> sink
>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
>>>> >> > > > >>
>>>> >> > > > >> Could you help take a look?
>>>> >> > > > >> Thanks
>>>> >> > > > >>
>>>> >> > > > >> Gang
>>>> >> > > > >>
>>>> >> > > > >
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>> >
>>>>
>>>
