Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Jing,

That's a valid point but it brings up a discussion on the promotion of the Sink V2 API in general. I'll open a separate discussion thread for this.

Best regards,

Martijn
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi,

I think it will be confusing for users if the older API is deprecated while the related new API has not graduated yet, which leads to the awkward situation that no proper API can be used in production. In our case, e.g., sinkv2.Sink should be marked as @Public before SinkFunction can be marked as deprecated. Doing it in 1.17 might be too hasty. I am not sure whether the Flink community already has a similar rule. If not, a new one should be defined.

Best regards,
Jing
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

Just to clarify, I understand your point of view and will not block your FLIP :)

It would indeed be great if both SinkFunction and SourceFunction could be marked as deprecated in 1.17, as Yun Tang pointed out.

Best regards,

Martijn
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

I think it's time to deprecate the SinkFunction, and it would be fine if you could drive the discussion.

BTW, we might get it done in the flink-1.17 release, together with deprecating SourceFunction [1].

[1] https://issues.apache.org/jira/browse/FLINK-28045

Best
Yun Tang
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Martijn & Jing,

Thanks for the feedback!

Currently, SinkFunction is in a subtle position. As Jing pointed out, SinkFunction is still marked as public. Technically, according to the Flink Bylaws [1], the decision to deprecate it should be approved through an official vote. Although many of the community maintainers (including me) think it should be deprecated, we still should not assume that it already is. Considering that the discussion and voting may last 1 or 2 weeks, or longer if someone has an objection, I'd like to keep pushing FLIP-281 forward with the current design. I hope it can catch up with the release of 1.17.

By the way, if nobody drives the deprecation, I would like to start another discussion to talk about it. What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws

Thanks,
Biao /'bɪ.aʊ/
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

Thanks for driving this. As Martijn already pointed out, we will have to spend effort to remove SinkFunction after we deprecate it. The more functionality is added to it, the bigger the effort to deprecate and remove SinkFunction will be. Commonly, it is not recommended to add new features to an interface that we have already decided to deprecate but have not deprecated yet. But this FLIP is a special case, and there are some reasons that lead us to support this proposal.

First, the FLIP offered an equivalent solution for the new SinkV2, which means the migration from SinkFunction to SinkV2 for this feature is predictable and acceptable. The concern I raised above has been solved.

Second, since SinkFunction is still marked as public now [1], it should be fine to add new features to it (following the rules), especially if the requirement is urgent. Similar to what [2] describes for API graduation, it should also take 8 months (two release cycles; the ideal case is 8 months, it could be longer) to go from @Public to @Deprecated and to be removed. Additionally, SinkFunction is a core function whose deletion will trigger a lot of further downstream deletions, so the duration will increase to 16 months (again, the ideal case) or even longer, e.g. 2 years.

Third, SinkV2 is still marked as @PublicEvolving, which means a few more months (8 months?) before we can start the deprecation of SinkFunction. It is not rational to say no features should be added to SinkFunction during the upcoming 2 or 3 years.

After thinking about all these aspects, I would support this FLIP, so +1.

This discussion leads us to another issue: we should graduate SinkV2 and deprecate and remove SinkFunction asap. The longer we keep SinkFunction in the code base, the bigger the effort will be when working on anything that depends on or has an impact on sinks.

Best regards,
Jing

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

While I would rather not add new features to (to-be) deprecated features, I would be +0 for this.

Best regards,

Martijn

On Thu, 12 Jan 2023 at 08:42, Biao Liu wrote:

> Hi Martijn,
>
> Thanks for your feedback!
>
> Yes, we propose to support speculative execution for SinkFunction.
> 1. From the perspective of compatibility, SinkFunction is the most original Sink implementation. There are lots of implementations based on SinkFunction, not only in the official Flink codebase but also in users' private codebases. It's a more serious issue than Sink V1. Of course we hope users could migrate their legacy implementations to the new interface. However, migration is always hard.
> 2. From the perspective of cost, we don't need to do much extra work to support speculative execution for SinkFunction. All we need to do is check whether the SinkFunction implementation inherits SupportsConcurrentExecutionAttempts or not. The other parts of the work are the same as for Sink V2.
>
> To summarize, it's cheap to support speculative execution for SinkFunction. And it may allow more existing scenarios to run with speculative execution.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
> On Wed, 11 Jan 2023 at 21:22, Martijn Visser wrote:
>
> > Hi Biao,
> >
> > Apologies for jumping in late. My only question is about SinkFunction: does this imply that we want to add support for this to the SinkFunction? If so, I would not be in favour of that, since we would like to deprecate (I actually thought that was already the case) the SinkFunction in favour of SinkV2.
> >
> > Besides that, I have no other comments.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang wrote:
> >
> > > Hi Biao,
> > >
> > > Thanks for the explanation.
> > >
> > > +1 for the proposal.
> > >
> > > Best,
> > > Jing Zhang
> > >
> > > On Wed, 4 Jan 2023 at 12:11, Lijie Wang wrote:
> > >
> > > > Hi Biao,
> > > >
> > > > Thanks for the explanation of how SinkV2 knows the right subtask attempt. I have no more questions, +1 for the proposal.
> > > >
> > > > Best,
> > > > Lijie
> > > >
> > > > On Wed, 28 Dec 2022 at 17:22, Biao Liu wrote:
> > > >
> > > > > Thanks for all your feedback!
> > > > >
> > > > > To @Yuxia,
> > > > >
> > > > > > What is the sink expected to do to isolate data produced by speculative executions? IIUC, if the task fails over, it also generates a new attempt. Does it make a difference in isolating the data produced?
> > > > >
> > > > > Yes, there is something different from the task failover scenario. The attempt number is more necessary for speculative execution than for failover, because there can be only one subtask instance running at a time in the failover scenario.
> > > > >
> > > > > Let's take FileSystemOutputFormat as an example. For the failover scenario, the temporary directory to store produced data can be something like "$root_dir/task-$taskNumber/". At the initialization phase, the subtask deletes and re-creates the temporary directory.
> > > > >
> > > > > However, in the speculative execution scenario this does not work, because there might be several subtasks running at the same time. These subtasks might delete, re-create and write to the same temporary directory at the same time. The correct temporary directory should be like "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's necessary to expose the attempt number to the Sink implementation to do the data isolation.
> > > > >
> > > > > To @Lijie,
> > > > >
> > > > > > I have a question about this: does SinkV2 need to do the same thing?
> > > > >
> > > > > Actually, yes.
> > > > >
> > > > > > Should we/users do it in the committer? If yes, how does the committer know which one is the right subtask attempt?
> > > > >
> > > > > Yes, we/users should do it in the committer.
> > > > >
> > > > > In the current design, the Committer of Sink V2 should get the "which one is the right subtask attempt" information from the "committable data" produced by SinkWriter. Let's take the FileSink as an example: the "committable data" sent to the Committer contains the full path of the files produced by SinkWriter. Users could also pass the attempt number through "committable data" from SinkWriter to Committer.
> > > > >
> > > > > In the "Rejected Alternatives -> Introduce a way to clean leaked data of Sink V2" section of the FLIP document, we discussed some of the reasons that we didn't provide an API like OutputFormat.
> > > > >
> > > > > To @Jing Zhang
> > > > >
> > > > > > I have a question about this: Speculative execution of Committer will be disabled.
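
As an illustration of the directory layout discussed in the quoted 28 Dec message, here is a minimal Java sketch. The class `AttemptDirs` and its method names are hypothetical and not part of Flink; only the "$root_dir/task-$taskNumber/attempt-$attemptNumber" layout is taken from the thread.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration only; not a Flink class.
final class AttemptDirs {

    // Failover-only layout: one temporary directory per subtask.
    // Safe only when a single instance of the subtask runs at a time.
    static Path taskDir(String rootDir, int taskNumber) {
        return Paths.get(rootDir, "task-" + taskNumber);
    }

    // Speculative-execution layout: one directory per (subtask, attempt),
    // so concurrent attempts of the same subtask never share a directory.
    static Path attemptDir(String rootDir, int taskNumber, int attemptNumber) {
        return taskDir(rootDir, taskNumber).resolve("attempt-" + attemptNumber);
    }

    public static void main(String[] args) {
        Path original = attemptDir("/tmp/out", 3, 0);
        Path speculative = attemptDir("/tmp/out", 3, 1);
        System.out.println(original);
        System.out.println(speculative);
        // The two attempts of subtask 3 resolve to different directories.
        System.out.println(original.equals(speculative));
    }
}
```

Because each attempt works under its own directory, deleting and re-creating that directory at initialization cannot touch the data of another attempt that is running concurrently.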
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Martijn,

Thanks for your feedback!

Yes, we propose to support speculative execution for SinkFunction.

1. From the perspective of compatibility, SinkFunction is the original Sink implementation. There are lots of implementations based on SinkFunction, not only in the official Flink codebase but also in users' private codebases. It's a more serious issue than Sink V1. Of course we hope users can migrate their legacy implementations to the new interface. However, migration is always hard.

2. From the perspective of cost, we don't need to do much extra work to support speculative execution for SinkFunction. All we need to do is check whether the SinkFunction implementation implements SupportsConcurrentExecutionAttempts. The rest of the work is the same as for Sink V2.

To summarize, it's cheap to support speculative execution for SinkFunction, and it may allow more existing scenarios to run with speculative execution.

Thanks,
Biao /'bɪ.aʊ/

On Wed, 11 Jan 2023 at 21:22, Martijn Visser wrote:

> Hi Biao,
>
> Apologies for jumping in late. My only question is about SinkFunction: does this imply that we want to add support for this to the SinkFunction? If so, I would not be in favour of that, since we would like to deprecate (I actually thought that was already the case) the SinkFunction in favour of SinkV2.
>
> Besides that, I have no other comments.
>
> Best regards,
>
> Martijn
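As a rough illustration of the check described in point 2 of Biao's reply, the sketch below decides whether a sink may run with concurrent attempts by testing for a marker interface. Only the name SupportsConcurrentExecutionAttempts comes from the thread; the interface is redeclared locally so the example compiles on its own, and LegacySinkFunction, AttemptAwareSink, PlainSink and SpeculationCheck are hypothetical names, not Flink classes.

```java
/** Local stand-in for the marker interface named in the thread (assumption: it has no methods). */
interface SupportsConcurrentExecutionAttempts {}

/** Hypothetical stand-in for a legacy sink function; this is not Flink's SinkFunction. */
interface LegacySinkFunction<T> {
    void invoke(T value);
}

/** A sink that opts in to concurrent attempts by implementing the marker. */
final class AttemptAwareSink
        implements LegacySinkFunction<String>, SupportsConcurrentExecutionAttempts {
    @Override
    public void invoke(String value) {
        // A real sink would write the value in an attempt-isolated way.
    }
}

/** A sink that does not opt in. */
final class PlainSink implements LegacySinkFunction<String> {
    @Override
    public void invoke(String value) {
        // Intentionally empty for the sketch.
    }
}

final class SpeculationCheck {
    private SpeculationCheck() {}

    /** Returns true only when the implementation declares the marker interface. */
    static boolean supportsSpeculation(Object sinkImplementation) {
        return sinkImplementation instanceof SupportsConcurrentExecutionAttempts;
    }
}
```

With this shape, an existing sink stays excluded from speculative execution unless its author explicitly adds the marker, which matches the opt-in intent described in the message.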
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

Apologies for jumping in late. My only question is about SinkFunction: does this imply that we want to add support for this to the SinkFunction? If so, I would not be in favour of that, since we would like to deprecate (I actually thought that was already the case) the SinkFunction in favour of SinkV2.

Besides that, I have no other comments.

Best regards,

Martijn

On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang wrote:

> Hi Biao,
>
> Thanks for the explanation.
>
> +1 for the proposal.
>
> Best,
> Jing Zhang
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

Thanks for the explanation.

+1 for the proposal.

Best,
Jing Zhang

On Wed, Jan 4, 2023 at 12:11, Lijie Wang wrote:

> Hi Biao,
>
> Thanks for the explanation of how SinkV2 knows the right subtask attempt. I have no more questions, +1 for the proposal.
>
> Best,
> Lijie
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

Thanks for the explanation of how SinkV2 knows the right subtask attempt. I have no more questions, +1 for the proposal.

Best,
Lijie

On Wed, Dec 28, 2022 at 17:22, Biao Liu wrote:

> Thanks for all your feedback!
>
> To @Yuxia,
>
> > What is the sink expected to do to isolate data produced by speculative executions? IIUC, if the task fails over, it also generates a new attempt. Does it make a difference in isolating the data produced?
>
> Yes, there is a difference from the task failover scenario. The attempt number is more necessary for speculative execution than for failover, because in the failover scenario there can be only one instance of a subtask running at a time.
>
> Let's take FileSystemOutputFormat as an example. For the failover scenario, the temporary directory to store produced data can be something like "$root_dir/task-$taskNumber/". In the initialization phase, the subtask deletes and re-creates the temporary directory.
>
> However, in the speculative execution scenario this does not work, because there might be several attempts of the same subtask running at the same time. These attempts might delete, re-create and write to the same temporary directory at the same time. The correct temporary directory should be something like "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's necessary to expose the attempt number to the Sink implementation to do the data isolation.
>
> To @Lijie,
>
> > I have a question about this: does SinkV2 need to do the same thing?
>
> Actually, yes.
>
> > Should we/users do it in the committer? If yes, how does the committer know which one is the right subtask attempt?
>
> Yes, we/users should do it in the committer.
>
> In the current design, the Committer of Sink V2 should get the "which one is the right subtask attempt" information from the "committable data" produced by the SinkWriter. Take the FileSink as an example: the "committable data" sent to the Committer contains the full path of the files produced by the SinkWriter. Users could also pass the attempt number through the "committable data" from the SinkWriter to the Committer.
>
> In the "Rejected Alternatives -> Introduce a way to clean leaked data of Sink V2" section of the FLIP document, we discussed some of the reasons why we didn't provide an API like the one for OutputFormat.
>
> To @Jing Zhang,
>
> > I have a question about this: Speculative execution of Committer will be disabled.
> >
> > I agree with your point, and I have seen similar requirements to disable speculative execution for specified operators.
> >
> > However, this requirement is not supported currently. I think there should be some place to describe how to support it.
>
> In this FLIP design, speculative execution of the Sink V2 Committer will be disabled by Flink. It's not optional; users cannot change it.
>
> And as you said, "disable speculative execution for specified operators" is not supported in the FLIP, because it's a bit out of the scope of "Sink Supports Speculative Execution For Batch Job". I think it's better to start another FLIP to discuss it. "Fine-grained control of enabling speculative execution for operators" could be the title of that FLIP. There we can discuss how to enable or disable speculative execution for specified operators, including the Committer and pre/post-committer of Sink V2.
>
> What do you think?
>
> Thanks,
> Biao /'bɪ.aʊ/
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Thanks for all your feedback!

To @Yuxia,

> What is the sink expected to do to isolate data produced by speculative executions? IIUC, if the task fails over, it also generates a new attempt. Does it make a difference in isolating the data produced?

Yes, there is a difference from the task failover scenario. The attempt number is more necessary for speculative execution than for failover, because in the failover scenario there can be only one instance of a subtask running at a time.

Let's take FileSystemOutputFormat as an example. For the failover scenario, the temporary directory to store produced data can be something like "$root_dir/task-$taskNumber/". In the initialization phase, the subtask deletes and re-creates the temporary directory.

However, in the speculative execution scenario this does not work, because there might be several attempts of the same subtask running at the same time. These attempts might delete, re-create and write to the same temporary directory at the same time. The correct temporary directory should be something like "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's necessary to expose the attempt number to the Sink implementation to do the data isolation.

To @Lijie,

> I have a question about this: does SinkV2 need to do the same thing?

Actually, yes.

> Should we/users do it in the committer? If yes, how does the committer know which one is the right subtask attempt?

Yes, we/users should do it in the committer.

In the current design, the Committer of Sink V2 should get the "which one is the right subtask attempt" information from the "committable data" produced by the SinkWriter. Take the FileSink as an example: the "committable data" sent to the Committer contains the full path of the files produced by the SinkWriter. Users could also pass the attempt number through the "committable data" from the SinkWriter to the Committer.

In the "Rejected Alternatives -> Introduce a way to clean leaked data of Sink V2" section of the FLIP document, we discussed some of the reasons why we didn't provide an API like the one for OutputFormat.

To @Jing Zhang,

> I have a question about this: Speculative execution of Committer will be disabled.
>
> I agree with your point, and I have seen similar requirements to disable speculative execution for specified operators.
>
> However, this requirement is not supported currently. I think there should be some place to describe how to support it.

In this FLIP design, speculative execution of the Sink V2 Committer will be disabled by Flink. It's not optional; users cannot change it.

And as you said, "disable speculative execution for specified operators" is not supported in the FLIP, because it's a bit out of the scope of "Sink Supports Speculative Execution For Batch Job". I think it's better to start another FLIP to discuss it. "Fine-grained control of enabling speculative execution for operators" could be the title of that FLIP. There we can discuss how to enable or disable speculative execution for specified operators, including the Committer and pre/post-committer of Sink V2.

What do you think?

Thanks,
Biao /'bɪ.aʊ/

On Wed, 28 Dec 2022 at 11:30, Jing Zhang wrote:

> Hi Biao,
>
> Thanks for driving this FLIP. It's meaningful to support speculative execution of sinks.
>
> I have a question about this: Speculative execution of Committer will be disabled.
>
> I agree with your point, and I have seen similar requirements to disable speculative execution for specified operators.
>
> However, this requirement is not supported currently. I think there should be some place to describe how to support it.
>
> Best,
> Jing Zhang
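The directory layouts from Biao's reply to Yuxia and the idea of passing the attempt number through the committable can be sketched in a few lines of Java. This is only an illustration under stated assumptions: AttemptDirs and FileCommittable are hypothetical names, the committable fields are invented for the example, and nothing here is the actual FileSystemOutputFormat or FileSink code.

```java
import java.nio.file.Path;

/** Hypothetical committable: what a writer could hand over to the committer. */
final class FileCommittable {
    final int subtaskIndex;
    final int attemptNumber;
    final Path tempDir;

    FileCommittable(int subtaskIndex, int attemptNumber, Path tempDir) {
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
        this.tempDir = tempDir;
    }
}

final class AttemptDirs {
    private AttemptDirs() {}

    /** Failover-only layout: safe only while one attempt per subtask runs at a time. */
    static Path perTaskDir(Path rootDir, int taskNumber) {
        return rootDir.resolve("task-" + taskNumber);
    }

    /** Speculation-safe layout: every concurrent attempt owns its own directory. */
    static Path perAttemptDir(Path rootDir, int taskNumber, int attemptNumber) {
        return perTaskDir(rootDir, taskNumber).resolve("attempt-" + attemptNumber);
    }

    /** Writer side: record the attempt number and its directory in the committable. */
    static FileCommittable committableFor(Path rootDir, int taskNumber, int attemptNumber) {
        return new FileCommittable(
                taskNumber, attemptNumber, perAttemptDir(rootDir, taskNumber, attemptNumber));
    }
}
```

Two attempts of the same subtask resolve to different directories under the per-attempt layout, so deleting and re-creating one attempt's directory at startup cannot touch the other attempt's data.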
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

Thanks for driving this FLIP. It's meaningful to support speculative execution of sinks.

I have a question about this: Speculative execution of Committer will be disabled.

I agree with your point, and I have seen similar requirements to disable speculative execution for specified operators.

However, this requirement is not supported currently. I think there should be some place to describe how to support it.

Best,
Jing Zhang

On Tue, Dec 27, 2022 at 18:51, Lijie Wang wrote:

> Hi Biao,
>
> Thanks for driving this FLIP.
>
> This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" so that OutputFormat knows which subtask attempt is the one marked as finished by the JM and can commit the right data.
>
> I have a question about this: does SinkV2 need to do the same thing? Should we/users do it in the committer? If yes, how does the committer know which one is the right subtask attempt?
>
> Best,
> Lijie
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

Thanks for driving this FLIP.

This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" so that OutputFormat knows which subtask attempt is the one marked as finished by the JM and can commit the right data.

I have a question about this: does SinkV2 need to do the same thing? Should we/users do it in the committer? If yes, how does the committer know which one is the right subtask attempt?

Best,
Lijie

On Tue, Dec 27, 2022 at 10:01, yuxia wrote:

> Hi Biao,
>
> Thanks for driving this FLIP.
>
> After a quick look at this FLIP, I have a question about "expose the attempt number which can be used to isolate data produced by speculative executions with the same subtask id".
>
> What is the sink expected to do to isolate data produced by speculative executions? IIUC, if the task fails over, it also generates a new attempt. Does it make a difference in isolating the data produced?
>
> Best regards,
> Yuxia
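To make the role of "int getFinishedAttempt(int subtaskIndex)" more concrete, here is a small Java sketch of how a finalization step might pick, for each subtask, the directory written by the attempt the JM marked as finished. Only the method signature is taken from the message above. The FinishedAttemptLookup interface, its getParallelism() method, the FinalizeSketch class and the string-based directory layout are assumptions made for this example and should not be read as the FLIP's actual API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for whatever context exposes the method quoted in the thread. */
interface FinishedAttemptLookup {
    /** Assumption for this sketch: the number of subtasks is available here too. */
    int getParallelism();

    /** Signature as quoted in the thread: the attempt marked as finished for a subtask. */
    int getFinishedAttempt(int subtaskIndex);
}

final class FinalizeSketch {
    private FinalizeSketch() {}

    /** Collects, per subtask, the temporary directory of the finished attempt only. */
    static List<String> directoriesToCommit(String rootDir, FinishedAttemptLookup lookup) {
        List<String> dirs = new ArrayList<>();
        for (int subtask = 0; subtask < lookup.getParallelism(); subtask++) {
            int attempt = lookup.getFinishedAttempt(subtask);
            // Directories of other (cancelled or slower) attempts are never committed.
            dirs.add(rootDir + "/task-" + subtask + "/attempt-" + attempt);
        }
        return dirs;
    }
}
```

A caller would move or rename only the returned directories into the final output location and could then delete everything else under the root as leftover data from attempts that did not finish first.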
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

Thanks for driving this FLIP.

After a quick look at this FLIP, I have a question about "expose the attempt number which can be used to isolate data produced by speculative executions with the same subtask id".

What is the sink expected to do to isolate data produced by speculative executions? IIUC, if the task fails over, it also generates a new attempt. Does it make a difference in isolating the data produced?

Best regards,
Yuxia

----- Original Message -----
From: "Biao Liu"
To: "dev"
Sent: Thursday, December 22, 2022 8:16:21 PM
Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

Hi everyone,

I would like to start a discussion about making Sink support speculative execution for batch jobs. This proposal is a follow-up to "FLIP-168: Speculative Execution For Batch Job"[1]. Speculative execution is very meaningful for batch jobs, and it would be more complete after supporting speculative execution of Sink. Please find more details in the FLIP document [2].

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job

Thanks,
Biao /'bɪ.aʊ/
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Hi Biao,

Thanks for creating this FLIP! Supporting speculative execution of sinks is important. Also, in production we see sinks chained with other operators, e.g. sources in simple ETL jobs, and currently such tasks cannot do speculative execution because sinks are not supported.

+1 for the proposal.

Thanks,
Zhu

On Thu, Dec 22, 2022 at 20:16, Biao Liu wrote:

> Hi everyone,
>
> I would like to start a discussion about making Sink support speculative execution for batch jobs. This proposal is a follow-up to "FLIP-168: Speculative Execution For Batch Job"[1]. Speculative execution is very meaningful for batch jobs, and it would be more complete after supporting speculative execution of Sink. Please find more details in the FLIP document [2].
>
> Looking forward to your feedback.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
>
> Thanks,
> Biao /'bɪ.aʊ/