Hi all,

I would like to share some updates on FLIP-327. Dong and I have had a
series of discussions and have made several refinements to the FLIP.

The major change to the FLIP is to allow the input of a one-input
operator to be automatically sorted during backlog processing. When
combined with the state backend optimization introduced in FLIP-325 [1],
all keyed single-input operators can achieve performance similar to that of
batch mode during backlog processing without any code change to the
operator. We also implemented a POC [2] and ran a benchmark [3] using the
KeyedStream#reduce operation. The benchmark results demonstrate the
performance gains that this FLIP can offer.
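
To illustrate why sorted input helps a keyed reduce, here is a minimal sketch in plain Java. It is not the POC code and does not use any Flink API; the class and method names are made up for illustration. The assumption is that once records are sorted by key, only the accumulator of the current key needs to be held, and only the final value per key is emitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Illustrative only: SortedReduceSketch and reduceSorted are hypothetical names, not Flink APIs.
final class SortedReduceSketch {

    static Map<String, Integer> reduceSorted(
            List<Map.Entry<String, Integer>> records, BinaryOperator<Integer> reducer) {
        // Sort a copy by key so that all records of one key become adjacent.
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(records);
        sorted.sort(Map.Entry.comparingByKey());

        Map<String, Integer> results = new LinkedHashMap<>();
        String currentKey = null;   // the only key whose state is held at any time
        Integer accumulator = null; // reduced value for currentKey
        for (Map.Entry<String, Integer> record : sorted) {
            if (!record.getKey().equals(currentKey)) {
                // Key changed: the previous key is complete, so emit it and drop its state.
                if (currentKey != null) {
                    results.put(currentKey, accumulator);
                }
                currentKey = record.getKey();
                accumulator = record.getValue();
            } else {
                accumulator = reducer.apply(accumulator, record.getValue());
            }
        }
        if (currentKey != null) {
            results.put(currentKey, accumulator);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("b", 1), Map.entry("a", 2), Map.entry("b", 3), Map.entry("a", 4));
        System.out.println(reduceSorted(input, Integer::sum)); // prints {a=6, b=4}
    }
}
```

In this sketch the per-key state is two local variables, which is the intuition behind using a single-key state backend when the input is sorted.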

I am looking forward to any comments or feedback you may have on this FLIP.

Best,
Xuannan

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access
[2] https://github.com/Sxnan/flink/tree/FLIP-327-demo
[3]
https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java



> On Aug 18, 2023, at 21:28, Dong Lin <lindon...@gmail.com> wrote:
>
> Hi Piotr,
>
> Thanks for the explanation.
>
> To recap our offline discussion, there is a concern regarding the
> capability to dynamically switch between stream and batch modes. This
> concern is about unforeseen behaviors, such as bugs or performance
> regressions, that we might not be aware of yet. The reason for this
> concern is that this feature involves a fundamental impact on the Flink
> runtime's behavior.
>
> Due to the above concern, I agree it is reasonable to annotate the related
> APIs as experimental. This step would give us the flexibility to modify
> these APIs if issues arise in the future. The annotation also serves as a
> note to users that this functionality might not perform as well as expected.
>
> Though I believe that we can ensure the reliability of this feature through
> good design and code reviews, comprehensive unit tests, and thorough
> integration testing, I agree that it is reasonable to be extra cautious in
> this case. Also, it should be OK to delay making these APIs
> non-experimental by 1-2 releases.
>
> I have updated FLIP-327, FLIP-328, and FLIP-331 to mark APIs in these docs
> as experimental. Please let me know if you think any other API should also
> be marked as experimental.
>
> Thanks!
> Dong
>
> On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
>> Hi Dong,
>>
>> The Operators API is unfortunately also a public-facing API, and I mean that
>> the APIs we add there should also be marked `@Experimental`, IMO.
>>
>> The config options should also be marked as experimental (both
>> annotated with @Experimental and noted as such in the docs,
>> if the @Experimental annotation is not automatically mentioned in the docs).
>>
>>> Alternatively, how about we add a doc for
>>> checkpointing.interval-during-backlog explaining its impact/concern as
>>> discussed above?
>>
>> We should do this independently from marking the APIs/config options as
>> `@Experimental`
>>
>> Best,
>> Piotrek
>>
>> On Fri, Aug 11, 2023 at 14:55, Dong Lin <lindon...@gmail.com> wrote:
>>
>>> Hi Piotr,
>>>
>>> Thanks for the reply!
>>>
>>> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Sorry for the long delay in responding!
>>>>
>>>>> Given that it is an optional feature that can be
>>>>> turned off by users, it might be OK to just let users try it out and we
>>>>> can fix performance issues once we detect any of them. What do you think?
>>>>
>>>> I think it's fine. It would be best to mark this feature as experimental,
>>>> and say that the config keys or the default values might change in the
>>>> future.
>>>>
>>>
>>> In general I agree we can mark APIs that determine "whether to enable
>>> dynamic switching between stream/batch mode" as experimental.
>>>
>>> However, I am not sure we have such an API yet. The APIs added in this FLIP
>>> are intended to be used by operator developers rather than end users. End
>>> users can enable this capability by setting
>>> execution.checkpointing.interval-during-backlog = Long.MAX and using a
>>> source which might implicitly set the backlog status (e.g. HybridSource). So
>>> execution.checkpointing.interval-during-backlog is the only user-facing
>>> API that can always control whether this feature can be used.
>>>
>>> However, execution.checkpointing.interval-during-backlog itself is not tied
>>> to FLIP-327.
>>>
>>> Do you mean we should set checkpointing.interval-during-backlog as
>>> experimental? Alternatively, how about we add a doc for
>>> checkpointing.interval-during-backlog explaining its impact/concern as
>>> discussed above?
>>>
>>> Best,
>>> Dong
>>>
>>>
>>>>> Maybe we can revisit the need for such a config when we introduce/discuss
>>>>> the capability to switch backlog from false to true in the future. What do
>>>>> you think?
>>>>
>>>> Sure, we can do that.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> On Sun, Jul 23, 2023 at 14:32, Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>>> Hi Piotr,
>>>>>
>>>>> Thanks a lot for the explanation. Please see my reply inline.
>>>>>
>>>>> On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Dong,
>>>>>>
>>>>>> Thanks a lot for the answers. I can now only briefly answer your last
>>>>>> email.
>>>>>>
>>>>>>> It is possible that spilling to disks might cause larger overhead. IMO it
>>>>>>> is an orthogonal issue already existing in Flink. This is because a Flink
>>>>>>> job running batch mode might also be slower than its throughput in stream
>>>>>>> mode due to the same reason.
>>>>>>
>>>>>> Yes, I know, but the thing that worries me is that previously only the user
>>>>>> could decide whether to use batch mode or streaming, and in practice one
>>>>>> user would rarely (if ever) use both for the same problem/job/query. If the
>>>>>> intention was to eventually process live data, the user was using streaming
>>>>>> even if there was a large backlog at the start (apart from a very few
>>>>>> power users).
>>>>>>
>>>>>> With this change, we want to introduce a mode that would be switching back
>>>>>> and forth between streaming and "batch in streaming" automatically. So a
>>>>>> potential performance regression would be much more visible and painful
>>>>>> at the same time. If a batch query runs slower than it could, it's kind of
>>>>>> fine, as it will end at some point. If a streaming query switches to batch
>>>>>> processing during large backpressure, maybe from a temporary load spike,
>>>>>> that's a bigger deal. Especially if the batch processing mode is not
>>>>>> able to handle even the normal load after the load spike. In that case, the
>>>>>> job could never recover from the backpressure/backlog mode.
>>>>>>
>>>>>
>>>>> I understand you are concerned with the risk of performance regression
>>>>> introduced by switching to batch mode.
>>>>>
>>>>> After thinking about this more, I think the existing proposal meets the
>>>>> minimum requirement of "not introducing regression for existing jobs". The
>>>>> reason is that even if batch mode can be slower than stream mode for some
>>>>> operators in some cases, this is an optional feature that will only be
>>>>> enabled if a user explicitly overrides the newly introduced config to
>>>>> non-default values. Existing jobs that simply upgrade their Flink library
>>>>> version will not suffer any performance regression.
>>>>>
>>>>> More specifically, in order to switch to batch mode, users will need to
>>>>> explicitly set execution.checkpointing.interval-during-backlog to 0. And
>>>>> users can always explicitly update
>>>>> execution.checkpointing.interval-during-backlog to turn off the batch mode
>>>>> if that incurs any performance issue.
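
The rule described in the paragraph above can be written down as a tiny decision function. This is only a sketch of the rule as stated in this message (batch-style processing is used only while backlog is reported and the interval is 0); the class and method names are hypothetical and not part of Flink, and the final semantics of the config may differ.

```java
import java.time.Duration;

// Illustrative only: BacklogModeSketch is a hypothetical helper, not a Flink API.
final class BacklogModeSketch {

    /**
     * Returns true if an operator would use sorting and other batch-style
     * optimizations, following the rule quoted in this thread.
     */
    static boolean useBatchStyleProcessing(boolean isBacklog, Duration intervalDuringBacklog) {
        // An interval of 0 means "no checkpoints during backlog", which is the
        // precondition for optimizations that are incompatible with checkpointing.
        return isBacklog && intervalDuringBacklog.isZero();
    }

    public static void main(String[] args) {
        System.out.println(useBatchStyleProcessing(true, Duration.ZERO));          // true
        System.out.println(useBatchStyleProcessing(true, Duration.ofMinutes(5)));  // false
        System.out.println(useBatchStyleProcessing(false, Duration.ZERO));         // false
    }
}
```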
>>>>>
>>>>> As far as I can tell, for all practical workloads we see in production
>>>>> jobs, batch mode is always faster (w.r.t. throughput) than stream mode when
>>>>> there is a high backlog of incoming records. Though it is still
>>>>> theoretically possible, it should be very rare (if it happens at all) for
>>>>> batch mode to be slower in practice. Given that it is an optional feature
>>>>> that can be turned off by users, it might be OK to just let users try it
>>>>> out and we can fix performance issues once we detect any of them. What do
>>>>> you think?
>>>>>
>>>>>
>>>>>>
>>>>>>> execution.backlog.use-full-batch-mode-on-start (default false)
>>>>>>
>>>>>> Oops, sorry, it was supposed to be something like:
>>>>>>
>>>>>> execution.backlog.use-batch-mode-only-on-start (default false)
>>>>>>
>>>>>> That option would disallow switching from streaming to batch. Batch mode
>>>>>> would be allowed only to get rid of the initial backlog present on
>>>>>> start-up.
>>>>>>
>>>>>> It would allow us to safely experiment with switching from streaming to
>>>>>> batch, and I would actually be more fine with enabling "using batch mode on
>>>>>> start" by default, until we gain confidence and feedback that switching
>>>>>> back & forth is working as expected.
>>>>>>
>>>>>
>>>>> Now I understand what you are suggesting. I agree that it is necessary for
>>>>> users to be able to disallow switching from streaming to batch.
>>>>>
>>>>> I am not sure it is necessary to introduce an extra config just for this
>>>>> purpose. The reason is that we don't have any strategy that switches
>>>>> backlog status from false to true yet. And when we have such a strategy
>>>>> (e.g. FLIP-328) in the future, it is very likely that we will introduce
>>>>> extra config(s) for users to explicitly turn on such a feature. That means
>>>>> users should be able to turn off this feature even if we don't have
>>>>> something like execution.backlog.use-batch-mode-only-on-start.
>>>>>
>>>>> Maybe we can revisit the need for such a config when we introduce/discuss
>>>>> the capability to switch backlog from false to true in the future. What do
>>>>> you think?
>>>>>
>>>>>
>>>>>>
>>>>>>>> Or we could limit the scope of this FLIP to only support starting with
>>>>>>>> batch mode and switching only once to
>>>>>>>> streaming, and design a follow up with switching back and forth?
>>>>>>>
>>>>>>> Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs
>>>>>>> so that we can make incremental progress.
>>>>>>
>>>>>> Great, let's do that. In a follow-up FLIP we could restart the discussion
>>>>>> about switching back and forth.
>>>>>>
>>>>>
>>>>> Cool, I added the following statement to the motivation section.
>>>>>
>>>>> "NOTE: this FLIP focuses only on the capability to switch from batch to
>>>>> stream mode. If there is any extra API needed to support switching from
>>>>> stream to batch mode, we will discuss them in a follow-up FLIP."
>>>>>
>>>>> I am looking forward to reading your follow-up thoughts!
>>>>>
>>>>> Best,
>>>>> Dong
>>>>>
>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>> On Thu, Jul 20, 2023 at 16:57, Dong Lin <lindon...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Piotr,
>>>>>>>
>>>>>>> Thank you for the very detailed comments! Please see my reply inline.
>>>>>>>
>>>>>>> On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Dong,
>>>>>>>>
>>>>>>>> I have a couple of follow-up questions about switching back and forth
>>>>>>>> between streaming and batching mode.
>>>>>>>> Especially around shuffle/watermark strategy, and keyed state backend.
>>>>>>>>
>>>>>>>> First of all, it might not always be beneficial to switch into the batch
>>>>>>>> modes:
>>>>>>>> - Shuffle strategy
>>>>>>>>    - Is sorting going to be purely in-memory? If not, obviously spilling
>>>>>>>>      to disks might cause larger overheads
>>>>>>>>      compared to not sorting the records.
>>>>>>>>
>>>>>>>
>>>>>>> Sorting might require spilling data to disk depending on the input size.
>>>>>>> The behavior of sorting w.r.t. memory/disk is expected to be exactly the
>>>>>>> same as the behavior of input sorting automatically performed by the Flink
>>>>>>> runtime in batch mode for keyed inputs.
>>>>>>>
>>>>>>> More specifically, ExternalSorter
>>>>>>> <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java>
>>>>>>> is currently used to sort keyed inputs in batch mode. It is automatically
>>>>>>> used by the Flink runtime in OneInputStreamTask (here
>>>>>>> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114>)
>>>>>>> and in MultiInputSortingDataInput (here
>>>>>>> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188>).
>>>>>>> We plan to re-use the same code/mechanism to do sorting.
>>>>>>>
>>>>>>> It is possible that spilling to disks might cause larger overhead. IMO it
>>>>>>> is an orthogonal issue already existing in Flink. This is because a Flink
>>>>>>> job running batch mode might also be slower than its throughput in stream
>>>>>>> mode due to the same reason. However, even though it is possible in theory,
>>>>>>> I expect that in practice the throughput of using sorting +
>>>>>>> BatchExecutionKeyedStateBackend should be much higher than using other
>>>>>>> keyed statebackends when the amount of data is large. As a matter of fact,
>>>>>>> we have not heard of complaints of such performance regression issues in
>>>>>>> batch mode.
>>>>>>>
>>>>>>> The primary goal of this FLIP is to allow the operator to run at the same
>>>>>>> throughput (in stream mode when there is backlog) as it can currently do in
>>>>>>> batch mode. And this goal is not affected by the disk overhead issue
>>>>>>> mentioned above.
>>>>>>>
>>>>>>> I am thinking maybe we can treat it as an orthogonal performance
>>>>>>> optimization problem instead of solving this problem in this FLIP?
>>>>>>>
>>>>>>>>    - If it will be at least partially in-memory, does Flink have some
>>>>>>>>      mechanism to reserve optional memory that
>>>>>>>>      can be revoked if a new operator starts up? Can this memory be
>>>>>>>>      redistributed? Ideally we should use as
>>>>>>>>      much as possible of the available memory to avoid spilling costs, but
>>>>>>>>      also being able to revoke that memory
>>>>>>>>
>>>>>>>
>>>>>>> This FLIP does not support dynamically revoking/redistributing managed
>>>>>>> memory used by the ExternalSorter.
>>>>>>>
>>>>>>> For operators with isInternalSorterSupported = true, we will allocate to
>>>>>>> this operator execution.sorted-inputs.memory
>>>>>>> <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144>
>>>>>>> amount of managed memory. This is the same as how Flink allocates managed
>>>>>>> memory to an operator when this operator has keyed inputs in batch mode.
>>>>>>>
>>>>>>> Note that this FLIP intends to support operators to sort inputs whenever
>>>>>>> there is backlog. And there is currently no way for an operator to know in
>>>>>>> advance whether there will be no backlog after a given time. So it seems
>>>>>>> simpler to just keep managed memory for such an operator throughout the
>>>>>>> lifecycle of this operator, for now.
>>>>>>>
>>>>>>> Besides, it seems that the lack of ability to dynamically
>>>>>>> revoke/redistribute unused managed memory is an existing issue in Flink.
>>>>>>> For example, we might have two operators sharing the same slot and these
>>>>>>> two operators both use managed memory (e.g. to sort inputs). There is
>>>>>>> currently no way for one operator to re-use the memory not used by the
>>>>>>> other operator.
>>>>>>>
>>>>>>> Therefore, I think we can treat this as an orthogonal performance
>>>>>>> optimization problem which can be addressed separately. What do you think?
>>>>>>>
>>>>>>>
>>>>>>>>    - Sometimes sorting, even if we have memory to do that, might be an
>>>>>>>>      unnecessary overhead.
>>>>>>>> - Watermarks
>>>>>>>>    - Is holding back watermarks always good? Suppose we have tons of data
>>>>>>>>      buffered/sorted and waiting to be processed,
>>>>>>>>      with multiple windows per key and many different keys. When we
>>>>>>>>      switch back to `isBacklog=false`, we
>>>>>>>>      first process all of that data before processing watermarks; for
>>>>>>>>      operators that are not using sorted input, the
>>>>>>>>      state size can explode significantly, causing lots of problems. Even
>>>>>>>>      for those that can use sorting, switching to
>>>>>>>>      sorting or BatchExecutionKeyedStateBackend is not always a good
>>>>>>>>      idea, but keeping RocksDB also can be
>>>>>>>>      risky.
>>>>>>>>
>>>>>>>
>>>>>>> With the current FLIP, the proposal is to use a sorter only when the inputs
>>>>>>> have keys. According to this practice, operators which are not using
>>>>>>> sorting should have un-keyed inputs. I believe such an operator will not
>>>>>>> even use a keyed state backend. Maybe I missed some use-case. Can you
>>>>>>> provide a use-case where we will have an operator with un-keyed inputs
>>>>>>> whose state size can explode due to our holding back watermarks?
>>>>>>>
>>>>>>> For operators with keyed inputs that use sorting, I suppose it is possible
>>>>>>> that sorting + BatchExecutionKeyedStateBackend can be worse than using
>>>>>>> RocksDB. But I believe this is very rare (if possible at all) in almost
>>>>>>> all practical usage of Flink.
>>>>>>>
>>>>>>> Taking one step back, if this indeed causes a regression for a real use-case,
>>>>>>> users can set execution.checkpointing.interval-during-backlog to anything
>>>>>>> other than 0 so that this FLIP will not use
>>>>>>> sorter + BatchExecutionKeyedStateBackend even when there is backlog.
>>>>>>>
>>>>>>> I would hope we can find a way to automatically determine whether using
>>>>>>> sorting + BatchExecutionKeyedStateBackend can be better or worse than using
>>>>>>> RocksDB alone. But I could not find a good and reliable way to do this.
>>>>>>> Maybe we can update Flink to do this when we find a good way to do this in
>>>>>>> the future?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> - Keyed state backend
>>>>>>>>    - I think you haven't described what happens during switching from
>>>>>>>>      streaming to backlog processing.
>>>>>>>>
>>>>>>>
>>>>>>> Good point. This indeed needs to be described. I added a TODO in the
>>>>>>> "Behavior changes ..." section to describe what happens when isBacklog
>>>>>>> switches from false to true, for all watermark/checkpoint/statebackend etc.
>>>>>>>
>>>>>>> Let me explain this for the state backend here for now. I will update the
>>>>>>> FLIP later.
>>>>>>>
>>>>>>> When isBacklog switches from false to true, an operator with keyed inputs
>>>>>>> can optionally (as determined by its implementation) start to use an
>>>>>>> internal sorter to sort inputs by key, without processing inputs or
>>>>>>> updating the state backend, until it receives end-of-inputs or isBacklog is
>>>>>>> switched to false again.
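
The behavior described above can be sketched with a toy operator in plain Java. This is an assumption-laden illustration, not the FLIP's implementation: the class and its methods are hypothetical, a HashMap stands in for the keyed state backend, and an in-memory list stands in for the internal sorter (which in Flink may spill to disk).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a hypothetical per-key sum operator that buffers during backlog.
final class BacklogAwareSumSketch {
    private final Map<String, Long> state = new HashMap<>();                 // stand-in for keyed state
    private final List<Map.Entry<String, Long>> buffer = new ArrayList<>();  // stand-in for the sorter
    private boolean isBacklog;

    /** Called when the backlog status of the input changes. */
    void setBacklog(boolean backlog) {
        if (isBacklog && !backlog) {
            flushBuffer(); // backlog ended: process everything that was buffered
        }
        isBacklog = backlog;
    }

    void processElement(String key, long value) {
        if (isBacklog) {
            // During backlog: only buffer, do not touch the state.
            buffer.add(Map.entry(key, Long.valueOf(value)));
        } else {
            state.merge(key, value, Long::sum);
        }
    }

    void endOfInput() {
        flushBuffer();
    }

    private void flushBuffer() {
        // Sort by key so that state access is grouped per key, then apply the records.
        buffer.sort(Map.Entry.comparingByKey());
        for (Map.Entry<String, Long> record : buffer) {
            state.merge(record.getKey(), record.getValue(), Long::sum);
        }
        buffer.clear();
    }

    Map<String, Long> snapshot() {
        return new HashMap<>(state);
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```

Note that nothing in this sketch is checkpointed while records sit in the buffer, which is why the thread treats input sorting as incompatible with checkpointing during backlog.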
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>    - Switch can be an unnecessary overhead.
>>>>>>>
>>>>>>>
>>>>>>> I agree it can cause unnecessary overhead, particularly when isBacklog
>>>>>>> switches back and forth frequently. Whether or not this is unnecessary
>>>>>>> likely depends on the duration/throughput of the backlog phase as well as
>>>>>>> the specific computation logic of the operator. I am not sure there is a
>>>>>>> good way for Flink to determine in advance whether switching is
>>>>>>> unnecessary.
>>>>>>>
>>>>>>> Note that for the existing use-cases where we expect to change isBacklog to
>>>>>>> true (e.g. MySQL CDC snapshot phase, Kafka source watermark lag being too
>>>>>>> high), we don't expect it to switch back and forth frequently.
>>>>>>> And users can disable this switch by setting
>>>>>>> execution.checkpointing.interval-during-backlog to anything other than 0.
>>>>>>>
>>>>>>> Therefore, I am wondering if we can also view this as a performance
>>>>>>> optimization opportunity for extra use-cases in the future, rather than a
>>>>>>> blocking issue of this FLIP for the MVP use-case (e.g. snapshot phase for
>>>>>>> any CDC source, Kafka watermark lag).
>>>>>>>
>>>>>>>
>>>>>>>> At the same time, in your current proposal, for
>>>>>>>> `execution.checkpointing.interval-during-backlog > 0` we won't
>>>>>>>> switch to "batch" mode at all. That's a bit of a shame; I don't understand
>>>>>>>> why those two things should be coupled
>>>>>>>> together?
>>>>>>>>
>>>>>>>
>>>>>>> We can in general classify optimizations as those that are compatible with
>>>>>>> checkpointing, and those that are not compatible with checkpointing. For
>>>>>>> example, input sorting is currently not compatible with checkpointing. And
>>>>>>> buffering input records to reduce state backend overhead (and probably
>>>>>>> columnar processing for mini-batch in the future) is compatible with
>>>>>>> checkpointing.
>>>>>>>
>>>>>>> The primary goal of FLIP-327 is to support optimizations not compatible with
>>>>>>> checkpointing. If execution.checkpointing.interval-during-backlog > 0,
>>>>>>> which means that the user intends to still do checkpointing even when there
>>>>>>> is backlog, then we will not be able to support such optimizations.
>>>>>>>
>>>>>>> For optimizations that are compatible with checkpointing, we can do this
>>>>>>> even when the operator does not run in "batch mode". There are extra
>>>>>>> problems to solve in order to achieve this optimization, such as supporting
>>>>>>> unaligned checkpointing without prolonging its sync phase. I plan to
>>>>>>> explain how this can be done in FLIP-325.
>>>>>>>
>>>>>>>
>>>>>>>> All in all, shouldn't we aim for some more clever process of switching back
>>>>>>>> and forth between streaming/batch modes
>>>>>>>> for watermark strategy/state backend/sorting based on some metrics? Trying
>>>>>>>> to either predict if switching might help,
>>>>>>>> or trying to estimate if the last switch was beneficial? Maybe something
>>>>>>>> along the lines:
>>>>>>>> - sort only in memory and during sorting count the number of distinct keys
>>>>>>>>   (NDK)
>>>>>>>>    - maybe allow for spilling if so far in memory we have NDK * 5 =
>>>>>>>>      #records
>>>>>>>> - do not allow to buffer records above a certain threshold, as otherwise
>>>>>>>>   checkpointing can explode
>>>>>>>> - switch to `BatchExecutionKeyedStateBackend` only if NDK * 2 = #records
>>>>>>>> - do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records
>>>>>>>>
>>>>>>>> Or even maybe for starters something even simpler and then test out
>>>>>>>> something more fancy as a follow up?
>>>>>>>>
>>>>>>>
>>>>>>> I agree it is worth investigating these ideas to further optimize the
>>>>>>> performance during backlog.
>>>>>>>
>>>>>>> I just think these can be done independently after this FLIP. The focus of
>>>>>>> this FLIP is to re-use in stream mode the same optimization which we
>>>>>>> already use in batch mode, rather than inventing or improving the
>>>>>>> performance of these existing optimizations.
>>>>>>>
>>>>>>> Given that there are already a lot of new mechanisms/features to discuss and
>>>>>>> address in this FLIP, I am hoping we can limit the scope of this FLIP to
>>>>>>> re-using the existing optimization, and treat these extra optimization
>>>>>>> opportunities as future work.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> At the same time, `execution.checkpointing.interval-during-backlog=0` seems
>>>>>>>> a weird setting to me, that I would
>>>>>>>> not feel safe recommending to anyone. If processing of a backlog takes a
>>>>>>>> long time, a job might stop making
>>>>>>>> any progress due to some random failures. Especially dangerous if a job
>>>>>>>> switches from streaming mode back to
>>>>>>>> backlog processing due to some reasons, as that could happen months after
>>>>>>>> someone started a job with this
>>>>>>>> strange setting. So should we even have it? I would simply disallow it. I
>>>>>>>>
>>>>>>>
>>>>>>> Good point. I do agree we need further work to improve the failover
>>>>>>> performance in case any task fails.
>>>>>>>
>>>>>>> As of the current FLIP, if any task fails during backlog and
>>>>>>> execution.checkpointing.interval-during-backlog = 0, we will need to
>>>>>>> restart all operators from the last checkpointed state and continue
>>>>>>> processing backlog. And this can be a lot of rollback since there is no
>>>>>>> checkpoint during backlog. And this can also be worse than batch since this
>>>>>>> FLIP currently does not support exporting/saving records to local disk (or
>>>>>>> shuffle service) so that a failed task can re-consume the records from the
>>>>>>> upstream task (or shuffle service) in the same way as Flink fails over a
>>>>>>> task in batch mode.
>>>>>>>
>>>>>>> I think we can extend this FLIP to solve this problem so that it can have
>>>>>>> at least the same behavior/performance as a batch-mode job. The idea is to
>>>>>>> also follow what batch mode does. For example, we can trigger a checkpoint
>>>>>>> when isBacklog switches to true, and every operator should buffer its
>>>>>>> output in the TM local disk (or remote shuffle service). Therefore, after a
>>>>>>> task fails, it can restart from the last checkpoint and re-consume data
>>>>>>> buffered in the upstream task.
>>>>>>>
>>>>>>> I will update the FLIP as described above. Would this address your concern?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> could see a power setting like:
>>>>>>>>        `execution.backlog.use-full-batch-mode-on-start (default false)`
>>>>>>>>
>>>>>>>
>>>>>>> I am not sure I fully understand this config or its motivation. Can you
>>>>>>> help explain the exact semantics of this config?
>>>>>>>
>>>>>>>
>>>>>>>> that would override any heuristic of switching to backlog if someone is
>>>>>>>> submitting a new job that starts with
>>>>>>>> `isBacklog=true`.
>>>>>>>>
>>>>>>>> Or we could limit the scope of this FLIP to only support starting with
>>>>>>>> batch mode and switching only once to
>>>>>>>> streaming, and design a follow up with switching back and forth?
>>>>>>>>
>>>>>>>
>>>>>>> Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs
>>>>>>> so that we can make incremental progress.
>>>>>>>
>>>>>>> Best,
>>>>>>> Dong
>>>>>>>
>>>>>>>
>>>>>>>> I'm looking forward to hearing/reading your thoughts.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 12, 2023 at 12:38, Jing Ge <j...@ververica.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> Hi Dong,
>>>>>>>>>
>>>>>>>>> Thanks for your reply!
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>> Jing
>>>>>>>>>
>>>>>>>>> On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jing,
>>>>>>>>>>
>>>>>>>>>> Thanks for the comments. Please see my reply inline.
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 12, 2023 at 5:04 AM Jing Ge <j...@ververica.com.invalid>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the clarification. Now it is clear to me. I have some
>>>>>>>>>>> additional noob questions wrt the internal sorter.
>>>>>>>>>>>
>>>>>>>>>>> 1. When should the setter be called to set internalSorterSupported to
>>>>>>>>>>> true?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The developer of the operator class (i.e. a class which implements
>>>>>>>>>> `StreamOperator`) should override the `#getOperatorAttributes()` API to
>>>>>>>>>> set internalSorterSupported to true, if he/she decides to sort records
>>>>>>>>>> internally in the operator.
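
As a rough sketch of the pattern described above: the types below are local stand-ins written for this example, and only the names getOperatorAttributes and internalSorterSupported come from the thread. The actual Flink interfaces and their builder may look different.

```java
// Illustrative only: every type here is a local stand-in, not a Flink class.
final class OperatorAttributesSketch {

    /** Stand-in for the attributes object returned by an operator. */
    static final class OperatorAttributes {
        private final boolean internalSorterSupported;

        OperatorAttributes(boolean internalSorterSupported) {
            this.internalSorterSupported = internalSorterSupported;
        }

        boolean isInternalSorterSupported() {
            return internalSorterSupported;
        }
    }

    /** Stand-in for StreamOperator; by default no internal sorter is declared. */
    interface StreamOperatorLike {
        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributes(false);
        }
    }

    /** An operator that keeps the default attributes. */
    static final class PassThroughOperator implements StreamOperatorLike {}

    /** An operator whose developer decided to sort records internally. */
    static final class SortingCoGroupOperator implements StreamOperatorLike {
        @Override
        public OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributes(true);
        }
    }
}
```

The point of the default method is that existing operators need no change; only an operator that sorts internally opts in by overriding it.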
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> 2.
>>>>>>>>>>> *"For those operators whose throughput can be considerably improved with
>>>>>>>>>>> an internal sorter, update it to take advantage of the internal sorter
>>>>>>>>>>> when its input has isBacklog=true.*
>>>>>>>>>>> *Typically, operators that involve aggregation operation (e.g. join,
>>>>>>>>>>> cogroup, aggregate) on keyed inputs can benefit from using an internal
>>>>>>>>>>> sorter."*
>>>>>>>>>>>
>>>>>>>>>>> *"The operator that performs CoGroup operation will instantiate two
>>>>>>>>>>> internal sorters to sort records from its two inputs separately. Then it
>>>>>>>>>>> can pull the sorted records from these two sorters. This can be done
>>>>>>>>>>> without wrapping input records with TaggedUnion<...>. In comparison, the
>>>>>>>>>>> existing DataStream#coGroup needs to wrap input records with
>>>>>>>>>>> TaggedUnion<...> before sorting them using one external sorter, which
>>>>>>>>>>> introduces higher overhead."*
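
The two-sorter co-group described in the quoted FLIP text can be sketched as a sort-merge over two separately sorted inputs. This is a plain-Java illustration under the assumption that both inputs fit in memory; the class, the Group type, and the method name are hypothetical and are not the operator from the FLIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: a hypothetical sort-merge co-group, not Flink code.
final class SortMergeCoGroupSketch {

    /** One output group: a key with the matching values from each input. */
    static final class Group {
        final String key;
        final List<Integer> left;
        final List<String> right;

        Group(String key, List<Integer> left, List<String> right) {
            this.key = key;
            this.left = left;
            this.right = right;
        }
    }

    static List<Group> coGroup(
            List<Map.Entry<String, Integer>> leftInput,
            List<Map.Entry<String, String>> rightInput) {
        // Each input gets its own sort, so the two value types never need a common wrapper.
        List<Map.Entry<String, Integer>> left = new ArrayList<>(leftInput);
        List<Map.Entry<String, String>> right = new ArrayList<>(rightInput);
        left.sort(Map.Entry.comparingByKey());
        right.sort(Map.Entry.comparingByKey());

        List<Group> groups = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() || j < right.size()) {
            // Pick the smallest key not yet emitted from either input.
            String key;
            if (j >= right.size()) {
                key = left.get(i).getKey();
            } else if (i >= left.size()) {
                key = right.get(j).getKey();
            } else {
                String leftKey = left.get(i).getKey();
                String rightKey = right.get(j).getKey();
                key = leftKey.compareTo(rightKey) <= 0 ? leftKey : rightKey;
            }
            // Pull all records with that key from both sorted inputs.
            List<Integer> leftValues = new ArrayList<>();
            List<String> rightValues = new ArrayList<>();
            while (i < left.size() && left.get(i).getKey().equals(key)) {
                leftValues.add(left.get(i++).getValue());
            }
            while (j < right.size() && right.get(j).getKey().equals(key)) {
                rightValues.add(right.get(j++).getValue());
            }
            groups.add(new Group(key, leftValues, rightValues));
        }
        return groups;
    }
}
```

Because the inputs are sorted independently, the left values stay `Integer` and the right values stay `String`, which mirrors the claim that no TaggedUnion-style wrapping is needed.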
>>>>>>>>>>>
>>>>>>>>>>> According to the performance test, it seems that the internal sorter has
>>>>>>>>>>> better performance than the external sorter. Is it possible to make those
>>>>>>>>>>> operators that can benefit from it use the internal sorter by default?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Yes, it is possible. After this FLIP is done, users can use
>>>>>>>>>> DataStream#coGroup with EndOfStreamWindows as the window assigner to
>>>>>>>>>> co-group two streams effectively in a batch manner. An operator that uses
>>>>>>>>>> an internal sorter will be used to perform the co-group operation. There
>>>>>>>>>> is no need for users of the DataStream API to explicitly know or set the
>>>>>>>>>> internal sorter in any way.
>>>>>>>>>>
>>>>>>>>>> In the future, we plan to incrementally optimize other aggregation
>>>>>>>>>> operations (e.g. aggregate) on the DataStream API when EndOfStreamWindows
>>>>>>>>>> is used as the window assigner.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Dong
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Jing
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <lindon...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jing,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the comments! Please see my reply inline.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jul 11, 2023 at 5:41 AM Jing Ge
>>>>>>> <j...@ververica.com.invalid
>>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the proposal! The FLIP is already in good
>>>>> shape. I
>>>>>>> got
>>>>>>>>>> some
>>>>>>>>>>>> NIT
>>>>>>>>>>>>> questions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. It is a little bit weird to write the hint right
>>> after
>>>>> the
>>>>>>>>>>> motivation
>>>>>>>>>>>>> that some features have been moved to FLIP-331,
>> because
>>>> at
>>>>>> that
>>>>>>>>> time,
>>>>>>>>>>>>> readers don't yet know which features it refers to.
>>>>>>>> I
>>>>>>>>>>> would
>>>>>>>>>>>>> suggest moving the note to the beginning of "Public
>>>>>> interfaces"
>>>>>>>>>>> section.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Given that the reviewer who commented on this email
>>> thread
>>>>>>> before I
>>>>>>>>>>>> refactored the FLIP (i.e. Piotr) has read FLIP-331, I
>>> think
>>>> it
>>>>>> is
>>>>>>>>>> simpler
>>>>>>>>>>> to
>>>>>>>>>>>> just remove any mention of FLIP-331. I have updated the
>>>> FLIP
>>>>>>>>>> accordingly.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> 2. It is also a little bit weird to describe all
>>>> behaviour
>>>>>>>> changes
>>>>>>>>> at
>>>>>>>>>>>> first
>>>>>>>>>>>>> but only focus on one single feature, i.e. how to
>>>> implement
>>>>>>>>>>>>> internalSorterSupported. TBH, I was lost while I was
>>>>> reading
>>>>>>> the
>>>>>>>>>> Public
>>>>>>>>>>>>> interfaces. Maybe change the FLIP title? Another
>> option
>>>>> could
>>>>>>> be
>>>>>>>> to
>>>>>>>>>>>> write a
>>>>>>>>>>>>> short summary of all features and point out that this
>>>> FLIP
>>>>>> will
>>>>>>>>> only
>>>>>>>>>>>> focus
>>>>>>>>>>>>> on the internalSorterSupported feature. Others could
>> be
>>>>> found
>>>>>>> in
>>>>>>>>>>>> FLIP-331.
>>>>>>>>>>>>> WDYT?
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Conceptually, the purpose of this FLIP is to allow a
>>> stream
>>>>>> mode
>>>>>>>> job
>>>>>>>>> to
>>>>>>>>>>> run
>>>>>>>>>>>> parts of the topology in batch mode so that it can
>> apply
>>>>>>>>>>>> optimizations/computations that cannot be used
>> together
>>>> with
>>>>>>>>>>> checkpointing
>>>>>>>>>>>> (and thus not usable in stream mode). Although internal
>>>>> sorter
>>>>>> is
>>>>>>>> the
>>>>>>>>>>> only
>>>>>>>>>>>> optimization immediately supported in this FLIP, this
>>> FLIP
>>>>> lays
>>>>>>> the
>>>>>>>>>>>> foundation to support other optimizations in the
>> future,
>>>> such
>>>>>> as
>>>>>>>>> using
>>>>>>>>>>> GPU
>>>>>>>>>>>> to process a bounded stream of records.
>>>>>>>>>>>>
>>>>>>>>>>>> Therefore, I find it better to keep the current title
>>>> rather
>>>>>> than
>>>>>>>>>>> limiting
>>>>>>>>>>>> the scope to internal sorter. What do you think?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> 3. There should be a typo at 4) Checkpoint and
>> failover
>>>>>>> strategy
>>>>>>>> ->
>>>>>>>>>>> Mixed
>>>>>>>>>>>>> mode ->
>>>>>>>>>>>>>
>>>>>>>>>>>>>   - If any task fails when isBacklog=false true,
>> this
>>>> task
>>>>>> is
>>>>>>>>>>> restarted
>>>>>>>>>>>> to
>>>>>>>>>>>>>   re-process its input from the beginning.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for catching this issue. It is fixed now.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Dong
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best regards
>>>>>>>>>>>>> Jing
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <
>>>>> lindon...@gmail.com
>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Piotr,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your comments! Please see my reply
>> inline.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <
>>>>>>>>>>>> piotr.nowoj...@gmail.com
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a couple of questions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Could you explain why those properties
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    @Nullable private Boolean isOutputOnEOF =
>> null;
>>>>>>>>>>>>>>>    @Nullable private Boolean
>> isOutputOnCheckpoint
>>> =
>>>>>> null;
>>>>>>>>>>>>>>>    @Nullable private Boolean
>>>>> isInternalSorterSupported =
>>>>>>>> null;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> must be `@Nullable`, instead of having the
>> default
>>>>> value
>>>>>>> set
>>>>>>>> to
>>>>>>>>>>>>> `false`?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> By initializing these private variables in
>>>>>>>>>> OperatorAttributesBuilder
>>>>>>>>>>> as
>>>>>>>>>>>>>> null, we can implement
>>>>> `OperatorAttributesBuilder#build()`
>>>>>> in
>>>>>>>>> such
>>>>>>>>>> a
>>>>>>>>>>>> way
>>>>>>>>>>>>>> that it can print DEBUG level logging to say
>>>>>>>>> "isOutputOnCheckpoint
>>>>>>>>>> is
>>>>>>>>>>>> not
>>>>>>>>>>>>>> explicitly set". This can help users/SREs debug
>>>> performance
>>>>>>>> issues
>>>>>>>>>> (or
>>>>>>>>>>>> lack
>>>>>>>>>>>>>> of the expected optimization) due to operators not
>>>>>> explicitly
>>>>>>>>>> setting
>>>>>>>>>>>> the
>>>>>>>>>>>>>> right operator attribute.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example, we might want a job to always use the
>>>> longer
>>>>>>>>>>> checkpointing
>>>>>>>>>>>>>> interval (i.e.
>>>>>>> execution.checkpointing.interval-during-backlog)
>>>>>>>>> if
>>>>>>>>>>> all
>>>>>>>>>>>>>> running operators have isOutputOnCheckpoint==false,
>>> and
>>>>> use
>>>>>>> the
>>>>>>>>>> short
>>>>>>>>>>>>>> checkpointing interval otherwise. If a user has
>>>>> explicitly
>>>>>>>>>> configured
>>>>>>>>>>>> the
>>>>>>>>>>>>>> execution.checkpointing.interval-during-backlog but
>>> the
>>>>>>>> two-phase
>>>>>>>>>>>> commit
>>>>>>>>>>>>>> sink library has not been upgraded to set
>>>>>>>>>> isOutputOnCheckpoint=true,
>>>>>>>>>>>> then
>>>>>>>>>>>>>> the job will end up using the long checkpointing
>>>>> interval,
>>>>>>> and
>>>>>>>> it
>>>>>>>>>>> will
>>>>>>>>>>>> be
>>>>>>>>>>>>>> useful to figure out what is going wrong in this
>> case
>>>> by
>>>>>>>> checking
>>>>>>>>>> the
>>>>>>>>>>>>> log.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note that the default value of these fields of the
>>>>>>>>>> OperatorAttributes
>>>>>>>>>>>>>> instance built by OperatorAttributesBuilder will
>>> still
>>>> be
>>>>>>>> false.
>>>>>>>>>> The
>>>>>>>>>>>>>> following is mentioned in the Java doc of
>>>>>>>>>>>>>> `OperatorAttributesBuilder#build()`:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>  * If any operator attribute is null, we will log
>> it
>>>> at
>>>>>>> DEBUG
>>>>>>>>>> level
>>>>>>>>>>>> and
>>>>>>>>>>>>>> use the following
>>>>>>>>>>>>>>  * default values.
>>>>>>>>>>>>>>  * - isOutputOnEOF defaults to false
>>>>>>>>>>>>>>  * - isOutputOnCheckpoint defaults to false
>>>>>>>>>>>>>>  * - isInternalSorterSupported defaults to false
>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>
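As a rough illustration of the null-then-default behaviour described above, here is a minimal sketch. It assumes java.util.logging's FINE level stands in for DEBUG; the class names carry a "Sketch" suffix and the setter name is hypothetical, so none of this should be read as the actual OperatorAttributesBuilder code.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the OperatorAttributes value object.
final class OperatorAttributesSketch {
    final boolean outputOnEOF;
    final boolean outputOnCheckpoint;
    final boolean internalSorterSupported;

    OperatorAttributesSketch(
            boolean outputOnEOF,
            boolean outputOnCheckpoint,
            boolean internalSorterSupported) {
        this.outputOnEOF = outputOnEOF;
        this.outputOnCheckpoint = outputOnCheckpoint;
        this.internalSorterSupported = internalSorterSupported;
    }
}

// Hypothetical builder: fields start as null so build() can tell
// "never set" apart from "explicitly set to false".
final class OperatorAttributesBuilderSketch {
    private static final Logger LOG =
            Logger.getLogger(OperatorAttributesBuilderSketch.class.getName());

    private Boolean isOutputOnEOF = null;
    private Boolean isOutputOnCheckpoint = null;
    private Boolean isInternalSorterSupported = null;

    // Setter name is an assumption made for this sketch.
    OperatorAttributesBuilderSketch setOutputOnCheckpoint(boolean value) {
        this.isOutputOnCheckpoint = value;
        return this;
    }

    OperatorAttributesSketch build() {
        return new OperatorAttributesSketch(
                orDefault("isOutputOnEOF", isOutputOnEOF),
                orDefault("isOutputOnCheckpoint", isOutputOnCheckpoint),
                orDefault("isInternalSorterSupported", isInternalSorterSupported));
    }

    // Logs unset attributes at FINE (the DEBUG stand-in) and falls back to false.
    private static boolean orDefault(String name, Boolean value) {
        if (value == null) {
            LOG.log(Level.FINE, "{0} is not explicitly set, defaulting to false", name);
            return false;
        }
        return value;
    }
}
```

With a plain `boolean` field initialized to `false`, build() could not distinguish an operator that deliberately chose `false` from one that never set the attribute, so the log line would not be possible.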
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Second question, have you thought about cases
>> where
>>>>>> someone
>>>>>>>> is
>>>>>>>>>>>>>>> either bootstrapping from a streaming source like
>>>> Kafka
>>>>>>>>>>>>>>> or simply trying to catch up after a long period
>> of
>>>>>>> downtime
>>>>>>>>> in a
>>>>>>>>>>>>> purely
>>>>>>>>>>>>>>> streaming job? Generally speaking, a case where the
>>>>>>>>>>>>>>> user doesn't care about latency in the catch up
>>>> phase,
>>>>>>>>> regardless
>>>>>>>>>>> of whether
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> source is bounded or unbounded, but wants to
>>> process
>>>>>>>>>>>>>>> the data as fast as possible, and then switch
>>>>> dynamically
>>>>>>> to
>>>>>>>>> real
>>>>>>>>>>>> time
>>>>>>>>>>>>>>> processing?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, I have thought about this. We should allow
>> this
>>>> job
>>>>> to
>>>>>>>>>>> effectively
>>>>>>>>>>>>> run
>>>>>>>>>>>>>> in batch mode when the job is in the catch-up
>> phase.
>>>>>> FLIP-327
>>>>>>>> is
>>>>>>>>>>>> actually
>>>>>>>>>>>>>> an important step toward addressing this use-case.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In order to address the above use-case, all we need
>>> is
>>>> a
>>>>>> way
>>>>>>>> for
>>>>>>>>>>> source
>>>>>>>>>>>>>> operator (e.g. Kafka) to tell Flink runtime (via
>>>>>>>>>> IsProcessingBacklog)
>>>>>>>>>>>>>> whether it is in the catch-up phase.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Since every Kafka message has an event timestamp, we
>> can
>>>>> allow
>>>>>>>> users
>>>>>>>>>> to
>>>>>>>>>>>>>> specify a job-level config such as
>>>>>>>>> backlog-watermark-lag-threshold,
>>>>>>>>>>> and
>>>>>>>>>>>>>> consider a Kafka Source to have
>>>> IsProcessingBacklog=true
>>>>> if
>>>>>>>>>>>> system_time -
>>>>>>>>>>>>>> watermark > backlog-watermark-lag-threshold. This
>>>>>> effectively
>>>>>>>>>> allows
>>>>>>>>>>> us
>>>>>>>>>>>>> to
>>>>>>>>>>>>>> determine whether Kafka is in the catch up phase.
>>>>>>>>>>>>>>
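The lag check described above can be sketched as a single comparison. This is a minimal sketch under stated assumptions: the class and method names are hypothetical, timestamps are taken to be epoch milliseconds, and the threshold corresponds to the backlog-watermark-lag-threshold config floated in the email, which is itself only a suggested name.

```java
import java.time.Duration;

// Hypothetical helper; not part of Flink or the Kafka connector.
final class BacklogLagSketch {

    /**
     * Returns true when the watermark trails the system time by more than
     * the threshold, i.e. system_time - watermark > threshold.
     */
    static boolean isProcessingBacklog(
            long systemTimeMs, long watermarkMs, Duration lagThreshold) {
        return systemTimeMs - watermarkMs > lagThreshold.toMillis();
    }
}
```

For example, with a 5 second threshold a source whose watermark is 9 seconds behind the system time would be considered in the catch-up phase, while one that is 1 second behind would not.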
>>>>>>>>>>>>>> Once we have this capability (I plan to work on
>> this
>>> in
>>>>>>>>> FLIP-328),
>>>>>>>>>> we
>>>>>>>>>>>> can
>>>>>>>>>>>>>> directly use the features proposed in FLIP-325 and
>>>>> FLIP-327
>>>>>>> to
>>>>>>>>>>> optimize
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> above use-case.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> niedz., 2 lip 2023 o 16:15 Dong Lin <
>>>>> lindon...@gmail.com
>>>>>>>
>>>>>>>>>>>> napisał(a):
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am opening this thread to discuss FLIP-327:
>>>> Support
>>>>>>>>>>> stream-batch
>>>>>>>>>>>>>>> unified
>>>>>>>>>>>>>>>> operator to improve job throughput when
>>> processing
>>>>>>> backlog
>>>>>>>>>> data.
>>>>>>>>>>>> The
>>>>>>>>>>>>>>> design
>>>>>>>>>>>>>>>> doc can be found at
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
>>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This FLIP enables a Flink job to initially
>>> operate
>>>> in
>>>>>>> batch
>>>>>>>>>> mode,
>>>>>>>>>>>>>>> achieving
>>>>>>>>>>>>>>>> high throughput while processing records that
>> do
>>>> not
>>>>>>>> require
>>>>>>>>>> low
>>>>>>>>>>>>>>> processing
>>>>>>>>>>>>>>>> latency. Subsequently, the job can seamlessly
>>>>>> transition
>>>>>>> to
>>>>>>>>>>> stream
>>>>>>>>>>>>> mode
>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>> processing real-time records with low latency.
>>>>>>> Importantly,
>>>>>>>>> the
>>>>>>>>>>>> same
>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>> can be utilized before and after this mode
>>> switch,
>>>>>> making
>>>>>>>> it
>>>>>>>>>>>>>> particularly
>>>>>>>>>>>>>>>> valuable when users wish to bootstrap the job's
>>>> state
>>>>>>> using
>>>>>>>>>>>>> historical
>>>>>>>>>>>>>>>> data.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We would greatly appreciate any comments or
>>>> feedback
>>>>>> you
>>>>>>>> may
>>>>>>>>>> have
>>>>>>>>>>>> on
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> proposal.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
