Hi Martijn & Jing,

Thanks for the feedback!

Currently, SinkFunction is in an awkward position. As Jing pointed out,
SinkFunction is still marked as @Public. Technically, according to the
Flink Bylaws [1], deprecating it requires approval through an official
vote. Although many community maintainers (including me) think it should
be deprecated, we should not assume that this has already happened. Since
the discussion and vote may take 1 or 2 weeks, or longer if someone
objects, I'd like to keep pushing FLIP-281 forward with the current
design. I hope it can make it into the 1.17 release.

By the way, if nobody else drives the deprecation, I would like to start a
separate discussion about it. What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws

Thanks,
Biao /'bɪ.aʊ/



On Fri, 13 Jan 2023 at 08:43, Jing Ge <j...@ververica.com.invalid> wrote:

> Hi Biao,
>
> Thanks for driving this. As Martijn already pointed out, we will have to
> spend effort removing SinkFunction after we deprecate it. The more
> functionality we add to it, the more effort it will take to deprecate and
> remove it. Generally, it is not recommended to add new features to an
> interface that we have already decided to deprecate but have not yet
> deprecated. However, this FLIP is a special case, and there are several
> reasons that lead us to support this proposal.
>
> First, the FLIP offers an equivalent solution for the new SinkV2, which
> means that migrating this feature from SinkFunction to SinkV2 is
> predictable and acceptable. This resolves the concern I raised above.
>
> Second, since SinkFunction is still marked as @Public [1], it should be
> fine to add new features to it (following the rules), especially if the
> requirement is urgent. Similar to what [2] describes for API graduation,
> going from @Public to @Deprecated and then to removal should also take
> two release cycles (8 months in the ideal case, possibly longer).
> Additionally, SinkFunction is a core interface whose removal will trigger
> many further downstream removals, so the duration will increase to 16
> months (again, in the ideal case) or even longer, e.g. 2 years.
>
> Third, SinkV2 is still marked as @PublicEvolving, which means a few more
> months (8 months?) must pass before we can even start deprecating
> SinkFunction. It is not reasonable to say that no features should be added
> to SinkFunction during the next 2 or 3 years.
>
> Having considered all these aspects, I support this FLIP, so +1.
>
> This discussion leads us to another issue: we should graduate SinkV2 and
> deprecate and remove SinkFunction as soon as possible. The longer we keep
> SinkFunction in the code base, the more effort it will take to work on
> anything that depends on or affects sinks.
>
> Best regards,
> Jing
>
> [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
>
> On Thu, Jan 12, 2023 at 8:56 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
> > Hi Biao,
> >
> > While I'd rather not add new features to (to-be) deprecated features, I
> > would be +0 for this.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Thu, 12 Jan 2023 at 08:42, Biao Liu <mmyy1...@gmail.com> wrote:
> >
> > > Hi Martijn,
> > >
> > > Thanks for your feedback!
> > >
> > > Yes, we propose to support speculative execution for SinkFunction.
> > > 1. From the perspective of compatibility, SinkFunction is the original
> > > Sink interface. There are lots of implementations based on
> > > SinkFunction, not only in the official Flink codebase but also in
> > > users' private codebases. This makes it a more serious issue than with
> > > Sink V1. Of course, we hope users will migrate their legacy
> > > implementations to the new interface. However, migration is always hard.
> > > 2. From the perspective of cost, we don't need to do much extra work to
> > > support speculative execution for SinkFunction. All we need to do is
> > > check whether the SinkFunction implementation implements
> > > SupportsConcurrentExecutionAttempts. The rest of the work is the same
> > > as for Sink V2.
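
Item 2 above can be sketched in a few lines of Java. This is a minimal, self-contained sketch rather than Flink code: SupportsConcurrentExecutionAttempts is declared locally as a stand-in for the marker interface named in the thread, SimpleSinkFunction is a hypothetical, simplified stand-in for SinkFunction, and SpeculationCheck.supportsSpeculativeExecution is an illustrative helper name.

```java
/** Local stand-in for the marker interface named in the thread (not Flink's class). */
interface SupportsConcurrentExecutionAttempts {}

/** Hypothetical, simplified stand-in for a legacy sink; NOT Flink's real SinkFunction. */
interface SimpleSinkFunction<IN> {
    void invoke(IN value) throws Exception;
}

/** A legacy sink that has not opted in, so it must not run concurrent attempts. */
final class LegacyPrintSink implements SimpleSinkFunction<String> {
    @Override
    public void invoke(String value) {
        System.out.println(value);
    }
}

/** A sink whose author made it safe for concurrent attempts and opted in via the marker. */
final class AttemptSafeSink
        implements SimpleSinkFunction<String>, SupportsConcurrentExecutionAttempts {
    @Override
    public void invoke(String value) {
        // Real logic omitted; it would write to an attempt-scoped location.
    }
}

final class SpeculationCheck {
    /** Speculative execution is allowed only for sinks that implement the marker interface. */
    static boolean supportsSpeculativeExecution(SimpleSinkFunction<?> sink) {
        return sink instanceof SupportsConcurrentExecutionAttempts;
    }
}
```

In this sketch the check is opt-in: a sink that does not implement the marker interface is treated as unsafe for concurrent attempts, so existing implementations that were never written with speculative execution in mind are left alone.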
> > >
> > > To summarize, supporting speculative execution for SinkFunction is
> > > cheap, and it may allow more existing scenarios to run with speculative
> > > execution.
> > >
> > > Thanks,
> > > Biao /'bɪ.aʊ/
> > >
> > >
> > >
> > > On Wed, 11 Jan 2023 at 21:22, Martijn Visser <martijnvis...@apache.org>
> > > wrote:
> > >
> > > > Hi Biao,
> > > >
> > > > Apologies for jumping in late. My only question is about SinkFunction:
> > > > does this imply that we want to add support for this to SinkFunction?
> > > > If so, I would not be in favour of that, since we would like to
> > > > deprecate SinkFunction in favour of SinkV2 (I actually thought that
> > > > was already the case).
> > > >
> > > > Besides that, I have no other comments.
> > > >
> > > > Best regards,
> > > >
> > > > Martijn
> > > >
> > > > On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <beyond1...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Biao,
> > > > >
> > > > > Thanks for the explanation.
> > > > >
> > > > > +1 for the proposal.
> > > > >
> > > > > Best,
> > > > > Jing Zhang
> > > > >
> > > > > On Wed, 4 Jan 2023 at 12:11, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > >
> > > > > > Hi Biao,
> > > > > >
> > > > > > Thanks for explaining how SinkV2 knows the right subtask attempt. I
> > > > > > have no more questions, +1 for the proposal.
> > > > > >
> > > > > > Best,
> > > > > > Lijie
> > > > > >
> > > > > > On Wed, 28 Dec 2022 at 17:22, Biao Liu <mmyy1...@gmail.com> wrote:
> > > > > >
> > > > > > > Thanks for all your feedback!
> > > > > > >
> > > > > > > To @Yuxia,
> > > > > > >
> > > > > > > > What does the sink need to do to isolate data produced by
> > > > > > > > speculative executions? IIUC, if the task fails over, it also
> > > > > > > > generates a new attempt. Does that make a difference in isolating
> > > > > > > > the produced data?
> > > > > > >
> > > > > > >
> > > > > > > Yes, there is a difference from the task failover scenario. The
> > > > > > > attempt number matters more for speculative execution than for
> > > > > > > failover, because in the failover scenario only one instance of a
> > > > > > > subtask can be running at any given time.
> > > > > > >
> > > > > > > Let's take FileSystemOutputFormat as an example. In the failover
> > > > > > > scenario, the temporary directory that stores the produced data can
> > > > > > > be something like "$root_dir/task-$taskNumber/". During
> > > > > > > initialization, the subtask deletes and re-creates that temporary
> > > > > > > directory.
> > > > > > >
> > > > > > > However, this does not work in the speculative execution scenario,
> > > > > > > because several attempts of the same subtask might be running at the
> > > > > > > same time. They might delete, re-create and write to the same
> > > > > > > temporary directory concurrently. The temporary directory should
> > > > > > > instead be something like
> > > > > > > "$root_dir/task-$taskNumber/attempt-$attemptNumber". That's why it's
> > > > > > > necessary to expose the attempt number to the Sink implementation
> > > > > > > for data isolation.
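
The two directory layouts described above can be illustrated with a short sketch. It assumes a local filesystem accessed through java.nio.file and only mirrors the "$root_dir/task-$taskNumber/attempt-$attemptNumber" naming from the reply; StagingDirs and its methods are hypothetical names, and this is not the actual FileSystemOutputFormat code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StagingDirs {
    /** Failover-only layout: safe only while at most one attempt of a subtask runs at a time. */
    static Path perTaskDir(Path root, int taskNumber) {
        return root.resolve("task-" + taskNumber);
    }

    /** Speculative layout: concurrent attempts of the same subtask never share a directory. */
    static Path perAttemptDir(Path root, int taskNumber, int attemptNumber) {
        return perTaskDir(root, taskNumber).resolve("attempt-" + attemptNumber);
    }

    /**
     * Deletes and re-creates this attempt's staging directory at initialization. Because the
     * path contains the attempt number, files of a sibling attempt are never touched.
     */
    static Path resetAttemptDir(Path root, int taskNumber, int attemptNumber) throws IOException {
        Path dir = perAttemptDir(root, taskNumber, attemptNumber);
        if (Files.exists(dir)) {
            List<Path> toDelete;
            try (Stream<Path> walk = Files.walk(dir)) {
                // Reverse order puts children before their parent directories.
                toDelete = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : toDelete) {
                Files.delete(p);
            }
        }
        return Files.createDirectories(dir);
    }
}
```

Because resetAttemptDir only ever deletes the directory for its own attempt number, attempt-0 and attempt-1 of the same subtask can initialize concurrently without removing each other's files, which the per-task layout cannot guarantee.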
> > > > > > >
> > > > > > >
> > > > > > > To @Lijie,
> > > > > > >
> > > > > > > > I have a question about this: does SinkV2 need to do the same thing?
> > > > > > >
> > > > > > >
> > > > > > > Actually, yes.
> > > > > > >
> > > > > > > > Should we/users do it in the committer? If yes, how does the
> > > > > > > > committer know which one is the right subtask attempt?
> > > > > > >
> > > > > > >
> > > > > > > Yes, we/users should do it in the committer.
> > > > > > >
> > > > > > > In the current design, the Committer of Sink V2 should get the "which
> > > > > > > one is the right subtask attempt" information from the "committable
> > > > > > > data" produced by the SinkWriter. Take FileSink as an example: the
> > > > > > > "committable data" sent to the Committer contains the full paths of
> > > > > > > the files produced by the SinkWriter. Users could also pass the
> > > > > > > attempt number from the SinkWriter to the Committer through the
> > > > > > > "committable data".
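
A minimal sketch of that idea, assuming the writer records its attempt number and staged file paths in the committable, and assuming (as the reply implies) that the Committer only receives committables from the attempt whose output was accepted. FileCommittable and AttemptAwareCommitter are hypothetical names, not FileSink's real classes, and the "attempt-N" directory naming is only illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical committable: what one SinkWriter attempt hands to the Committer. */
final class FileCommittable {
    final int subtaskIndex;
    final int attemptNumber;
    final List<String> stagedFiles; // full paths written by this attempt

    FileCommittable(int subtaskIndex, int attemptNumber, List<String> stagedFiles) {
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
        this.stagedFiles = Collections.unmodifiableList(new ArrayList<>(stagedFiles));
    }
}

final class AttemptAwareCommitter {
    /** Publish exactly the files listed by the committable, i.e. by the attempt that produced it. */
    static List<String> filesToPublish(FileCommittable c) {
        return c.stagedFiles;
    }

    /** Sibling attempt directories (e.g. "attempt-0") that hold leaked, uncommitted data. */
    static List<String> leakedAttemptDirs(List<String> attemptDirNames, FileCommittable c) {
        String committed = "attempt-" + c.attemptNumber;
        List<String> leaked = new ArrayList<>();
        for (String name : attemptDirNames) {
            if (name.startsWith("attempt-") && !name.equals(committed)) {
                leaked.add(name);
            }
        }
        return leaked;
    }
}
```

The leakedAttemptDirs helper only illustrates how an attempt number carried in the committable could drive user-side cleanup; according to this reply, cleaning leaked data is not offered as a Sink V2 API.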
> > > > > > >
> > > > > > > In the "Rejected Alternatives -> Introduce a way to clean leaked data
> > > > > > > of Sink V2" section of the FLIP document, we discussed some of the
> > > > > > > reasons why we didn't provide an API like the one for OutputFormat.
> > > > > > >
> > > > > > > To @Jing Zhang,
> > > > > > >
> > > > > > > > I have a question about this: "Speculative execution of Committer
> > > > > > > > will be disabled."
> > > > > > > >
> > > > > > > > I agree with your point, and I have seen similar requirements to
> > > > > > > > disable speculative execution for specific operators.
> > > > > > > >
> > > > > > > > However, this requirement is not currently supported. I think there
> > > > > > > > should be some place describing how to support it.
> > > > > > >
> > > > > > >
> > > > > > > In this FLIP's design, speculative execution of the Sink V2 Committer
> > > > > > > will be disabled by Flink. This is not optional; users cannot change
> > > > > > > it. And as you said, "disable speculative execution for specified
> > > > > > > operators" is not supported in this FLIP, because it's somewhat out
> > > > > > > of the scope of "Sink Supports Speculative Execution For Batch Job".
> > > > > > > I think it's better to start another FLIP to discuss it, perhaps
> > > > > > > titled "Fine-grained control of enabling speculative execution for
> > > > > > > operators". There we can discuss how to enable or disable
> > > > > > > speculative execution for specified operators, including the
> > > > > > > Committer and pre/post-committer of Sink V2.
> > > > > > >
> > > > > > > What do you think?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Biao /'bɪ.aʊ/
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Biao,
> > > > > > > >
> > > > > > > > Thanks for driving this FLIP. Supporting speculative execution of
> > > > > > > > sinks is meaningful and important.
> > > > > > > >
> > > > > > > > I have a question about this: "Speculative execution of Committer
> > > > > > > > will be disabled."
> > > > > > > >
> > > > > > > > I agree with your point, and I have seen similar requirements to
> > > > > > > > disable speculative execution for specific operators.
> > > > > > > >
> > > > > > > > However, this requirement is not currently supported. I think there
> > > > > > > > should be some place describing how to support it.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jing Zhang
> > > > > > > >
> > > > > > > > On Tue, 27 Dec 2022 at 18:51, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Biao,
> > > > > > > > >
> > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" so
> > > > > > > > > that an OutputFormat can know which subtask attempt the JM marked
> > > > > > > > > as finished and commit the right data.
> > > > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > > > thing? Should we/users do it in the committer? If so, how does the
> > > > > > > > > committer know which subtask attempt is the right one?
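
A minimal sketch of how a global commit step might use getFinishedAttempt(int subtaskIndex), assuming the attempt-scoped directory layout discussed in this thread. Only getFinishedAttempt comes from the FLIP as described above; the FinishedAttemptLookup interface, its getParallelism() method and the FinishedAttemptCommit class are hypothetical.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in; only getFinishedAttempt(int) is taken from the thread. */
interface FinishedAttemptLookup {
    int getParallelism(); // assumption: number of sink subtasks

    int getFinishedAttempt(int subtaskIndex);
}

final class FinishedAttemptCommit {
    /**
     * For every subtask, picks the staging directory of the attempt the JobManager marked
     * as finished. Directories of other attempts are not returned (they hold leaked data).
     */
    static List<Path> dirsToCommit(Path root, FinishedAttemptLookup lookup) {
        List<Path> dirs = new ArrayList<>();
        for (int subtask = 0; subtask < lookup.getParallelism(); subtask++) {
            int attempt = lookup.getFinishedAttempt(subtask);
            dirs.add(root.resolve("task-" + subtask).resolve("attempt-" + attempt));
        }
        return dirs;
    }
}
```

Since the JobManager, not the sink, decides which attempt finished, the commit step does not have to guess; directories of the other attempts can be treated as leaked data.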
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Lijie
> > > > > > > > >
> > > > > > > > > On Tue, 27 Dec 2022 at 10:01, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Biao.
> > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > After a quick look at this FLIP, I have a question about "expose
> > > > > > > > > > the attempt number which can be used to isolate data produced by
> > > > > > > > > > speculative executions with the same subtask id".
> > > > > > > > > > What does the sink need to do to isolate data produced by
> > > > > > > > > > speculative executions? IIUC, if the task fails over, it also
> > > > > > > > > > generates a new attempt. Does that make a difference in isolating
> > > > > > > > > > the produced data?
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Yuxia
> > > > > > > > > >
> > > > > > > > > > ----- Original Message -----
> > > > > > > > > > From: "Biao Liu" <mmyy1...@gmail.com>
> > > > > > > > > > To: "dev" <dev@flink.apache.org>
> > > > > > > > > > Sent: Thursday, 22 December 2022 8:16:21 PM
> > > > > > > > > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > I would like to start a discussion about making Sink support
> > > > > > > > > > speculative execution for batch jobs. This proposal is a follow-up
> > > > > > > > > > of "FLIP-168: Speculative Execution For Batch Job" [1]. Speculative
> > > > > > > > > > execution is very valuable for batch jobs, and the feature will be
> > > > > > > > > > more complete once Sink supports speculative execution. Please
> > > > > > > > > > find more details in the FLIP document [2].
> > > > > > > > > >
> > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > > > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Biao /'bɪ.aʊ/
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
