Hi Biao,

Thanks for driving this. As Martijn already pointed out, we will have to
spend effort removing SinkFunction after we deprecate it, and the more
functionality we add to it, the bigger the effort to deprecate and remove
it. In general, it is not recommended to add new features to an interface
that we have already decided to deprecate but have not deprecated yet.
However, this FLIP is a special case, and there are several reasons that
lead us to support this proposal.

First, the FLIP offers an equivalent solution for the new SinkV2, which
means that migrating this feature from SinkFunction to SinkV2 is
predictable and acceptable. This resolves the concern I raised above.

Second, since SinkFunction is still marked as @Public [1], it should be
fine to add new features to it (following the rules), especially if the
requirement is urgent. Similar to what [2] describes for API graduation,
going from @Public to @Deprecated and then to removal should also take two
release cycles (8 months in the ideal case, possibly longer). Moreover,
SinkFunction is a core function whose deletion will trigger many further
downstream deletions, so the duration will grow to 16 months (again, in
the ideal case) or even longer, e.g. 2 years.

Third, SinkV2 is still marked as @PublicEvolving, which means a few more
months (8 months?) will pass before we can even start deprecating
SinkFunction. It is not reasonable to say that no features should be added
to SinkFunction during the next 2 or 3 years.

After thinking through all these aspects, I support this FLIP, so +1.

This discussion also points to another issue: we should graduate SinkV2,
then deprecate and remove SinkFunction as soon as possible. The longer we
keep SinkFunction in the code base, the more effort it will take to work on
anything that depends on or affects sinks.

Best regards,
Jing

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process

On Thu, Jan 12, 2023 at 8:56 AM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Biao,
>
> While I would rather not add new features to (to-be) deprecated APIs, I
> would be +0 for this.
>
> Best regards,
>
> Martijn
>
> On Thu, 12 Jan 2023 at 08:42, Biao Liu <mmyy1...@gmail.com> wrote:
>
> > Hi Martijn,
> >
> > Thanks for your feedback!
> >
> > Yes, we propose to support speculative execution for SinkFunction.
> > 1. From the perspective of compatibility, SinkFunction is the original
> > Sink interface. There are lots of implementations based on SinkFunction,
> > not only in the official Flink codebase but also in users' private
> > codebases, so compatibility is an even bigger concern here than for
> > Sink V1. Of course, we hope users will migrate their legacy
> > implementations to the new interface. However, migration is always hard.
> > 2. From the perspective of cost, we don't need to do much extra work to
> > support speculative execution for SinkFunction. All we need to do is
> > check whether the SinkFunction implementation implements
> > SupportsConcurrentExecutionAttempts. The rest of the work is the same as
> > for Sink V2.
> >
> > To summarize, it's cheap to support speculative execution for
> > SinkFunction, and it may allow more existing scenarios to run with
> > speculative execution.
> >
> > Thanks,
> > Biao /'bɪ.aʊ/
> >
> >
> >
> > On Wed, 11 Jan 2023 at 21:22, Martijn Visser <martijnvis...@apache.org>
> > wrote:
> >
> > > Hi Biao,
> > >
> > > Apologies for jumping in late. My only question is about SinkFunction:
> > > does this imply that we want to add support for this to SinkFunction?
> > > If so, I would not be in favour of that, since we would like to
> > > deprecate SinkFunction in favour of SinkV2 (I actually thought that was
> > > already the case).
> > >
> > > Besides that, I have no other comments.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <beyond1...@gmail.com>
> > > wrote:
> > >
> > > > Hi Biao,
> > > >
> > > > Thanks for the explanation.
> > > >
> > > > +1 for the proposal.
> > > >
> > > > Best,
> > > > Jing Zhang
> > > >
> > > > On Wed, 4 Jan 2023 at 12:11, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > >
> > > > > Hi Biao,
> > > > >
> > > > > Thanks for the explanation of how SinkV2 knows the right subtask
> > > > > attempt. I have no more questions. +1 for the proposal.
> > > > >
> > > > > Best,
> > > > > Lijie
> > > > >
> > > > > On Wed, 28 Dec 2022 at 17:22, Biao Liu <mmyy1...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for all your feedback!
> > > > > >
> > > > > > To @Yuxia,
> > > > > >
> > > > > > > What does the sink need to do to isolate data produced by
> > > > > > > speculative executions? IIUC, if a task fails over, it also
> > > > > > > generates a new attempt. Does that make a difference in isolating
> > > > > > > the produced data?
> > > > > >
> > > > > >
> > > > > > Yes, there is a difference from the task failover scenario. The
> > > > > > attempt number matters more for speculative execution than for
> > > > > > failover, because in the failover scenario only one instance of a
> > > > > > subtask can be running at a time.
> > > > > >
> > > > > > Let's take FileSystemOutputFormat as an example. In the failover
> > > > > > scenario, the temporary directory that stores produced data can be
> > > > > > something like "$root_dir/task-$taskNumber/". During the
> > > > > > initialization phase, the subtask deletes and re-creates the
> > > > > > temporary directory.
> > > > > >
> > > > > > However, in the speculative execution scenario this does not work,
> > > > > > because several attempts of the same subtask might be running at
> > > > > > the same time. They might delete, re-create, and write to the same
> > > > > > temporary directory concurrently. The correct temporary directory
> > > > > > should be something like
> > > > > > "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's
> > > > > > necessary to expose the attempt number to the Sink implementation
> > > > > > to isolate the data.
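To illustrate the difference, here is a small sketch that only computes the
two directory layouts described above with `java.nio.file` from the JDK. The
class name `TempDirLayout` and the `/tmp/sink-output` root are hypothetical,
and this is not the actual FileSystemOutputFormat code.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class TempDirLayout {
    private TempDirLayout() {}

    // Failover-only layout: safe because at most one attempt of a subtask runs
    // at a time, so a new attempt may delete and re-create this directory.
    static Path failoverDir(Path rootDir, int taskNumber) {
        return rootDir.resolve("task-" + taskNumber);
    }

    // Speculative layout: concurrent attempts of the same subtask never share a
    // directory, so one attempt cannot delete or overwrite another's output.
    static Path speculativeDir(Path rootDir, int taskNumber, int attemptNumber) {
        return failoverDir(rootDir, taskNumber).resolve("attempt-" + attemptNumber);
    }

    public static void main(String[] args) {
        Path root = Paths.get("/tmp/sink-output");
        System.out.println(failoverDir(root, 3));       // shared by every attempt of task 3
        System.out.println(speculativeDir(root, 3, 0)); // original attempt
        System.out.println(speculativeDir(root, 3, 1)); // speculative attempt
    }
}
```

With the second layout, deleting and re-creating the directory during
initialization only touches the data of the attempt that owns it.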
> > > > > >
> > > > > >
> > > > > > To @Lijie,
> > > > > >
> > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > thing?
> > > > > >
> > > > > >
> > > > > > Actually, yes.
> > > > > >
> > > > > > > Should we/users do it in the committer? If yes, how does the
> > > > > > > committer know which one is the right subtask attempt?
> > > > > >
> > > > > >
> > > > > > Yes, we/users should do it in the committer.
> > > > > >
> > > > > > In the current design, the Committer of Sink V2 should get the
> > > > > > "which one is the right subtask attempt" information from the
> > > > > > "committable data" produced by the SinkWriter. Take FileSink as an
> > > > > > example: the "committable data" sent to the Committer contains the
> > > > > > full paths of the files produced by the SinkWriter. Users could also
> > > > > > pass the attempt number from the SinkWriter to the Committer through
> > > > > > the "committable data".
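A minimal sketch of passing the attempt number through the committable,
assuming (as the reply above implies) that the Committer only receives
committables from the attempt Flink accepted for each subtask.
`FileCommittable` and `AttemptAwareCommitter` are hypothetical names, not the
FileSink classes; the sketch only shows how a committer could recover the
accepted attempt per subtask and treat other attempts' output as leaked data.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical committable (not the FileSink class): besides the full file
// path, the SinkWriter stamps it with the subtask index and its attempt number.
final class FileCommittable {
    final int subtaskIndex;
    final int attemptNumber;
    final String path;

    FileCommittable(int subtaskIndex, int attemptNumber, String path) {
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
        this.path = path;
    }
}

// Hypothetical committer-side helpers.
final class AttemptAwareCommitter {
    private AttemptAwareCommitter() {}

    // Derives the accepted attempt of each subtask from the received committables.
    // Assumes they all come from the accepted attempt; fails loudly otherwise.
    static Map<Integer, Integer> acceptedAttempts(List<FileCommittable> committables) {
        Map<Integer, Integer> accepted = new TreeMap<>();
        for (FileCommittable c : committables) {
            Integer previous = accepted.putIfAbsent(c.subtaskIndex, c.attemptNumber);
            if (previous != null && previous != c.attemptNumber) {
                throw new IllegalStateException(
                        "committables from two attempts of subtask " + c.subtaskIndex);
            }
        }
        return accepted;
    }

    // Output of any other attempt of the same subtask is leaked data to clean up.
    static boolean isLeaked(Map<Integer, Integer> accepted, int subtaskIndex, int attemptNumber) {
        Integer attempt = accepted.get(subtaskIndex);
        return attempt != null && attempt != attemptNumber;
    }

    public static void main(String[] args) {
        List<FileCommittable> received = List.of(
                new FileCommittable(0, 1, "/out/task-0/attempt-1/part-0"),
                new FileCommittable(1, 0, "/out/task-1/attempt-0/part-0"));
        Map<Integer, Integer> accepted = acceptedAttempts(received);
        System.out.println(accepted);                 // {0=1, 1=0}
        System.out.println(isLeaked(accepted, 0, 0)); // true
    }
}
```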
> > > > > >
> > > > > > In the "Rejected Alternatives -> Introduce a way to clean leaked
> > > > > > data of Sink V2" section of the FLIP document, we discussed some of
> > > > > > the reasons why we didn't provide an API like the one for
> > > > > > OutputFormat.
> > > > > >
> > > > > > To @Jing Zhang,
> > > > > >
> > > > > > > I have a question about this: speculative execution of the
> > > > > > > Committer will be disabled.
> > > > > > >
> > > > > > > I agree with your point, and I have seen similar requirements to
> > > > > > > disable speculative execution for specified operators.
> > > > > > >
> > > > > > > However, this requirement is not currently supported. I think
> > > > > > > there should be some place that describes how to support it.
> > > > > >
> > > > > >
> > > > > > In this FLIP's design, speculative execution of the Sink V2
> > > > > > Committer will be disabled by Flink. It's not optional, and users
> > > > > > cannot change it.
> > > > > > And as you said, "disable speculative execution for specified
> > > > > > operators" is not supported in this FLIP, because it's a bit out of
> > > > > > the scope of "Sink Supports Speculative Execution For Batch Job". I
> > > > > > think it's better to start another FLIP to discuss it, perhaps
> > > > > > titled "Fine-grained control of enabling speculative execution for
> > > > > > operators". There we can discuss how to enable or disable
> > > > > > speculative execution for specified operators, including the
> > > > > > Committer and pre/post-committer of Sink V2.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Thanks,
> > > > > > Biao /'bɪ.aʊ/
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Biao,
> > > > > > >
> > > > > > > Thanks for driving this FLIP. Supporting speculative execution
> > > > > > > of sinks is meaningful and important.
> > > > > > >
> > > > > > > I have a question about this: speculative execution of the
> > > > > > > Committer will be disabled.
> > > > > > >
> > > > > > > I agree with your point, and I have seen similar requirements to
> > > > > > > disable speculative execution for specified operators.
> > > > > > >
> > > > > > > However, this requirement is not currently supported. I think
> > > > > > > there should be some place that describes how to support it.
> > > > > > >
> > > > > > > Best,
> > > > > > > Jing Zhang
> > > > > > >
> > > > > > > On Tue, 27 Dec 2022 at 18:51, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Biao,
> > > > > > > >
> > > > > > > > Thanks for driving this FLIP.
> > > > > > > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)"
> > > > > > > > so that OutputFormat can know which subtask attempt was marked as
> > > > > > > > finished by the JM and commit the right data.
> > > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > > thing? Should we/users do it in the committer? If yes, how does
> > > > > > > > the committer know which one is the right subtask attempt?
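For comparison, the OutputFormat side mentioned above could look roughly like
this when committing on the master. Only the method name
`getFinishedAttempt(int subtaskIndex)` comes from the FLIP; `FinalizeContext`,
`getParallelism()`, `MapFinalizeContext`, `AttemptCommitPlanner` and the
directory layout are hypothetical stand-ins for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical context for the finalize-on-master step. Only
// getFinishedAttempt(int) is named in the FLIP; getParallelism() is assumed.
interface FinalizeContext {
    int getParallelism();

    int getFinishedAttempt(int subtaskIndex);
}

// Map-backed context for local experiments; assumes keys are exactly 0..parallelism-1.
final class MapFinalizeContext implements FinalizeContext {
    private final Map<Integer, Integer> finishedAttempts;

    MapFinalizeContext(Map<Integer, Integer> finishedAttempts) {
        this.finishedAttempts = finishedAttempts;
    }

    @Override
    public int getParallelism() {
        return finishedAttempts.size();
    }

    @Override
    public int getFinishedAttempt(int subtaskIndex) {
        return finishedAttempts.get(subtaskIndex);
    }
}

final class AttemptCommitPlanner {
    private AttemptCommitPlanner() {}

    // For each subtask, commit only the directory of the attempt the JM marked as
    // finished; directories of the other attempts can be cleaned up as leaked data.
    static List<String> dirsToCommit(String rootDir, FinalizeContext ctx) {
        List<String> dirs = new ArrayList<>();
        for (int i = 0; i < ctx.getParallelism(); i++) {
            dirs.add(rootDir + "/task-" + i + "/attempt-" + ctx.getFinishedAttempt(i));
        }
        return dirs;
    }

    public static void main(String[] args) {
        // Subtask 0 finished with its original attempt, subtask 1 with attempt 2.
        FinalizeContext ctx = new MapFinalizeContext(Map.of(0, 0, 1, 2));
        System.out.println(dirsToCommit("/out", ctx)); // [/out/task-0/attempt-0, /out/task-1/attempt-2]
    }
}
```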
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Lijie
> > > > > > > >
> > > > > > > > On Tue, 27 Dec 2022 at 10:01, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> > > > > > > >
> > > > > > > > > Hi, Biao.
> > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > After a quick look at this FLIP, I have a question about "expose
> > > > > > > > > the attempt number which can be used to isolate data produced by
> > > > > > > > > speculative executions with the same subtask id".
> > > > > > > > > What does the sink need to do to isolate data produced by
> > > > > > > > > speculative executions? IIUC, if a task fails over, it also
> > > > > > > > > generates a new attempt. Does that make a difference in isolating
> > > > > > > > > the produced data?
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > > Yuxia
> > > > > > > > >
> > > > > > > > > ----- Original Message -----
> > > > > > > > > From: "Biao Liu" <mmyy1...@gmail.com>
> > > > > > > > > To: "dev" <dev@flink.apache.org>
> > > > > > > > > Sent: Thursday, 22 December 2022 8:16:21 PM
> > > > > > > > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > > > > > > > >
> > > > > > > > > Hi everyone,
> > > > > > > > >
> > > > > > > > > I would like to start a discussion about making Sink support
> > > > > > > > > speculative execution for batch jobs. This proposal is a
> > > > > > > > > follow-up of "FLIP-168: Speculative Execution For Batch Job" [1].
> > > > > > > > > Speculative execution is very valuable for batch jobs, and it
> > > > > > > > > will be more complete once Sink supports speculative execution
> > > > > > > > > as well. Please find more details in the FLIP document [2].
> > > > > > > > > Looking forward to your feedback.
> > > > > > > > > [1]
> > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > > > > > > > > [2]
> > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Biao /'bɪ.aʊ/
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
