Hi all,
Many thanks @Dawid for resuming the discussion, and many thanks @Till for the
summary! (And sorry that I missed the mail and did not respond in time...)
1. I also agree that we could consider the global commits separately later,
after we have addressed the final checkpoints, and I agree with the other
points Till summarized as well.
Currently the only users of the cascade commit are the Table FileSystem and
Hive connectors. I checked the code and found that they currently commit the
last piece of data directly in endOfInput(). Although this might emit duplicate
records if a failover happens while the job is finishing, it avoids emitting
records in notifyCheckpointComplete() after endOfInput(). The modification to
the operator lifecycle for final checkpoints would therefore not cause
compatibility problems for these connectors, so we do not need to modify them
in the first place.
2. Regarding the operator lifecycle, I also agree with the proposed changes. To
sum up, I think the operator lifecycle would become
    endOfInput(1)
    ...
    endOfInput(n)
    flush() --> call UDF's flush method
    if some operator requires final checkpoints
        snapshotState()
        notifyCheckpointComplete()
    end if
    close() --> call UDF's close method
Since close() is currently called only on a normal finish, while dispose() is
called in both the failover and the normal case, I think that for
compatibility we may have to postpone the change to a single close() method
until version 2.0?
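A minimal sketch of the call order listed above, written as plain Java so it can be run without Flink. SketchOperator, RecordingOperator and LifecycleSketch are hypothetical names used only for illustration, and the final checkpoint is collapsed into two synchronous calls, which is a simplification of what a real task would do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator; this is NOT Flink's StreamOperator.
interface SketchOperator {
    void endOfInput(int inputId);
    void flush();
    void snapshotState();
    void notifyCheckpointComplete();
    void close();
}

// Records every lifecycle call so the resulting order can be inspected.
class RecordingOperator implements SketchOperator {
    final List<String> calls = new ArrayList<>();

    public void endOfInput(int inputId) { calls.add("endOfInput(" + inputId + ")"); }
    public void flush() { calls.add("flush"); }
    public void snapshotState() { calls.add("snapshotState"); }
    public void notifyCheckpointComplete() { calls.add("notifyCheckpointComplete"); }
    public void close() { calls.add("close"); }
}

class LifecycleSketch {
    // Drives the finishing sequence for an operator with numInputs inputs.
    static void finish(SketchOperator op, int numInputs, boolean needsFinalCheckpoint) {
        for (int i = 1; i <= numInputs; i++) {
            op.endOfInput(i);
        }
        op.flush();
        if (needsFinalCheckpoint) {
            // Assumption: the final checkpoint is modelled as two direct calls.
            op.snapshotState();
            op.notifyCheckpointComplete();
        }
        op.close();
    }

    public static void main(String[] args) {
        RecordingOperator op = new RecordingOperator();
        finish(op, 2, true);
        System.out.println(op.calls);
    }
}
```

The point of the sketch is only the ordering: flush() runs once after every input has ended, and close() runs last regardless of whether a final checkpoint was needed.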
3. Regarding the name and position of the flush() method, I also agree that we
will need a separate method to mark the termination of the whole stream for
multiple-input streams. Would it also be OK if we modified the current
BoundedXXInput interfaces as follows?
    interface BoundedInput {
        void endInput(); // marks the end of the whole stream, as flush() does
    }

    @Deprecated // in the future we could remove this interface
    interface BoundedOneInput extends BoundedInput {}

    interface BoundedMultiInput extends BoundedInput {
        void endInput(int i);

        default void endInput() {} // for compatibility
    }
If an operator/UDF does not care about the end of a single input, it could
implement the BoundedInput interface directly. The possible benefit, as I see
it, is that we might be able to keep only one concept for marking the end of
stream, especially for operators with only one input.
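To make the proposal concrete, here is a small self-contained sketch of how the two kinds of operators might look. The interfaces are re-declared locally (they are not imported from Flink), and CountingOperator, JoinLikeOperator and the finishAll driver are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Local re-declarations of the proposed interfaces; not Flink's actual classes.
interface BoundedInput {
    void endInput(); // end of the whole stream
}

interface BoundedMultiInput extends BoundedInput {
    void endInput(int inputId); // end of one particular input

    @Override
    default void endInput() {} // for compatibility
}

// Hypothetical operator that only cares about the end of the whole stream.
class CountingOperator implements BoundedInput {
    int pending = 3;
    final List<String> emitted = new ArrayList<>();

    @Override
    public void endInput() {
        emitted.add("final count=" + pending);
        pending = 0;
    }
}

// Hypothetical two-input operator that reacts to each input and to the overall end.
class JoinLikeOperator implements BoundedMultiInput {
    final List<String> log = new ArrayList<>();

    @Override
    public void endInput(int inputId) { log.add("input " + inputId + " ended"); }

    @Override
    public void endInput() { log.add("all inputs ended"); }
}

class BoundedInputSketch {
    // Hypothetical driver: what a task might call once every input is exhausted.
    static void finishAll(BoundedInput op, int numInputs) {
        if (op instanceof BoundedMultiInput) {
            for (int i = 1; i <= numInputs; i++) {
                ((BoundedMultiInput) op).endInput(i);
            }
        }
        op.endInput();
    }

    public static void main(String[] args) {
        JoinLikeOperator join = new JoinLikeOperator();
        finishAll(join, 2);
        System.out.println(join.log);
    }
}
```

With this shape, a single-input operator implements one method, while a multi-input operator can still observe each input separately and may leave endInput() as the default no-op.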
Many thanks for all the deep insights and discussions!
Best,
Yun
------------------------------------------------------------------
From:Dawid Wysakowicz <[email protected]>
Send Time:2021 Jun. 3 (Thu.) 21:21
To:dev <[email protected]>; Till Rohrmann <[email protected]>; Yun Gao
<[email protected]>
Cc:Piotr Nowojski <[email protected]>; Guowei Ma <[email protected]>;
Stephan Ewen <[email protected]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi all,
Thanks for the very insightful discussion. I'd like to revive the effort of
FLIP-147. First of all, from my side I'd like to say that I am really
interested in helping that happen in the upcoming 1.14 release.
I agree with Till that the final checkpoints and global commits are mostly
orthogonal. Like Till, I'd suggest focusing first on the final
checkpoints, while just keeping in mind we should not make assumptions that
would make it impossible to implement the global commits. So far I do not see
such risk from the discussion.
Going back to the final checkpoints issue: I think the only outstanding question
is which methods we want to use for flushing/closing both operators and UDFs
just before performing the final checkpoint. As pointed out to me by Piotr, I
am mentioning UDFs here as well, because we need a way for users using the
Public API to benefit from the final checkpoint (bear in mind that e.g.
TwoPhaseCommitSinkFunction which is implemented by our Kafka sink operates on
the UDF level). Right now RichFunction has no method that could be called just
before the final checkpoint to say "flush all intermediate state now and
prepare for the final checkpoint". I'd suggest introducing an additional
interface, e.g. (name to be determined):
    interface Flushable<T> {
        void flush(Collector<T> out);
    }
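As an illustration of how a UDF might use such an interface, here is a minimal sketch under stated assumptions. Collector is a one-method stand-in declared locally rather than Flink's class, and BatchingFunction with its processElement method is a hypothetical user function that buffers records and relies on flush to emit the remainder.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal local stand-in for a collector; not Flink's Collector.
interface Collector<T> {
    void collect(T record);
}

// The interface suggested above (name still to be determined).
interface Flushable<T> {
    void flush(Collector<T> out);
}

// Hypothetical UDF: groups incoming integers into fixed-size batches.
class BatchingFunction implements Flushable<List<Integer>> {
    private final int batchSize;
    private List<Integer> buffer = new ArrayList<>();

    BatchingFunction(int batchSize) {
        this.batchSize = batchSize;
    }

    // Emits a batch whenever the buffer is full.
    void processElement(Integer value, Collector<List<Integer>> out) {
        buffer.add(value);
        if (buffer.size() == batchSize) {
            out.collect(buffer);
            buffer = new ArrayList<>();
        }
    }

    // Emits whatever is still buffered, so nothing is lost before the final checkpoint.
    @Override
    public void flush(Collector<List<Integer>> out) {
        if (!buffer.isEmpty()) {
            out.collect(buffer);
            buffer = new ArrayList<>();
        }
    }
}

class FlushableSketch {
    public static void main(String[] args) {
        List<List<Integer>> out = new ArrayList<>();
        BatchingFunction fn = new BatchingFunction(2);
        for (int i = 1; i <= 5; i++) {
            fn.processElement(i, out::add);
        }
        fn.flush(out::add); // the incomplete last batch is emitted here
        System.out.println(out);
    }
}
```

Without the flush call, the last incomplete batch would stay in the buffer when the input ends, which is the gap the suggested method is meant to close.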
Additionally we would need to introduce a similar method on the StreamOperator
level. Currently we have two methods that are called at the end of the operator
lifecycle:
    close
    dispose
The usage of the two methods is a bit confusing. dispose is responsible for
closing all open resources and is supposed to be called in case of a failure.
close, on the other hand, is a combination of the non-existent "flush" method
we need and dispose, closing resources in case of a successful run. I'd
suggest cleaning this up a bit. We would introduce a proper "flush" method that
would be called when an operator finishes successfully. Moreover, we would
make "close" deal only with closing any open resources, basically taking over
the role of dispose, which we would deprecate.
Lastly, I'd like to say why I think it is better to introduce a new "flush" method
instead of using the "endInput" method of BoundedOne/MultiInput. That method is
called per input, which means each operator would need to keep track of which
inputs were already closed internally and react differently if all of the
inputs were closed. With an explicit "flush" method we do not have such a
problem as the input bookkeeping happens on the StreamTask level.
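The difference can be sketched as follows. This is a simplified illustration, not Flink code: SelfTrackingOperator, FlushingOperator and TaskSketch are hypothetical names, input ids are assumed to be 1-based, and a counter stands in for actually emitting buffered results.

```java
import java.util.BitSet;

// Variant A: the operator is notified per input and must track the inputs itself.
class SelfTrackingOperator {
    private final int numInputs;
    private final BitSet finishedInputs = new BitSet();
    int flushCount = 0;

    SelfTrackingOperator(int numInputs) {
        this.numInputs = numInputs;
    }

    void endInput(int inputId) {
        if (finishedInputs.get(inputId - 1)) {
            return; // ignore a duplicate notification
        }
        finishedInputs.set(inputId - 1);
        if (finishedInputs.cardinality() == numInputs) {
            flushCount++; // stands in for emitting buffered results
        }
    }
}

// Variant B: the operator only implements flush() and keeps no input state.
class FlushingOperator {
    int flushCount = 0;

    void flush() {
        flushCount++; // stands in for emitting buffered results
    }
}

// Hypothetical stand-in for bookkeeping done once at the task level.
class TaskSketch {
    private final int numInputs;
    private final BitSet finishedInputs = new BitSet();
    private final FlushingOperator operator;

    TaskSketch(int numInputs, FlushingOperator operator) {
        this.numInputs = numInputs;
        this.operator = operator;
    }

    void inputEnded(int inputId) {
        if (finishedInputs.get(inputId - 1)) {
            return;
        }
        finishedInputs.set(inputId - 1);
        if (finishedInputs.cardinality() == numInputs) {
            operator.flush(); // called exactly once, after the last input ends
        }
    }
}

class BookkeepingSketch {
    public static void main(String[] args) {
        FlushingOperator op = new FlushingOperator();
        TaskSketch task = new TaskSketch(2, op);
        task.inputEnded(1);
        task.inputEnded(2);
        System.out.println("flush calls: " + op.flushCount);
    }
}
```

In variant A every multi-input operator repeats the BitSet logic; in variant B it lives in one place and each operator only supplies flush().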
Let me know what you think. I'll sync with Yun Gao, and if there are no
objections we will extend the FLIP page with the necessary changes.
Best,
Dawid