Hi Jing,

That's a valid point, but it raises a broader discussion about promoting the
Sink V2 API in general. I'll open a separate discussion thread for this.

Best regards,

Martijn



On Wed, 18 Jan 2023 at 11:01, Jing Ge <j...@ververica.com.invalid> wrote:

> Hi,
>
> I think it will be confusing for users if the older API is deprecated while
> the related new API has not graduated yet, which leads to the awkward
> situation that no proper API can be used in production. In our case,
> e.g., sinkv2.Sink should be marked as @Public before SinkFunction
> can be marked as deprecated. Doing it in 1.17 might be too hasty. I am not
> sure whether the Flink community already has a similar rule. If not, a new
> one should be defined.
>
> Best regards,
> Jing
>
> On Wed, Jan 18, 2023 at 9:48 AM Martijn Visser <martijnvis...@apache.org> wrote:
>
> > Hi Biao,
> >
> > Just to clarify, I understand your point of view and will not block your
> > FLIP :)
> >
> > It would indeed be great if both SinkFunction and SourceFunction could
> > be marked as deprecated in 1.17, as Yun Tang pointed out.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Wed, 18 Jan 2023 at 09:34, Yun Tang <myas...@live.com> wrote:
> >
> > > Hi Biao,
> > >
> > > I think it's time to deprecate SinkFunction, and it would be great if
> > > you could drive the discussion.
> > >
> > > BTW, we might get it done in the flink-1.17 release, together with
> > > deprecating SourceFunction [1].
> > >
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-28045
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Biao Liu <mmyy1...@gmail.com>
> > > Sent: Wednesday, January 18, 2023 16:15
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For
> > > Batch Job
> > >
> > > Hi Martijn & Jing,
> > >
> > > Thanks for the feedback!
> > >
> > > Currently, SinkFunction is in a subtle situation. As Jing pointed out,
> > > SinkFunction is still marked as @Public. Technically, according to the
> > > Flink Bylaws [1], the decision to deprecate it should be approved through
> > > an official vote. Although many of the community maintainers (including
> > > me) think it should be deprecated, we should not assume that has already
> > > happened. The discussion and vote may take 1 or 2 weeks, and longer if
> > > someone objects, so I'd like to keep pushing FLIP-281 forward with the
> > > current design. I hope it can make it into the 1.17 release.
> > >
> > > By the way, if nobody is driving the deprecation, I would like to start
> > > another discussion about it. What do you think?
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws
> > >
> > > Thanks,
> > > Biao /'bɪ.aʊ/
> > >
> > >
> > >
> > > On Fri, 13 Jan 2023 at 08:43, Jing Ge <j...@ververica.com.invalid> wrote:
> > >
> > > > Hi Biao,
> > > >
> > > > Thanks for driving this. As Martijn already pointed out, we will have
> > > > to spend effort on removing SinkFunction after we deprecate it. The
> > > > more functionality we add to it, the bigger the effort to deprecate and
> > > > remove it. Generally, it is not recommended to add new features to an
> > > > interface that we have already decided to deprecate but have not
> > > > deprecated yet. However, this FLIP is a special case, and there are
> > > > several reasons that lead us to support this proposal.
> > > >
> > > > First, the FLIP offers an equivalent solution for the new SinkV2, which
> > > > means the migration from SinkFunction to SinkV2 for this feature is
> > > > predictable and acceptable. The concern I raised above has been
> > > > addressed.
> > > >
> > > > Second, since SinkFunction is still marked as @Public [1], it should
> > > > be fine to add new features to it (following the rules), especially if
> > > > the requirement is urgent. Similar to what [2] describes for API
> > > > graduation, it should also take 8 months (two release cycles; 8 months
> > > > is the ideal case, it could be longer) to go from @Public to
> > > > @Deprecated and then to removal. Additionally, SinkFunction is a core
> > > > function whose deletion will trigger many further downstream
> > > > deletions, so the duration will increase to 16 months (again, the
> > > > ideal case) or even longer, e.g., 2 years.
> > > >
> > > > Third, SinkV2 is still marked as @PublicEvolving, which means a few
> > > > more months (8 months?) on top of that before we can start deprecating
> > > > SinkFunction. It is not reasonable to say that no features should be
> > > > added to SinkFunction during the upcoming 2 or 3 years.
> > > >
> > > > After thinking about all these aspects, I support this FLIP, so +1.
> > > >
> > > > This discussion leads us to another issue: we should graduate SinkV2,
> > > > and deprecate and remove SinkFunction, as soon as possible. The longer
> > > > we keep SinkFunction in the code base, the more effort we will have to
> > > > spend on anything that depends on or affects the sink.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > [1]
> > > > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
> > > > [2]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
> > > >
> > > > On Thu, Jan 12, 2023 at 8:56 AM Martijn Visser <martijnvis...@apache.org> wrote:
> > > >
> > > > > Hi Biao,
> > > > >
> > > > > While I would rather not add new features to (to-be) deprecated APIs,
> > > > > I would be +0 for this.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Martijn
> > > > >
> > > > > On Thu, 12 Jan 2023 at 08:42, Biao Liu <mmyy1...@gmail.com> wrote:
> > > > >
> > > > > > Hi Martijn,
> > > > > >
> > > > > > Thanks for your feedback!
> > > > > >
> > > > > > Yes, we propose to support speculative execution for SinkFunction.
> > > > > > 1. From the perspective of compatibility, SinkFunction is the oldest
> > > > > > Sink interface. There are lots of implementations based on
> > > > > > SinkFunction, not only in the official Flink codebase but also in
> > > > > > users' private codebases. It's a more serious issue than Sink V1. Of
> > > > > > course we hope users will migrate their legacy implementations to the
> > > > > > new interface. However, migration is always hard.
> > > > > > 2. From the perspective of cost, we don't need to do much extra work
> > > > > > to support speculative execution for SinkFunction. All we need to do
> > > > > > is check whether the SinkFunction implementation implements
> > > > > > SupportsConcurrentExecutionAttempts or not. The other parts of the
> > > > > > work are the same as for Sink V2.
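A minimal sketch of the check described in point 2, assuming the opt-in is a plain marker interface tested with `instanceof`. The thread names the marker `SupportsConcurrentExecutionAttempts`; here it is redeclared locally, together with a simplified and hypothetical `SimpleSinkFunction`, so the example compiles without Flink. The helper `supportsSpeculativeExecution` is illustrative only and is not Flink's scheduler code.

```java
import java.util.List;

// Hypothetical local stand-in for the SupportsConcurrentExecutionAttempts
// marker named in the thread; redeclared so the sketch compiles without Flink.
interface SupportsConcurrentExecutionAttempts {}

// Hypothetical, simplified sink interface; not Flink's SinkFunction signature.
interface SimpleSinkFunction<T> {
    void invoke(T value) throws Exception;
}

// A legacy sink that opts in to speculative execution via the marker interface.
class SpeculationFriendlySink implements SimpleSinkFunction<String>, SupportsConcurrentExecutionAttempts {
    @Override
    public void invoke(String value) {
        // A real sink would write to an attempt-specific location here.
    }
}

// A legacy sink that does not opt in.
class PlainSink implements SimpleSinkFunction<String> {
    @Override
    public void invoke(String value) {
    }
}

public class SpeculativeSinkCheck {
    // True only if the sink declares that it tolerates several concurrent
    // attempts of the same subtask.
    static boolean supportsSpeculativeExecution(Object sink) {
        return sink instanceof SupportsConcurrentExecutionAttempts;
    }

    public static void main(String[] args) {
        List<Object> sinks = List.of(new SpeculationFriendlySink(), new PlainSink());
        for (Object sink : sinks) {
            System.out.println(sink.getClass().getSimpleName() + " -> " + supportsSpeculativeExecution(sink));
        }
    }
}
```

Because the marker carries no methods, existing SinkFunction implementations that do not implement it keep their current behaviour, which matches the low-cost argument in point 2.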
> > > > > >
> > > > > > To summarize, it's cheap to support speculative execution for
> > > > > > SinkFunction, and it may allow more existing scenarios to run with
> > > > > > speculative execution.
> > > > > >
> > > > > > Thanks,
> > > > > > Biao /'bɪ.aʊ/
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, 11 Jan 2023 at 21:22, Martijn Visser <martijnvis...@apache.org> wrote:
> > > > > >
> > > > > > > Hi Biao,
> > > > > > >
> > > > > > > Apologies for jumping in late. My only question is about
> > > > > > > SinkFunction: does this imply that we want to add support for this
> > > > > > > to SinkFunction? If so, I would not be in favour of that, since we
> > > > > > > would like to deprecate SinkFunction in favour of SinkV2 (I actually
> > > > > > > thought that was already the case).
> > > > > > >
> > > > > > > Besides that, I have no other comments.
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Martijn
> > > > > > >
> > > > > > > On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <beyond1...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Biao,
> > > > > > > >
> > > > > > > > Thanks for the explanation.
> > > > > > > >
> > > > > > > > +1 for the proposal.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jing Zhang
> > > > > > > >
> > > > > > > > On Wed, 4 Jan 2023 at 12:11, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Biao,
> > > > > > > > >
> > > > > > > > > Thanks for the explanation of how SinkV2 knows the right subtask
> > > > > > > > > attempt. I have no more questions, +1 for the proposal.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Lijie
> > > > > > > > >
> > > > > > > > > On Wed, 28 Dec 2022 at 17:22, Biao Liu <mmyy1...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for all your feedback!
> > > > > > > > > >
> > > > > > > > > > To @Yuxia,
> > > > > > > > > >
> > > > > > > > > > > What does the sink expect to do to isolate data produced by
> > > > > > > > > > > speculative executions? IIUC, if the task fails over, it also
> > > > > > > > > > > generates a new attempt. Does that make a difference in
> > > > > > > > > > > isolating the produced data?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Yes, there is a difference from the task failover scenario.
> > > > > > > > > > The attempt number is more necessary for speculative execution
> > > > > > > > > > than for failover, because in the failover scenario only one
> > > > > > > > > > instance of a subtask can be running at a time.
> > > > > > > > > >
> > > > > > > > > > Let's take FileSystemOutputFormat as an example. In the failover
> > > > > > > > > > scenario, the temporary directory that stores the produced data
> > > > > > > > > > can be something like "$root_dir/task-$taskNumber/". In the
> > > > > > > > > > initialization phase, the subtask deletes and re-creates the
> > > > > > > > > > temporary directory.
> > > > > > > > > >
> > > > > > > > > > However, in the speculative execution scenario this does not
> > > > > > > > > > work, because several attempts of the same subtask might be
> > > > > > > > > > running at the same time. These attempts might delete,
> > > > > > > > > > re-create, and write to the same temporary directory
> > > > > > > > > > concurrently. The correct temporary directory should be
> > > > > > > > > > something like "$root_dir/task-$taskNumber/attempt-$attemptNumber".
> > > > > > > > > > So it's necessary to expose the attempt number to the Sink
> > > > > > > > > > implementation so that it can isolate the data.
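The two directory layouts can be sketched as follows. This is an illustration only, assuming local `java.nio.file.Path`s; the helper names `failoverTempDir` and `speculativeTempDir` are hypothetical and are not part of `FileSystemOutputFormat`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class AttemptDirectories {
    // Failover-only layout: one directory per subtask. This is safe only while
    // at most one attempt of a subtask runs at a time, since a new attempt wipes it.
    static Path failoverTempDir(Path rootDir, int taskNumber) {
        return rootDir.resolve("task-" + taskNumber);
    }

    // Speculative layout: one directory per (subtask, attempt) pair, so concurrent
    // attempts of the same subtask never delete or overwrite each other's files.
    static Path speculativeTempDir(Path rootDir, int taskNumber, int attemptNumber) {
        return rootDir.resolve("task-" + taskNumber).resolve("attempt-" + attemptNumber);
    }

    public static void main(String[] args) {
        Path root = Paths.get("/tmp/staging");
        // Two concurrent attempts of subtask 3 get disjoint directories.
        System.out.println(speculativeTempDir(root, 3, 0));
        System.out.println(speculativeTempDir(root, 3, 1));
        // The failover layout would give both attempts this same directory.
        System.out.println(failoverTempDir(root, 3));
    }
}
```

The only change between the two layouts is the extra `attempt-N` level, which is why exposing the attempt number to the sink is enough to isolate concurrent writers.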
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > To @Lijie,
> > > > > > > > > >
> > > > > > > > > > > I have a question about this: does SinkV2 need to do the same thing?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Actually, yes.
> > > > > > > > > >
> > > > > > > > > > > Should we/users do it in the committer? If yes, how does the
> > > > > > > > > > > committer know which one is the right subtask attempt?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Yes, we/users should do it in the committer.
> > > > > > > > > >
> > > > > > > > > > In the current design, the Committer of Sink V2 should get the
> > > > > > > > > > "which one is the right subtask attempt" information from the
> > > > > > > > > > "committable data" produced by the SinkWriter. Take FileSink as
> > > > > > > > > > an example: the "committable data" sent to the Committer
> > > > > > > > > > contains the full paths of the files produced by the SinkWriter.
> > > > > > > > > > Users could also pass the attempt number from the SinkWriter to
> > > > > > > > > > the Committer through the "committable data".
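A minimal sketch of passing the attempt number through the committable, assuming a hypothetical `FileCommittable` type and a hypothetical `RecordingCommitter` that only records what it would publish. For illustration it also assumes that the Committer receives only the committables of the attempt that finished; neither class is Flink's `FileSink` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical committable: what a SinkWriter might hand to the Committer.
// The full path already identifies the attempt; the attempt number is also
// carried explicitly, as suggested in the thread.
final class FileCommittable {
    final int subtaskIndex;
    final int attemptNumber;
    final String pendingFilePath;

    FileCommittable(int subtaskIndex, int attemptNumber, String pendingFilePath) {
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
        this.pendingFilePath = pendingFilePath;
    }
}

// Hypothetical committer: it does not guess which attempt "won"; it publishes
// exactly the files named in the committables it receives (here it only records them).
class RecordingCommitter {
    private final List<String> published = new ArrayList<>();

    void commit(List<FileCommittable> committables) {
        for (FileCommittable c : committables) {
            published.add(c.pendingFilePath);
        }
    }

    List<String> published() {
        return published;
    }
}

public class CommittableDemo {
    // Hypothetical writer-side helper that encodes subtask and attempt in the path.
    static FileCommittable fromWriter(String rootDir, int subtask, int attempt, String fileName) {
        String path = rootDir + "/task-" + subtask + "/attempt-" + attempt + "/" + fileName;
        return new FileCommittable(subtask, attempt, path);
    }

    public static void main(String[] args) {
        RecordingCommitter committer = new RecordingCommitter();
        // Assume attempt 1 of subtask 0 is the attempt that finished, so only
        // its committable reaches the Committer.
        committer.commit(List.of(fromWriter("/data/out", 0, 1, "part-0")));
        System.out.println(committer.published());
    }
}
```

Keeping the decision in the data rather than in a separate callback is the design point made in the thread: the Committer never needs to ask which attempt was the right one.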
> > > > > > > > > >
> > > > > > > > > > In the "Rejected Alternatives -> Introduce a way to clean leaked
> > > > > > > > > > data of Sink V2" section of the FLIP document, we discussed some
> > > > > > > > > > of the reasons why we didn't provide an API like the one for
> > > > > > > > > > OutputFormat.
> > > > > > > > > >
> > > > > > > > > > To @Jing Zhang,
> > > > > > > > > >
> > > > > > > > > > > I have a question about this: Speculative execution of
> > > > > > > > > > > Committer will be disabled.
> > > > > > > > > > >
> > > > > > > > > > > I agree with your point and I saw similar requirements to
> > > > > > > > > > > disable speculative execution for specified operators.
> > > > > > > > > > >
> > > > > > > > > > > However, the requirement is not currently supported. I think
> > > > > > > > > > > there should be some place to describe how to support it.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > In this FLIP design, speculative execution of the Sink V2
> > > > > > > > > > Committer will be disabled by Flink. It's not optional; users
> > > > > > > > > > cannot change it.
> > > > > > > > > > And as you said, "disable speculative execution for specified
> > > > > > > > > > operators" is not supported in the FLIP, because it's a bit out
> > > > > > > > > > of the scope of "Sink Supports Speculative Execution For Batch
> > > > > > > > > > Job". I think it's better to start another FLIP to discuss it;
> > > > > > > > > > "Fine-grained control of enabling speculative execution for
> > > > > > > > > > operators" could be the title of that FLIP. There we can discuss
> > > > > > > > > > how to enable or disable speculative execution for specified
> > > > > > > > > > operators, including the Committer and pre/post-committer of
> > > > > > > > > > Sink V2.
> > > > > > > > > >
> > > > > > > > > > What do you think?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Biao /'bɪ.aʊ/
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Biao,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for driving this FLIP. Supporting speculative execution
> > > > > > > > > > > of sinks is meaningful and important.
> > > > > > > > > > >
> > > > > > > > > > > I have a question about this: Speculative execution of
> > > > > > > > > > > Committer will be disabled.
> > > > > > > > > > >
> > > > > > > > > > > I agree with your point and I saw similar requirements to
> > > > > > > > > > > disable speculative execution for specified operators.
> > > > > > > > > > >
> > > > > > > > > > > However, the requirement is not currently supported. I think
> > > > > > > > > > > there should be some place to describe how to support it.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Jing Zhang
> > > > > > > > > > >
> > > > > > > > > > > On Tue, 27 Dec 2022 at 18:51, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Biao,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)"
> > > > > > > > > > > > for OutputFormat, so that it knows which subtask attempt was
> > > > > > > > > > > > marked as finished by the JM and can commit the right data.
> > > > > > > > > > > > I have a question about this: does SinkV2 need to do the same
> > > > > > > > > > > > thing? Should we/users do it in the committer? If yes, how does
> > > > > > > > > > > > the committer know which one is the right subtask attempt?
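For the OutputFormat side, a sketch of how finalization could use a finished-attempt lookup to choose which staging directories to commit. `FinishedAttemptLookup` is a hypothetical stand-in modeled only on the `getFinishedAttempt(int subtaskIndex)` signature quoted above, and the directory layout assumes the `task-$taskNumber/attempt-$attemptNumber` pattern discussed elsewhere in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in modeled on the method quoted in the thread,
// "int getFinishedAttempt(int subtaskIndex)"; not Flink's actual interface.
interface FinishedAttemptLookup {
    int getFinishedAttempt(int subtaskIndex);
}

public class FinalizeSketch {
    // For each subtask, select only the staging directory of the attempt that was
    // marked as finished; directories of other attempts are simply not selected
    // (a real sink would presumably clean them up instead of committing them).
    static List<String> directoriesToCommit(String rootDir, int parallelism, FinishedAttemptLookup lookup) {
        List<String> result = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int attempt = lookup.getFinishedAttempt(subtask);
            result.add(rootDir + "/task-" + subtask + "/attempt-" + attempt);
        }
        return result;
    }

    public static void main(String[] args) {
        // Subtask 0 finished with its original attempt 0; subtask 1 with speculative attempt 1.
        Map<Integer, Integer> finished = Map.of(0, 0, 1, 1);
        System.out.println(directoriesToCommit("/data/staging", 2, subtask -> finished.get(subtask)));
    }
}
```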
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Lijie
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, 27 Dec 2022 at 10:01, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi, Biao.
> > > > > > > > > > > > > Thanks for driving this FLIP.
> > > > > > > > > > > > > After a quick look at this FLIP, I have a question about
> > > > > > > > > > > > > "expose the attempt number which can be used to isolate data
> > > > > > > > > > > > > produced by speculative executions with the same subtask id".
> > > > > > > > > > > > > What does the sink expect to do to isolate data produced by
> > > > > > > > > > > > > speculative executions? IIUC, if the task fails over, it also
> > > > > > > > > > > > > generates a new attempt. Does that make a difference in
> > > > > > > > > > > > > isolating the produced data?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > Yuxia
> > > > > > > > > > > > >
> > > > > > > > > > > > > ----- Original Message -----
> > > > > > > > > > > > > From: "Biao Liu" <mmyy1...@gmail.com>
> > > > > > > > > > > > > To: "dev" <dev@flink.apache.org>
> > > > > > > > > > > > > Sent: Thursday, December 22, 2022 8:16:21 PM
> > > > > > > > > > > > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I would like to start a discussion about making Sink support
> > > > > > > > > > > > > speculative execution for batch jobs. This proposal is a
> > > > > > > > > > > > > follow-up to "FLIP-168: Speculative Execution For Batch Job" [1].
> > > > > > > > > > > > > Speculative execution is very meaningful for batch jobs, and it
> > > > > > > > > > > > > will be more complete once Sink supports speculative execution.
> > > > > > > > > > > > > Please find more details in the FLIP document [2].
> > > > > > > > > > > > >
> > > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > > > [1]
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > > > > > > > > > > > > [2]
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Biao /'bɪ.aʊ/
