Hi Jing,

That's a valid point, but it raises a broader discussion about promoting the Sink V2 API in general. I'll open a separate discussion thread for this.
Best regards,
Martijn

On Wed, 18 Jan 2023 at 11:01, Jing Ge <j...@ververica.com.invalid> wrote:

Hi,

I think it will be confusing for users if the older API is deprecated while the related new API has not graduated yet. That would leave us in the awkward situation where no proper API can be used in production. In our case, e.g., sinkv2.Sink should be marked as @Public before SinkFunction can be marked as deprecated. Doing it in 1.17 might be too hasty. I am not sure whether the Flink community already has such a rule. If not, a new one should be defined.

Best regards,
Jing

On Wed, Jan 18, 2023 at 9:48 AM Martijn Visser <martijnvis...@apache.org> wrote:

Hi Biao,

Just to clarify, I understand your point of view and will not block your FLIP :)

It would indeed be great if both SinkFunction and SourceFunction could be marked as deprecated in 1.17, as Yun Tang pointed out.

Best regards,
Martijn

On Wed, 18 Jan 2023 at 09:34, Yun Tang <myas...@live.com> wrote:

Hi Biao,

I think it's time to deprecate SinkFunction, and it would be fine if you could drive the discussion.

BTW, we might get it done in the flink-1.17 release together with deprecating SourceFunction[1].

[1] https://issues.apache.org/jira/browse/FLINK-28045

Best
Yun Tang
________________________________
From: Biao Liu <mmyy1...@gmail.com>
Sent: Wednesday, January 18, 2023 16:15
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Hi Martijn & Jing,

Thanks for the feedback!

Currently, SinkFunction is in a subtle situation. As Jing pointed out, SinkFunction is still marked as public.
Technically, according to the Flink Bylaws[1], the decision should be approved through an official vote. Although many community maintainers (including me) think it should be deprecated, we should not assume that it already is. The discussion and vote may take 1 or 2 weeks, or longer if someone objects, so I'd like to keep pushing FLIP-281 forward with the current design. I hope it can make it into the 1.17 release.

By the way, if nobody drives the deprecation, I would like to start another discussion about it. What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws

Thanks,
Biao /'bɪ.aʊ/

On Fri, 13 Jan 2023 at 08:43, Jing Ge <j...@ververica.com.invalid> wrote:

Hi Biao,

Thanks for driving this. As Martijn already pointed out, we will have to spend effort removing SinkFunction after we deprecate it. The more functionality is added to it, the more effort it will take to deprecate and remove SinkFunction. Generally, it is not recommended to add new features to an interface that we have already decided to deprecate but have not deprecated yet. However, this FLIP is a special case, and there are several reasons that lead us to support this proposal.

First, the FLIP offers an equivalent solution for the new SinkV2, which means the migration from SinkFunction to SinkV2 for this feature is predictable and acceptable. The concern I raised above has been resolved.

Second, since SinkFunction is still marked as public [1], it should be fine to add new features to it (following the rules), especially if the requirement is urgent.
Similar to what [2] describes for API graduation, going from @Public to @Deprecated and then to removal should also take two release cycles: 8 months in the ideal case, possibly longer. Additionally, SinkFunction is a core function whose deletion will trigger many further downstream deletions, so the duration will grow to 16 months (again, the ideal case) or even longer, e.g. 2 years.

Third, SinkV2 is still marked as @PublicEvolving, which means an additional few months (8 months?) before we can start deprecating SinkFunction. It is not reasonable to say that no features should be added to SinkFunction during the upcoming 2 or 3 years.

After considering all these aspects, I support this FLIP, so +1.

This discussion leads us to another issue: we should graduate SinkV2 and deprecate and remove SinkFunction asap. The longer we keep SinkFunction in the code base, the more effort we will have to spend on anything that depends on or affects sinks.

Best regards,
Jing

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process

On Thu, Jan 12, 2023 at 8:56 AM Martijn Visser <martijnvis...@apache.org> wrote:

Hi Biao,

While I would rather not add new features to (to-be) deprecated features, I would be +0 for this.

Best regards,
Martijn

On Thu, 12 Jan 2023 at 08:42, Biao Liu <mmyy1...@gmail.com> wrote:

Hi Martijn,

Thanks for your feedback!

Yes, we propose to support speculative execution for SinkFunction.
1. From the perspective of compatibility, SinkFunction is the most original Sink implementation. There are lots of implementations based on SinkFunction, not only in the official Flink codebase but also in users' private codebases. It's a more serious issue than Sink V1. Of course we hope users will migrate their legacy implementations to the new interface. However, migration is always hard.
2. From the perspective of cost, we don't need to do much extra work to support speculative execution for SinkFunction. All we need to do is check whether the SinkFunction implementation implements SupportsConcurrentExecutionAttempts or not. The rest of the work is the same as for Sink V2.

To summarize, it's cheap to support speculative execution for SinkFunction, and it may allow more existing scenarios to run with speculative execution.

Thanks,
Biao /'bɪ.aʊ/

On Wed, 11 Jan 2023 at 21:22, Martijn Visser <martijnvis...@apache.org> wrote:

Hi Biao,

Apologies for jumping in late. My only question is about SinkFunction: does this imply that we want to add support for this to SinkFunction?
If so, I would not be in favour of that, since we would like to deprecate SinkFunction in favour of SinkV2 (I actually thought that was already the case).

Besides that, I have no other comments.

Best regards,
Martijn

On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <beyond1...@gmail.com> wrote:

Hi Biao,

Thanks for the explanation.

+1 for the proposal.

Best,
Jing Zhang

On Wed, 4 Jan 2023 at 12:11, Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi Biao,

Thanks for the explanation of how SinkV2 knows the right subtask attempt. I have no more questions, +1 for the proposal.

Best,
Lijie

On Wed, 28 Dec 2022 at 17:22, Biao Liu <mmyy1...@gmail.com> wrote:

Thanks for all your feedback!

To @Yuxia,

> What does the sink expect to do to isolate data produced by speculative executions? IIUC, if the task fails over, it also generates a new attempt. Does it make a difference in isolating the data produced?

Yes, there is a difference from the task failover scenario. The attempt number matters more for speculative execution than for failover.
That's because only one subtask instance can be running at a time in the failover scenario.

Let's take FileSystemOutputFormat as an example. In the failover scenario, the temporary directory for the produced data can be something like "$root_dir/task-$taskNumber/". During initialization, the subtask deletes and re-creates that temporary directory.

However, this does not work with speculative execution, because several attempts of the same subtask might be running at the same time. They might delete, re-create and write to the same temporary directory simultaneously. The correct temporary directory should be something like "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's necessary to expose the attempt number to the Sink implementation for data isolation.

To @Lijie,

> I have a question about this: does SinkV2 need to do the same thing?

Actually, yes.

> Should we/users do it in the committer? If yes, how does the committer know which one is the right subtask attempt?

Yes, we/users should do it in the committer.
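A minimal, Flink-independent sketch of the isolation scheme in this reply, assuming the "$root_dir/task-$taskNumber/attempt-$attemptNumber" layout from the FileSystemOutputFormat example and a writer that passes the full file path and attempt number to the committer in its committable. StagedFile and StagingLayout are hypothetical names for illustration only; they are not Flink or FLIP-281 APIs, and no files are actually touched.

```java
import java.nio.file.Path;

/** Hypothetical committable: the full path written by one writer attempt, plus that attempt's number. */
final class StagedFile {
    final int taskNumber;
    final int attemptNumber; // optional extra information a writer could pass to its committer
    final Path path;

    StagedFile(int taskNumber, int attemptNumber, Path path) {
        this.taskNumber = taskNumber;
        this.attemptNumber = attemptNumber;
        this.path = path;
    }
}

/** Hypothetical helpers illustrating the two staging layouts discussed in the thread. */
final class StagingLayout {
    private StagingLayout() {}

    /** "$root_dir/task-$taskNumber/": safe under failover, where only one attempt runs at a time. */
    static Path failoverDir(Path rootDir, int taskNumber) {
        return rootDir.resolve("task-" + taskNumber);
    }

    /** "$root_dir/task-$taskNumber/attempt-$attemptNumber": concurrent attempts never share a directory. */
    static Path speculativeDir(Path rootDir, int taskNumber, int attemptNumber) {
        return failoverDir(rootDir, taskNumber).resolve("attempt-" + attemptNumber);
    }

    /** Writer side: describe a written file so the committer knows which attempt produced it. */
    static StagedFile stage(Path rootDir, int taskNumber, int attemptNumber, String fileName) {
        Path file = speculativeDir(rootDir, taskNumber, attemptNumber).resolve(fileName);
        return new StagedFile(taskNumber, attemptNumber, file);
    }

    /** Committer side: the final location depends only on the file name, not on the attempt. */
    static Path publishTarget(Path finalDir, StagedFile committable) {
        return finalDir.resolve(committable.path.getFileName());
    }
}
```

With this layout, two concurrent attempts of subtask 3 stage to different directories (staging/task-3/attempt-0/part-0 and staging/task-3/attempt-1/part-0), so neither can delete or overwrite the other's data, while the committer derives the final location from the file name alone.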
In the current design, the Sink V2 Committer should get the "which one is the right subtask attempt" information from the "committable data" produced by the SinkWriter. Taking FileSink as an example, the "committable data" sent to the Committer contains the full paths of the files produced by the SinkWriter. Users could also pass the attempt number from the SinkWriter to the Committer through the "committable data".

In the "Rejected Alternatives -> Introduce a way to clean leaked data of Sink V2" section of the FLIP document, we discussed some of the reasons why we didn't provide an API like the one for OutputFormat.

To @Jing Zhang,

> I have a question about this: Speculative execution of Committer will be disabled.
>
> I agree with your point, and I have seen similar requirements to disable speculative execution for specified operators.
>
> However, this is not supported currently. I think there should be some place describing how to support it.

In this FLIP design, the speculative execution of the Sink V2 Committer will be disabled by Flink. It's not an optional operation.
Users cannot change it.
And as you said, "disable speculative execution for specified operators" is not supported in the FLIP, because it's a bit out of the scope of "Sink Supports Speculative Execution For Batch Job". I think it's better to start another FLIP to discuss it; "Fine-grained control of enabling speculative execution for operators" could be its title. There we can discuss how to enable or disable speculative execution for specified operators, including the Committer and pre/post-committer of Sink V2.

What do you think?

Thanks,
Biao /'bɪ.aʊ/

On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1...@gmail.com> wrote:

Hi Biao,

Thanks for driving this FLIP. Supporting speculative execution of sinks is meaningful and important.

I have a question about this: Speculative execution of Committer will be disabled.

I agree with your point, and I have seen similar requirements to disable speculative execution for specified operators.

However, this is not supported currently.
I think there should be some place describing how to support it.

Best,
Jing Zhang

On Tue, 27 Dec 2022 at 18:51, Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi Biao,

Thanks for driving this FLIP.
This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" for OutputFormat, so that it knows which subtask attempt the JM marked as finished and can commit the right data.
I have a question about this: does SinkV2 need to do the same thing? Should we/users do it in the committer? If yes, how does the committer know which one is the right subtask attempt?

Best,
Lijie

On Tue, 27 Dec 2022 at 10:01, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

Hi, Biao.
Thanks for driving this FLIP.
After a quick look at this FLIP, I have a question about "expose the attempt number which can be used to isolate data produced by speculative executions with the same subtask id".
What does the sink expect to do to isolate data produced by speculative executions?
IIUC, if the task fails over, it also generates a new attempt. Does it make a difference in isolating the data produced?

Best regards,
Yuxia

----- Original Message -----
From: "Biao Liu" <mmyy1...@gmail.com>
To: "dev" <dev@flink.apache.org>
Sent: Thursday, 22 December 2022, 8:16:21 PM
Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Hi everyone,

I would like to start a discussion about making Sink support speculative execution for batch jobs. This proposal is a follow-up of "FLIP-168: Speculative Execution For Batch Job"[1]. Speculative execution is very meaningful for batch jobs, and it would be more complete after supporting speculative execution of Sink. Please find more details in the FLIP document [2].

Looking forward to your feedback.
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job

Thanks,
Biao /'bɪ.aʊ/
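Lijie's message in this thread notes that FLIP-281 introduces "int getFinishedAttempt(int subtaskIndex)" so that an OutputFormat can commit only the data of the attempt the JM marked as finished. Below is a minimal plain-Java sketch of such a finalize step, assuming the "$root_dir/task-$taskNumber/attempt-$attemptNumber" layout from Biao's FileSystemOutputFormat example. FinishedAttempts and FinishedAttemptSelector are hypothetical names; the single-method interface only mirrors the method signature quoted in the thread and does not reproduce Flink's actual API.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for the information described in the FLIP:
 * which attempt of each subtask the JobManager marked as finished.
 */
@FunctionalInterface
interface FinishedAttempts {
    int getFinishedAttempt(int subtaskIndex);
}

/** Hypothetical finalize step: commit only the staging directory of each subtask's finished attempt. */
final class FinishedAttemptSelector {
    private FinishedAttemptSelector() {}

    /** Uses the "$root_dir/task-$taskNumber/attempt-$attemptNumber" layout assumed above. */
    static Path attemptDir(Path rootDir, int subtaskIndex, int attemptNumber) {
        return rootDir.resolve("task-" + subtaskIndex).resolve("attempt-" + attemptNumber);
    }

    /** Directories whose data should be committed, one per subtask, in subtask order. */
    static List<Path> directoriesToCommit(Path rootDir, int parallelism, FinishedAttempts finished) {
        List<Path> result = new ArrayList<>(parallelism);
        for (int subtask = 0; subtask < parallelism; subtask++) {
            result.add(attemptDir(rootDir, subtask, finished.getFinishedAttempt(subtask)));
        }
        return result;
    }

    /** True if this attempt is not the finished one, so its staged output should not be committed. */
    static boolean isDiscarded(int subtaskIndex, int attemptNumber, FinishedAttempts finished) {
        return finished.getFinishedAttempt(subtaskIndex) != attemptNumber;
    }
}
```

Keeping the selection a pure function of the finished-attempt lookup makes it easy to test in isolation; a real finalize step would additionally move the selected directories into place and would likely clean up the ones reported as discarded.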