Small correction. I meant we need to adjust the EndOfInputEvent of course.

Best,

Dawid

On 19/07/2021 11:48, Dawid Wysakowicz wrote:
> Hey Till,
>
> Yes, you're right we will have to adjust the current state of
> EndOfPartitionEvent and move the moment when we emit it to have what
> we're discussing here. We are aware of that.
>
> As for MAX_WATERMARK vs finish(): my take is that we should always
> emit MAX_WATERMARK before calling finish() on an operator. At the same
> time finish() should not leave anything behind in state, as the
> intention is that we never restore from the taken savepoint/checkpoint
> (savepoint with drain, or bounded data fully consumed).
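A minimal sketch of the ordering described above, assuming a toy operator that holds both event-time timers and a count-based buffer. ToyOperator, drain() and the other names are hypothetical and are not Flink's API; only the ordering (MAX_WATERMARK first, then finish(), nothing left in state) comes from the discussion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class FinishOrderingSketch {
    // Assumption: the "max watermark" is modelled as the largest possible timestamp.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** Hypothetical toy operator, not Flink's StreamOperator. */
    static class ToyOperator {
        final TreeMap<Long, String> eventTimeTimers = new TreeMap<>();
        final List<String> countBuffer = new ArrayList<>();
        final List<String> output = new ArrayList<>();

        void registerTimer(long timestamp, String payload) {
            eventTimeTimers.put(timestamp, payload);
        }

        void buffer(String record) {
            countBuffer.add(record);
        }

        // Fires every timer whose timestamp is not later than the watermark.
        void processWatermark(long watermark) {
            while (!eventTimeTimers.isEmpty() && eventTimeTimers.firstKey() <= watermark) {
                output.add("timer:" + eventTimeTimers.pollFirstEntry().getValue());
            }
        }

        // Flushes whatever time alone would never flush (e.g. a count-based buffer).
        void finish() {
            for (String record : countBuffer) {
                output.add("flushed:" + record);
            }
            countBuffer.clear();
        }

        boolean hasPendingState() {
            return !eventTimeTimers.isEmpty() || !countBuffer.isEmpty();
        }
    }

    // Drain sequence: MAX_WATERMARK first, then finish().
    static List<String> drain(ToyOperator operator) {
        operator.processWatermark(MAX_WATERMARK);
        operator.finish();
        return operator.output;
    }
}
```

With one registered timer and one buffered record, drain() emits the timer result first and the flushed record second, and hasPendingState() is false afterwards.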
>
> Best,
>
> Dawid
>
> On 19/07/2021 11:33, Till Rohrmann wrote:
>> Hi Yun and Dawid,
>>
>> Thanks for your comments. I do agree with your comments that finish() can
>> do more than MAX_WATERMARK. I guess we should then explain how
>> MAX_WATERMARK and finish() play together and what kind of
>> order guarantees we provide.
>>
>> Concerning the EndOfPartitionEvent, I am not entirely sure whether it would
>> work in its current state because we send this event when the Task is about
>> to shut down if I am not mistaken. What we want to have is to bring the
>> StreamTasks into a state so that they shut down on the next checkpoint. For
>> this we need to keep the StreamTask running. In general, I am a fan of
>> making things explicit if possible. I think this helps maintenance and
>> evolvability of code. That's why I think it could make sense to send an
>> EndOfInputEvent, a StreamTask-level event which says that no further
>> records will arrive, only control events.
>>
>> I would leave the proposed optimization out of the first version. We can
>> still add it at a later point in time.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Personally I don't find this optimization important and I'd rather leave
>>> it out so as not to complicate the codebase further. I doubt we would save
>>> much there. I don't have a strong opinion though.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 19/07/2021 10:31, Yun Gao wrote:
>>>
>>> Hi,
>>>
>>> Many thanks, Dawid, for the thoughts!
>>>
>>> Currently I also have no differing opinion on this part.
>>> But I have one more issue to confirm: in the previous discussion we
>>> talked about an optimization for the final checkpoint case. If a task
>>> does not have operators using 2PC, we might skip waiting for the final
>>> checkpoint (but we could not skip the savepoint). To allow users to
>>> express this logic, we proposed adding one more method to
>>> StreamOperator & CheckpointListener:
>>>
>>> interface StreamOperator {
>>>     default boolean requiresFinalCheckpoint() {
>>>         return true;
>>>     }
>>> }
>>>
>>> interface CheckpointListener {
>>>
>>>     default boolean requiresFinalCheckpoint() {
>>>         return true;
>>>     }
>>> }
>>>
>>> class AbstractUdfStreamOperator {
>>>
>>>     @Override
>>>     public boolean requiresFinalCheckpoint() {
>>>         return userFunction instanceof CheckpointListener &&
>>>             ((CheckpointListener) userFunction).requiresFinalCheckpoint();
>>>     }
>>> }
>>>
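A self-contained sketch of how a task might consume the proposed flag, assuming it waits for the final checkpoint as soon as any operator in its chain asks for it. The two interfaces mirror the proposal quoted above; UdfOperator and taskMustWaitForFinalCheckpoint are hypothetical names used only for illustration and are not part of Flink.

```java
import java.util.List;

class FinalCheckpointSketch {
    /** Mirrors the proposed default method; not Flink's real interface. */
    interface CheckpointListener {
        default boolean requiresFinalCheckpoint() {
            return true;
        }
    }

    /** Mirrors the proposed default method; not Flink's real interface. */
    interface StreamOperator {
        default boolean requiresFinalCheckpoint() {
            return true;
        }
    }

    /** Hypothetical stand-in for an operator that wraps a user function. */
    static class UdfOperator implements StreamOperator {
        private final Object userFunction;

        UdfOperator(Object userFunction) {
            this.userFunction = userFunction;
        }

        // Same logic as in the proposal: only listeners can require the final checkpoint.
        @Override
        public boolean requiresFinalCheckpoint() {
            return userFunction instanceof CheckpointListener
                    && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
        }
    }

    // Assumption: a task must wait if at least one chained operator requires it.
    static boolean taskMustWaitForFinalCheckpoint(List<StreamOperator> chain) {
        for (StreamOperator operator : chain) {
            if (operator.requiresFinalCheckpoint()) {
                return true;
            }
        }
        return false;
    }
}
```

Under this sketch a user function that does not implement CheckpointListener never forces the wait, while a listener that keeps the default does.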
>>>
>>> I think we should still keep this change?
>>>
>>> Best,
>>> Yun
>>>
>>> ------------------Original Mail ------------------
>>> *Sender:*Dawid Wysakowicz <dwysakow...@apache.org>
>>> *Send Date:*Sun Jul 18 18:44:50 2021
>>> *Recipients:*Flink Dev <dev@flink.apache.org>, Yun Gao
>>> <yungao...@aliyun.com.INVALID>
>>> *Subject:*Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
>>> Finished
>>>
>>>> I think we're all really close to the same solution.
>>>>
>>>> I second Yun's thoughts that MAX_WATERMARK works well for time-based
>>>> buffering, but it does not solve flushing for other operations such as
>>>> count windows or batching requests in sinks. I'd prefer to treat
>>>> finish() as a message to the operator to "flush all records". The
>>>> MAX_WATERMARK is, in my opinion, mostly there for backwards
>>>> compatibility. I don't think operators need to get a "stop-processing"
>>>> signal if they don't need to flush records. WHEN records are emitted
>>>> should be under the control of the StreamTask, by firing timers or by
>>>> processing the next record from upstream.
>>>>
>>>> The only difference between my previous proposal and Yun's is that I
>>>> did not want to send the EndOfUserRecords event in case of stop w/o
>>>> drain. My thinking was that we could go directly from RUNNING to
>>>> WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could emit
>>>> EndOfUserRecordsEvent with an additional flag and e.g. stop firing
>>>> timers and processing events (without calling finish() on the
>>>> operator). In my initial suggestion I thought we would not care about
>>>> some events potentially being emitted after the savepoint was taken,
>>>> as they would anyway belong to the next checkpoint after FINAL, which
>>>> would be discarded. I think, though, that the proposal to suspend
>>>> record processing and timers is a sensible thing to do, and I would go
>>>> with the version that Yun put into the FLIP Wiki.
>>>>
>>>> What do you think, Till?
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 16/07/2021 10:03, Yun Gao wrote:
>>>>
>>>>> Hi Till, Piotr,
>>>>>
>>>>> Many thanks for the comments!
>>>>>
>>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>>>>>
>>>>> I also agree with Piotr that currently they are independent mechanisms,
>>>>> and that they are basically the same for event time.
>>>>>
>>>>> In more detail: first, the three finish scenarios differ in one respect.
>>>>> After a normal finish or stop-with-savepoint --drain, the job is not
>>>>> expected to be restarted, while after stop-with-savepoint the job is
>>>>> expected to be restarted later.
>>>>>
>>>>> For finish / stop-with-savepoint --drain, Flink currently emits
>>>>> MAX_WATERMARK before the EndOfPartition. Besides, as we have discussed
>>>>> before [1], endOfInput / finish() should also only be called for
>>>>> finish / stop-with-savepoint --drain. Thus currently they always occur
>>>>> at the same time. After the change, we could emit MAX_WATERMARK before
>>>>> the endOfInput event for the finish / stop-with-savepoint --drain cases.
>>>>>
>>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
>>>>>> WindowOperator close all windows and emit the results upon calling
>>>>>> finish, for example?
>>>>>
>>>>> As discussed above, for stop-with-savepoint we would always keep the
>>>>> windows as they are and restore them after restart.
>>>>>
>>>>> For finish / stop-with-savepoint --drain, I think it perhaps depends
>>>>> on the Triggers. For event-time / processing-time triggers, it would be
>>>>> reasonable to flush all the windows, since logically time always
>>>>> elapses and the window would always get triggered in a logical future.
>>>>> But for triggers like CountTrigger, no matter how much time passes
>>>>> logically, the windows would not trigger, so we may not flush these
>>>>> windows. If there are requirements, we may provide additional triggers.
>>>>>
>>>>>> It's a bit messy and I'm not sure if this should be straightened out?
>>>>>> Each one of those has a little bit different semantic/meaning, but at
>>>>>> the same time they are very similar. For single input operators
>>>>>> `endInput()` and `finish()` are actually the very same thing.
>>>>>
>>>>> Currently MAX_WATERMARK / endInput / finish indeed always happen at the
>>>>> same time, and for single input operators `endInput()` and `finish()`
>>>>> are indeed the same thing. We did mention this issue during the last
>>>>> discussion, and at that time we thought that we might deprecate
>>>>> `endInput()` in the future, so that we would only have
>>>>> endInput(int input) and finish().
>>>>>
>>>>> Best,
>>>>> Yun
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-21132
>>>>> ------------------------------------------------------------------
>>>>> From:Piotr Nowojski
>>>>> Send Time:2021 Jul. 16 (Fri.) 13:48
>>>>> To:dev
>>>>> Cc:Yun Gao
>>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
>>>>> Finished
>>>>>
>>>>> Hi Till,
>>>>>
>>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
>>>>>> WindowOperator close all windows and emit the results upon calling
>>>>>> finish, for example?
>>>>>
>>>>> 1) Currently they are independent but parallel mechanisms. With event
>>>>> time, they are basically the same.
>>>>> 2) It probably should, for the sake of processing-time windows.
>>>>>
>>>>> Here you are touching the bit of the current design that I like the
>>>>> least. We now basically have three different ways of conveying very
>>>>> similar things:
>>>>> a) sending `MAX_WATERMARK`, used by the event-time WindowOperator (what
>>>>> about processing time?)
>>>>> b) endInput(), used for example by AsyncWaitOperator to flush its
>>>>> internal state
>>>>> c) finish(), used for example by ContinuousFileReaderOperator
>>>>>
>>>>> It's a bit messy and I'm not sure if this should be straightened out?
>>>>> Each one of those has a little bit different semantic/meaning, but at
>>>>> the same time they are very similar. For single input operators
>>>>> `endInput()` and `finish()` are actually the very same thing.
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On Thu, Jul 15, 2021 at 16:47, Till Rohrmann wrote:
>>>>> Thanks for updating the FLIP. Based on the new section about
>>>>> stop-with-savepoint [--drain] I got two other questions:
>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
>>>>> WindowOperator close all windows and emit the results upon calling
>>>>> finish, for example?
>>>>> Cheers,
>>>>> Till
>>>>> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
>>>>>> Thanks a lot for your answers and clarifications Yun.
>>>>>> 1+2) Agreed, this can be a future improvement if this becomes a
>>>>>> problem.
>>>>>> 3) Great, this will help a lot with understanding the FLIP.
>>>>>> Cheers,
>>>>>> Till
>>>>>> On Wed, Jul 14, 2021 at 5:41 PM Yun Gao
>>>>>> wrote:
>>>>>>> Hi Till,
>>>>>>>
>>>>>>> Many thanks for the review and comments!
>>>>>>>
>>>>>>> 1) First, I think we could in fact do the computation outside of the
>>>>>>> main thread. The current implementation is the way it is mainly
>>>>>>> because the computation is generally fast and we initially wanted a
>>>>>>> simplified first version.
>>>>>>> The main requirement here is to have a consistent view of the state
>>>>>>> of the tasks. Otherwise, for example with A -> B: if A is running
>>>>>>> when we check whether we need to trigger A, we will mark A as to be
>>>>>>> triggered; but if A has finished by the time we check B, we will also
>>>>>>> mark B as to be triggered. B will then receive both the RPC trigger
>>>>>>> and the checkpoint barrier, which would break some assumptions on the
>>>>>>> task side and complicate the implementation.
>>>>>>> To cope with this issue, we could in fact first take a snapshot of
>>>>>>> the tasks' state and then do the computation; neither of the two
>>>>>>> steps needs to be in the main thread.
>>>>>>>
>>>>>>> 2) For the computation logic, we currently benefit a lot from some
>>>>>>> shortcuts on all-to-all edges and on job vertices with all tasks
>>>>>>> running. These shortcuts do checks at the job vertex level first and
>>>>>>> skip some job vertices as a whole. With this optimization we have an
>>>>>>> O(V) algorithm, and the current worst-case running time for a job
>>>>>>> with 320,000 tasks is less than 100ms. For everyday graph sizes the
>>>>>>> time would be further reduced linearly.
>>>>>>> If we did the computation based on the last triggered tasks, we could
>>>>>>> not easily encode this information into the shortcuts at the job
>>>>>>> vertex level. And since the time seems to be short, perhaps it is
>>>>>>> enough to recompute from scratch, considering the tradeoff between
>>>>>>> performance and complexity?
>>>>>>>
>>>>>>> 3) We are going to emit the EndOfInput event right after the finish()
>>>>>>> method and before the last snapshotState() method, so that we can
>>>>>>> shut down the whole topology with a single final checkpoint.
>>>>>>> Very sorry for not including enough detail on this part; I'll
>>>>>>> complement the FLIP with details on the process of the final
>>>>>>> checkpoint / savepoint.
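The A -> B example in point 1) can be sketched as follows. This is an illustration under assumptions rather than the actual plan calculator. It assumes the rule "trigger a running task only if none of its upstream tasks is running", evaluated against one immutable snapshot of task states. CheckpointPlanSketch, TaskState and tasksToTrigger are hypothetical names.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class CheckpointPlanSketch {
    enum TaskState { RUNNING, FINISHED }

    // Every decision reads the same snapshot, so A and B can never both be
    // selected just because A finished in the middle of the computation.
    static Set<String> tasksToTrigger(Map<String, TaskState> snapshot,
                                      Map<String, List<String>> upstream) {
        Set<String> toTrigger = new TreeSet<>();
        for (Map.Entry<String, TaskState> entry : snapshot.entrySet()) {
            if (entry.getValue() != TaskState.RUNNING) {
                continue; // finished tasks are never triggered
            }
            boolean upstreamRunning = false;
            List<String> inputs =
                    upstream.getOrDefault(entry.getKey(), Collections.<String>emptyList());
            for (String input : inputs) {
                if (snapshot.get(input) == TaskState.RUNNING) {
                    upstreamRunning = true; // the barrier will arrive from this input
                    break;
                }
            }
            if (!upstreamRunning) {
                toTrigger.add(entry.getKey());
            }
        }
        return toTrigger;
    }
}
```

With A and B both running in the snapshot only A is selected, and with A finished only B is selected, so B gets either the barrier or the RPC trigger but not both.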
>>>>>>> Best,
>>>>>>> Yun
>>>>>>> ------------------------------------------------------------------
>>>>>>> From:Till Rohrmann
>>>>>>> Send Time:2021 Jul. 14 (Wed.) 22:05
>>>>>>> To:dev
>>>>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
>>>>>>> Finished
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I am a bit late to the voting party but let me ask three questions:
>>>>>>>
>>>>>>> 1) Why do we execute the trigger plan computation in the main thread
>>>>>>> if we cannot guarantee that all tasks are still running when
>>>>>>> triggering the checkpoint? Couldn't we do the computation in a
>>>>>>> different thread in order to relieve the main thread a bit?
>>>>>>>
>>>>>>> 2) The implementation of the DefaultCheckpointPlanCalculator seems to
>>>>>>> go over the whole topology for every calculation. Wouldn't it be more
>>>>>>> efficient to maintain the set of current tasks to trigger and check
>>>>>>> whether anything has changed and if so check the succeeding tasks
>>>>>>> until we have found the current checkpoint trigger frontier?
>>>>>>>
>>>>>>> 3) When are we going to send the endOfInput events to a downstream
>>>>>>> task? If this happens after we call finish on the upstream operator
>>>>>>> but before snapshotState then it would be possible to shut down the
>>>>>>> whole topology with a single final checkpoint. I think this part could
>>>>>>> benefit from a bit more detailed description in the FLIP.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao
>>>>>>> wrote:
>>>>>>>> Hi there,
>>>>>>>>
>>>>>>>> Since the voting time of FLIP-147 [1] has passed, I'm closing the
>>>>>>>> vote now.
>>>>>>>>
>>>>>>>> There were seven +1 votes (6 of 7 binding) and no -1 votes:
>>>>>>>> - Dawid Wysakowicz (binding)
>>>>>>>> - Piotr Nowojski (binding)
>>>>>>>> - Jiangang Liu (binding)
>>>>>>>> - Arvid Heise (binding)
>>>>>>>> - Jing Zhang (binding)
>>>>>>>> - Leonard Xu (non-binding)
>>>>>>>> - Guowei Ma (binding)
>>>>>>>>
>>>>>>>> Thus I'm happy to announce that the update to FLIP-147 is accepted.
>>>>>>>>
>>>>>>>> Many thanks, everyone!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Yun
>>>>>>>>
>>>>>>>> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ