Option 1 sounds reasonable but I would be tempted to wait for a second 
motivational use case before generalizing the framework. However I wouldn’t 
oppose this extension if others feel it’s useful and good thing to do

Piotrek

> Wiadomość napisana przez Becket Qin <becket....@gmail.com> w dniu 06.05.2022, 
> o godz. 03:50:
> 
> I think the key point here is essentially what information should Flink
> expose to the user pluggables. Apparently split / local task watermark is
> something many user pluggables would be interested in. Right now it is
> calculated by the Flink framework but not exposed to the users space, i.e.
> SourceReader / SplitEnumerator. So it looks at least we can offer this
> information in some way so users can leverage that information to do
> things.
> 
> That said, I am not sure if this would help in the Iceberg alignment case.
> Because at this point, FLIP-182 reports source reader watermarks
> periodically, which may not align with the RequestSplitEvent. So if we
> really want to leverage the FLIP-182 mechanism here, I see a few ways, just
> to name two of them:
> 1. we can expose the source reader watermark in the SourceReaderContext, so
> the source readers can put the local watermark in a custom operator event.
> This will effectively bypass the existing RequestSplitEvent. Or we can also
> extend the RequestSplitEvent to add an additional info field of byte[]
> type, so users can piggy-back additional information there, be it watermark
> or other stuff.
> 2. Simply piggy-back the local watermark in the RequestSplitEvent and pass
> that info to the SplitEnumerator as well.
> 
> If we are going to do this, personally I'd prefer the first way, as it
> provides a mechanism to allow future extension. So it would be easier to
> expose other framework information to the user space in the future.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> 
>> On Fri, May 6, 2022 at 6:15 AM Thomas Weise <t...@apache.org> wrote:
>> 
>>> On Wed, May 4, 2022 at 11:03 AM Steven Wu <stevenz...@gmail.com> wrote:
>>> Any opinion on different timestamp for source alignment (vs Flink
>> application watermark)? For Iceberg source, we might want to enforce
>> alignment on kafka timestamp but Flink application watermark may use event
>> time field from payload.
>> 
>> I imagine that more generally the question is alignment based on the
>> iceberg partition/file metadata vs. individual rows? I think that
>> should work as long as there is a guarantee for out of orderness
>> within the split?
>> 
>> Thomas
>> 
>>> 
>>> Thanks,
>>> Steven
>>> 
>>> On Wed, May 4, 2022 at 7:02 AM Becket Qin <becket....@gmail.com> wrote:
>>>> 
>>>> Hey Piotr,
>>>> 
>>>> I think the mechanism FLIP-182 provided is a reasonable default one,
>> which
>>>> ensures the watermarks are only drifted by an upper bound. However,
>>>> admittedly there are also other strategies for different purposes.
>>>> 
>>>> In the Iceberg case, I am not sure if a static strictly allowed
>> watermark
>>>> drift is desired. The source might just want to finish reading the
>> assigned
>>>> splits as fast as possible. And it is OK to have a drift of "one split",
>>>> instead of a fixed time period.
>>>> 
>>>> As another example, if there are some fast readers whose splits are
>> always
>>>> throttled, while the other slow readers are struggling to keep up with
>> the
>>>> rest of the splits, the split enumerator may decide to reassign the slow
>>>> splits so all the readers have something to read. This would need the
>>>> SplitEnumerator to be aware of the watermark progress on each reader.
>> So it
>>>> seems useful to expose the WatermarkAlignmentEvent information to the
>>>> SplitEnumerator as well.
>>>> 
>>>> Thanks,
>>>> 
>>>> Jiangjie (Becket) Qin
>>>> 
>>>> 
>>>> 
>>>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>>> 
>>>>> Hi Steven,
>>>>> 
>>>>> Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just
>> emit
>>>>> all splits and let FLIP-182/FLIP-217 handle the watermark alignment
>> and
>>>>> block the splits that are too much into the future? I can see this
>> being an
>>>>> issue if the existence of too many blocked splits is occupying too
>> many
>>>>> resources.
>>>>> 
>>>>> If that's the case, indeed SourceCoordinator/SplitEnumerator would
>> have to
>>>>> decide on some basis how many and which splits to assign in what
>> order. But
>>>>> in that case I'm not sure how much you could use from FLIP-182 and
>>>>> FLIP-217. They seem somehow orthogonal to me, operating on different
>>>>> levels. FLIP-182 and FLIP-217 are working with whatever splits have
>> already
>>>>> been generated and assigned. You could leverage FLIP-182 and FLIP-217
>> and
>>>>> take care of only the problem to limit the number of parallel active
>>>>> splits. And here I'm not sure if it would be worth generalising a
>> solution
>>>>> across different connectors.
>>>>> 
>>>>> Regarding the global watermark, I made a related comment sometime ago
>>>>> about it [1]. It sounds to me like you also need to solve this
>> problem,
>>>>> otherwise Iceberg users will encounter late records in case of some
>> race
>>>>> conditions between assigning new splits and completions of older.
>>>>> 
>>>>> Best,
>>>>> Piotrek
>>>>> 
>>>>> [1]
>>>>> 
>> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>>>>> 
>>>>> pon., 2 maj 2022 o 04:26 Steven Wu <stevenz...@gmail.com> napisał(a):
>>>>> 
>>>>>> add dev@ group to the thread as Thomas suggested
>>>>>> 
>>>>>> Arvid,
>>>>>> 
>>>>>> The scenario 3 (Dynamic assignment + temporary no split) in the
>> FLIP-180
>>>>>> (idleness) can happen to Iceberg source alignment, as readers can be
>>>>>> temporarily starved due to the holdback by the enumerator when
>> assigning
>>>>>> new splits upon request.
>>>>>> 
>>>>>> Totally agree that we should decouple this discussion with the
>> FLIP-217,
>>>>>> which addresses the split level watermark alignment problem as a
>> follow-up
>>>>>> of FLIP-182
>>>>>> 
>>>>>> Becket,
>>>>>> 
>>>>>> Yes, currently Iceberg source implemented the alignment leveraging
>> the
>>>>>> dynamic split assignment from FLIP-27 design. Basically, the
>> enumerator
>>>>>> can
>>>>>> hold back split assignments to readers when necessary. Everything are
>>>>>> centralized in the enumerator: (1) watermark extraction and
>> aggregation
>>>>>> (2)
>>>>>> alignment decision and execution
>>>>>> 
>>>>>> The motivation of this discussion is to see if Iceberg source can
>> leverage
>>>>>> some of the watermark alignment solutions (like FLIP-182) from Flink
>>>>>> framework. E.g., as mentioned in the doc, Iceberg source can
>> potentially
>>>>>> leverage the FLIP-182 framework to do the watermark extraction and
>>>>>> aggregation. For the alignment decision and execution, we can keep
>> them in
>>>>>> the centralized enumerator.
>>>>>> 
>>>>>> Thanks,
>>>>>> Steven
>>>>>> 
>>>>>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <becket....@gmail.com>
>> wrote:
>>>>>> 
>>>>>>> Hi Steven,
>>>>>>> 
>>>>>>> Thanks for pulling me into this thread. I think the timestamp
>>>>>>> alignment use case here is a good example of what FLIP-27 was
>> designed
>>>>>> for.
>>>>>>> 
>>>>>>> Technically speaking, Iceberg source can already implement the
>> timestamp
>>>>>>> alignment in the Flink new source even without FLIP-182. However, I
>>>>>>> understand the rationale here because timestamp alignment is also
>>>>>> trying to
>>>>>>> orchestrate the consumption of splits. However, it looks like
>> FLIP-182
>>>>>> was
>>>>>>> not designed in a way that it can be easily extended for other use
>>>>>> cases.
>>>>>>> It may probably worth thinking of a more general mechanism to
>> answer the
>>>>>>> following questions:
>>>>>>> 
>>>>>>> 1. What information whose source of truth is the Flink framework
>> should
>>>>>> be
>>>>>>> exposed to the SplitEnumerator and SourceReader? And how?
>>>>>>> 2. What control actions in the Flink framework are worth exposing
>> to the
>>>>>>> SplitEnumerators and SourceReaders? And how?
>>>>>>> 
>>>>>>> In the context of timestamp alignment, the first question is more
>>>>>>> relevant. For example, instead of hardcode the ReportWatermarkEvent
>>>>>>> handling logic in the SourceCoordinator, should we expose this to
>> the
>>>>>>> SplitEnumerator? So basically there will be some information, such
>> as
>>>>>>> subtask local watermark, whose source of truth is Flink runtime,
>> but
>>>>>> useful
>>>>>>> to the user provided pluggables.
>>>>>>> 
>>>>>>> I think there are a few control flow patterns to make a complete
>> design:
>>>>>>> 
>>>>>>> a. Framework space information (e.g. watermark) --> User space
>>>>>> Pluggables
>>>>>>> (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
>>>>>> split).
>>>>>>> b. Framework space information (e.g. task failure) --> User space
>>>>>>> pluggables (e.g. SplitEnumerator) --> Framework space actions
>> (e.g. exit
>>>>>>> the job)
>>>>>>> c. User space information (e.g. a custom workload metric) --> User
>> space
>>>>>>> pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
>> rebalance
>>>>>>> the workload across the source readers).
>>>>>>> d. Use space information (e.g. a custom stopping event in the
>> stream)
>>>>>> -->
>>>>>>> User space pluggables (e.g. SplitEnumerator) --> Framework space
>> actions
>>>>>>> (e.g. stop the job).
>>>>>>> 
>>>>>>> So basically for any user provided pluggables, the input
>> information may
>>>>>>> either come from another user provided logic or from the
>> framework, and
>>>>>>> after receiving that information, the pluggable may either want the
>>>>>>> framework or another pluggable to take an action. So this gives the
>>>>>> above 4
>>>>>>> combinations.
>>>>>>> 
>>>>>>> In our case, when the pluggables are SplitEnumerator and
>> SourceReader,
>>>>>> the
>>>>>>> control flows that only involve user space actions are fully
>> supported.
>>>>>> But
>>>>>>> it seems that when it comes to control flows involving framework
>> space
>>>>>>> information, some of the information has not been exposed to the
>>>>>> pluggable,
>>>>>>> and some framework actions might also be missing.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jiangjie (Becket) Qin
>>>>>>> 
>>>>>>> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
>> wrote:
>>>>>>> 
>>>>>>>> Hi folks,
>>>>>>>> 
>>>>>>>> quick input from my side. I think this is from the implementation
>>>>>>>> perspective what Piotr and I had in mind for a global min
>> watermark
>>>>>> that
>>>>>>>> helps in idleness cases. See also point 3 in
>>>>>>>> 
>>>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>>>>>>>> .
>>>>>>>> 
>>>>>>>> Basically, we would like to empower source enumerators to
>> determine the
>>>>>>>> global min watermark for all source readers factoring in even
>> future
>>>>>>>> splits. Not all sources can supply that information (think of a
>> general
>>>>>>>> file source) but most should be able to. Basically, Flink should
>> know
>>>>>> for a
>>>>>>>> given source at a given point in time what the min watermark
>> across all
>>>>>>>> source subtasks is.
>>>>>>>> 
>>>>>>>> Here is some background:
>>>>>>>> In the context of idleness, we can deterministically advance the
>>>>>>>> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>>>>>> sources
>>>>>>>> to switch to idleness and thus allow watermarks to increase in
>> cases
>>>>>> where
>>>>>>>> fewer splits than source tasks are available. However, for
>> sources with
>>>>>>>> dynamic split discovery that actually yields incorrect results.
>> Think
>>>>>> of a
>>>>>>>> Kinesis consumer where a shard is split. Then a previously idle
>> source
>>>>>>>> subtask may receive a new split with time t0 as the lowest
>> timestamp.
>>>>>> Since
>>>>>>>> the source subtask did not participate in the global watermark
>>>>>> generation
>>>>>>>> (because it was idle), the previously emitted watermark may be
>> past t0
>>>>>> and
>>>>>>>> thus results in late records potentially being discarded. A rerun
>> of
>>>>>> the
>>>>>>>> same pipeline on historic data would not render the source subtask
>>>>>> idle and
>>>>>>>> not result in late records. The solution was to not render source
>>>>>> subtasks
>>>>>>>> automatically idle by the framework if there are no spits. That
>> leads
>>>>>> to
>>>>>>>> confusion for Kafka users with static topic subscription where
>> #splits
>>>>>> <
>>>>>>>> #parallelism stalls pipelines because the watermark is not
>> advancing.
>>>>>> Here,
>>>>>>>> your sketched solution can be transferred to KafkaSource to let
>> Flink
>>>>>> know
>>>>>>>> that min global watermark on a static assignment is determined by
>> the
>>>>>>>> slowest partition. Hence, all idle readers emit that min global
>>>>>> watermark
>>>>>>>> and the user sees progress.
>>>>>>>> This whole idea is related to FLIP-182 watermark alignment but
>> I'd go
>>>>>>>> with another FLIP as the goal is quite different even though the
>>>>>>>> implementation overlaps.
>>>>>>>> 
>>>>>>>> Now Iceberg seems to use the same information to actually pause
>> the
>>>>>>>> consumption of files and create some kind of orderness guarantees
>> as
>>>>>> far as
>>>>>>>> I understood. This probably can be applied to any source with
>> dynamic
>>>>>> split
>>>>>>>> discovery. However, I wouldn't mix up the concepts and hence I
>>>>>> appreciate
>>>>>>>> you not chiming into the FLIP-182 and ff. threads. The goal of
>>>>>> FLIP-182 is
>>>>>>>> to pause readers while consuming a split, while your approach
>> pauses
>>>>>>>> readers before processing another split. So it feels more closely
>>>>>> related
>>>>>>>> to the global min watermark - so it could either be part of that
>> FLIP
>>>>>> or a
>>>>>>>> FLIP of its own. Afaik API changes should actually happen only on
>> the
>>>>>>>> enumerator side both for your ideas and for global min watermark.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> 
>>>>>>>> Arvid
>>>>>>>> 
>>>>>>>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <t...@apache.org>
>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Steven,
>>>>>>>>> 
>>>>>>>>> Would it be better to bring this as a separate thread related to
>>>>>> Iceberg
>>>>>>>>> source to the dev@ list? I think this could benefit from broader
>>>>>> input?
>>>>>>>>> 
>>>>>>>>> Thanks
>>>>>>>>> 
>>>>>>>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <stevenz...@gmail.com>
>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> + Becket and Sebastian
>>>>>>>>>> 
>>>>>>>>>> It is also related to the split level watermark alignment
>> discussion
>>>>>>>>>> thread. Because it is already very long, I don't want to further
>>>>>> complicate
>>>>>>>>>> the ongoing discussion there. But I can move the discussion to
>> that
>>>>>>>>>> existing thread if that is preferred.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <
>> stevenz...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi all,
>>>>>>>>>>> 
>>>>>>>>>>> We are thinking about how to align with the Flink community and
>>>>>>>>>>> leverage the FLIP-182 watermark alignment in the Iceberg
>> source. I
>>>>>> put some
>>>>>>>>>>> context in this google doc. Would love to get hear your
>> thoughts on
>>>>>> this.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>> 
>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Steven
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> 
>>>>> 
>> 

Reply via email to