Hi Panagiotis, Thank you for the update. Looks great! Just two suggestions below:
1. We seem to be waiting for the future(s) to complete before restarting the job - should we add a configurable timeout for the enrichment? Since the failure enrichers are run in parallel, we could probably settle for one timeout for all failure handlers. 2. +1 to Zhu’s comment on adding a comma-separated list of FailureHandlers instead of a boolean toggle! Other than the above, the FLIP looks great! Thank you for your efforts. Regards, Hong > On 11 Apr 2023, at 08:01, Zhu Zhu <reed...@gmail.com> wrote: > > Hi Panagiotis, > > Thanks for updating the FLIP. > >> Regarding the config option `jobmanager.failure-enricher-plugins.enabled` > I think a config option `jobmanager.failure-enrichers`, which accepts > the names of enrichers to use, may be better. It allows the users to > deploy and use the plugins in a more flexible way. The default value > of the config can be none, which means failure enrichment will be > disabled by default. > A reference can be the config option `metrics.reporters` which helps > to load metric reporter plugins. > > Thanks, > Zhu > > On Mon, Apr 10, 2023 at 03:47 Panagiotis Garefalakis <pga...@apache.org> wrote: >> >> Hello again everyone, >> >> FLIP is now updated based on our discussion! >> In short, FLIP-304 [1] proposes the addition of a pluggable interface that >> will allow users to add custom logic and enrich failures with custom >> metadata labels. >> As discussed, custom restart strategies will be part of a different >> effort.
Every pluggable FailureEnricher: >> >> - Is triggered on every global/non-global failure >> - Receives a Throwable cause and an immutable Context >> - Performs asynchronous execution (separate IoExecutor) to avoid >> blocking the main thread for RPCs >> - Is completely independent from other Enrichers >> - Emits failure labels/tags for its unique, pre-defined keys (defined at >> startup time) >> >> >> Check the link for implementation details and please let me know what you >> think :) >> >> >> [1] >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers >> >> >> Panagiotis >> >> >> On Tue, Mar 28, 2023 at 5:01 AM Zhu Zhu <reed...@gmail.com> wrote: >> >>> Hi Panagiotis, >>> >>> How about introducing a config option to control which error handling >>> plugins should be used? It is more flexible for deployments. Additionally, >>> it can also enable users to explicitly specify the order in which the plugins >>> take effect. >>> >>> Thanks, >>> Zhu >>> >>> On Mon, Mar 27, 2023 at 15:02 Gen Luo <luogen...@gmail.com> wrote: >>>> >>>> Thanks for the summary! >>>> >>>> Also +1 to support custom restart strategies in a different FLIP, >>>> as long as we can make sure that the plugin interface won't be >>>> changed when the restart strategy interface is introduced. >>>> >>>> To achieve this, maybe we should think carefully about how the handler >>>> would cooperate with the restart strategy, e.g. would it execute >>>> before the strategy (e.g. some strategy may use the tag), or after >>>> it (e.g. some metric reporting handler may use the handling result). >>>> Though we can implement it one way, and extend it if the other is >>>> really needed by someone. >>>> >>>> Besides, instead of using either of the names, shall we just make >>>> them two subclasses named FailureEnricher and FailureListener? >>>> The former executes synchronously and can modify the context, >>>> while the latter executes asynchronously and has a read-only view >>>> of context.
In this way we can make sure a handler behaves in >>>> the expected way. >>>> >>>> >>>> On Thu, Mar 23, 2023 at 5:19 PM Zhu Zhu <reed...@gmail.com> wrote: >>>> >>>>> +1 to support custom restart strategies in a different FLIP. >>>>> >>>>> It's fine to have a different plugin for custom restart strategy. >>>>> If so, since we do not treat the FLIP-304 plugin as a common failure >>>>> handler, but instead mainly target adding labels to errors, I would >>>>> +1 for the name `FailureEnricher`. >>>>> >>>>> Thanks, >>>>> Zhu >>>>> >>>>> On Thu, Mar 23, 2023 at 15:51 David Morávek <d...@apache.org> wrote: >>>>>> >>>>>>> >>>>>>> One additional remark on introducing it as an async operation: We would >>>>>>> need a new configuration parameter to define the timeout for such a >>>>>>> listener call, wouldn't we? >>>>>>> >>>>>> >>>>>> This could be left up to the implementor to handle. >>>>>> >>>>>>> What about adding an extra method getNamespace() to the Listener interface >>>>>>> which returns an Optional<String>. >>>>>>> >>>>>> >>>>>> I'd avoid mixing an additional concept into this. We can simply have a new >>>>>> method that returns a set of keys the listener can output. We can validate >>>>>> this at the JM startup time and fail fast (since it's a configuration >>>>>> error) if there is an overlap. If the listener outputs a key that it is not >>>>>> allowed to, I wouldn't be afraid to call into a fatal error handler since >>>>>> it's an invalid implementation. >>>>>> >>>>>> Best, >>>>>> D. >>>>>> >>>>>> On Thu, Mar 23, 2023 at 8:34 AM Matthias Pohl >>>>>> <matthias.p...@aiven.io.invalid> wrote: >>>>>> >>>>>>> Sounds good.
Two points I want to add: >>>>>>> >>>>>>> - Listener execution should be independent — however we need a >>> way >>>>> to >>>>>>>> enforce a Label key/key-prefix is only assigned to a single >>> Listener, >>>>>>>> thinking of a validation step both at Listener init and runtime >>>>> stages >>>>>>>> >>>>>>> What about adding an extra method getNamespace() to the Listener >>>>> interface >>>>>>> which returns an Optional<String>. Therefore, the >>> implementation/the >>>>> user >>>>>>> can decide depending on the use case whether it's necessary to have >>>>>>> separate namespaces for the key/value pairs or not. On the Flink >>> side, >>>>> we >>>>>>> would just merge the different maps considering their namespaces. >>>>>>> >>>>>>> A flaw of this approach is that if a user decides to use the same >>>>> namespace >>>>>>> for multiple listeners, how is an error in one of the listeners >>>>> represented >>>>>>> in the outcome? We would have to overwrite either the successful >>>>> listener's >>>>>>> result or the failed ones. I wanted to share it, anyway. >>>>>>> >>>>>>> One additional remark on introducing it as an async operation: We >>> would >>>>>>> need a new configuration parameter to define the timeout for such a >>>>>>> listener call, wouldn't we? >>>>>>> >>>>>>> Matthias >>>>>>> >>>>>>> On Wed, Mar 22, 2023 at 4:56 PM Panagiotis Garefalakis < >>>>> pga...@apache.org> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi everyone, >>>>>>>> >>>>>>>> >>>>>>>> Thanks for the valuable comments! >>>>>>>> Excited to see this is an area of interest for the community! 
>>>>>>>> >>>>>>>> Summarizing some of the main points raised along with my >>> thoughts: >>>>>>>> >>>>>>>> - Labels (Key/Value) pairs are more expressive than Tags >>>>> (Strings) so >>>>>>>> using the former is a good idea — I am also debating if we >>> want to >>>>>>>> return >>>>>>>> multiple KV pairs per Listener (one could argue that we could >>>>> split >>>>>>> the >>>>>>>> logic in multiple Listeners to support that) >>>>>>>> - An immutable context along with data returned using the >>>>> interface >>>>>>>> method implementations is a better approach than a mutable >>>>> Collection >>>>>>>> - Listener execution should be independent — however we need a >>>>> way to >>>>>>>> enforce a Label key/key-prefix is only assigned to a single >>>>> Listener, >>>>>>>> thinking of a validation step both at Listener init and >>> runtime >>>>> stages >>>>>>>> - We want to perform async Listener operations as sync could >>>>> block the >>>>>>>> main thread — exposing an ioExecutor pool through the context >>>>> could be >>>>>>>> an >>>>>>>> elegant solution here >>>>>>>> - Make sure Listener errors are not failing jobs — make sure >>> to >>>>> log >>>>>>> and >>>>>>>> keep the job alive >>>>>>>> - We need better naming / public interface >>> separation/description >>>>>>>> >>>>>>>> - Even though custom restart strategies share some >>>>> properties >>>>>>> with >>>>>>>> Listeners, they would probably need a separate interface with a >>>>> different >>>>>>>> return type anyway (restart strategy not labels) and in general >>> they >>>>> are >>>>>>>> different and complex enough to justify their own FLIP (that can >>>>> also be >>>>>>> a >>>>>>>> follow-up). >>>>>>>> >>>>>>>> >>>>>>>> What do people think? I am planning to modify the FLIP to reflect >>>>> these >>>>>>>> changes if they make sense to everyone. 
>>>>>>>> >>>>>>>> Cheers, >>>>>>>> Panagiotis >>>>>>>> >>>>>>>> On Wed, Mar 22, 2023 at 6:28 AM Hong Teoh <hlteo...@gmail.com> >>>>> wrote: >>>>>>>> >>>>>>>>> Hi all, >>>>>>>>> >>>>>>>>> Thank you Panagiotis for proposing this. From the size of the >>>>> thread, >>>>>>>> this >>>>>>>>> is a much needed feature in Flink! >>>>>>>>> Some thoughts, to extend those already adeptly summarised by >>> Piotr, >>>>>>>>> Matthias and Jing. >>>>>>>>> >>>>>>>>> - scope of FLIP: +1 to scoping this FLIP to observability >>> around a >>>>>>>>> restart. That would include adding metadata + exposing >>> metadata to >>>>>>>> external >>>>>>>>> systems. IMO, introducing a new restart strategy solves >>> different >>>>>>>> problems, >>>>>>>>> is much larger scope and should be covered in a separate FLIP. >>>>>>>>> >>>>>>>>> - failure handling: At the moment, we propose transitioning the >>>>> Flink >>>>>>> job >>>>>>>>> to a terminal FAILED state when JobListener fails, when the job >>>>> could >>>>>>>> have >>>>>>>>> transitioned to RESTARTING->RUNNING. If we are keeping in line >>>>> with the >>>>>>>>> scope to add metadata/observability around job restarts, we >>> should >>>>> not >>>>>>> be >>>>>>>>> affecting the running of the Flink job itself. Could I propose >>> we >>>>>>> instead >>>>>>>>> log WARN/ERROR. >>>>>>>>> >>>>>>>>> - immutable context: +1 to keeping the contract clear via >>> return >>>>> types. >>>>>>>>> - async operation: +1 to adding ioexecutor to context, however, >>>>> given >>>>>>> we >>>>>>>>> don’t want to block the actual job restart on adding metadata / >>>>> calling >>>>>>>>> external services, should we consider returning and letting >>> futures >>>>>>>>> complete independently? >>>>>>>>> >>>>>>>>> - independent vs ordered execution: Should we consider making >>> the >>>>> order >>>>>>>> of >>>>>>>>> execution deterministic (use a List instead of Set)? >>>>>>>>> >>>>>>>>> >>>>>>>>> Once again, thank you for working on this. 
>>>>>>>>> >>>>>>>>> Regards, >>>>>>>>> Hong >>>>>>>>> >>>>>>>>> >>>>>>>>>> On 21 Mar 2023, at 21:07, Jing Ge <j...@ververica.com.INVALID >>>> >>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> Thanks Panagiotis for this FLIP and thanks for all valuable >>>>>>>> discussions. >>>>>>>>>> I'd like to share my two cents: >>>>>>>>>> >>>>>>>>>> - FailureListenerContext#addTag and >>>>> FailureListenerContext#getTags. >>>>>>> It >>>>>>>>>> seems that we have to call getTags() and then do remove >>>>> activities if >>>>>>>> we >>>>>>>>>> want to delete any tags (according to the javadoc in the >>> FLIP). >>>>> It >>>>>>> is >>>>>>>>>> inconsistent for me too. Either offer addTag(), deleteTag(), >>> and >>>>> let >>>>>>>>>> getTags() return immutable collection, or offer getTags() >>> only to >>>>>>>> return >>>>>>>>>> mutable collection. >>>>>>>>>> >>>>>>>>>> - label vs tag. Label is a great idea +1. AFAIC, tag could >>> be a >>>>>>> special >>>>>>>>>> case of label, i.e. key="tag". It is convenient to offer the >>>>> xxxTag() >>>>>>>>>> method if the user only needs one label. I would love to have >>>>> both of >>>>>>>>> them. >>>>>>>>>> Another thought is that tag implicitly contains the meaning >>> of >>>>>>>>> "immutable". >>>>>>>>>> >>>>>>>>>> - +1 for a separate FLIP of customized restart strategy. >>>>> Attention >>>>>>>> should >>>>>>>>>> be taken to make sure it works well with Flink built-in >>>>>>> restartStrategy >>>>>>>>> in >>>>>>>>>> order to have the single source of truth. >>>>>>>>>> >>>>>>>>>> - execution order. The default independent execution should >>> be >>>>> fine. >>>>>>>>>> According to the FailureListener interface definition in the >>>>> FLIP, >>>>>>>> users >>>>>>>>>> should be able to easily build a listener chain[1] to offer >>>>>>> sequential >>>>>>>>>> execution, e.g. public FailureListener(FailureListener >>>>> nextListener). 
>>>>>>>>>> Another option is to modify the interface or provide another >>>>>>> interface >>>>>>>>>> alongside the current one to extend the method to support >>>>>>>> ListenerChain, >>>>>>>>>> i.e. void onFailure(Throwable cause, FailureListenerContext >>>>> context, >>>>>>>>>> ListenerChain listenerChain). Users can also mix them up. >>>>>>>>>> >>>>>>>>>> - naming. Afaiu, the pluggable extension is not limited to >>>>> failure >>>>>>>>>> enrichment. Conceptually it can do everything for the given >>>>> failure, >>>>>>>> e.g. >>>>>>>>>> start counting metric as the FLIP described, calling an >>> external >>>>>>>> system, >>>>>>>>>> sending notification to slack channel, etc. you name it. It >>>>> sounds to >>>>>>>> me >>>>>>>>>> more like a FailureActionListener - it can trigger actions >>> based >>>>> on >>>>>>>>>> failure. Failure enrichment is one type of action. >>>>>>>>>> >>>>>>>>>> Best regards, >>>>>>>>>> Jing >>>>>>>>>> >>>>>>>>>> [1] >>>>> https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern >>>>>>>>>> >>>>>>>>>> On Tue, Mar 21, 2023 at 3:39 PM Matthias Pohl >>>>>>>>>> <matthias.p...@aiven.io.invalid> wrote: >>>>>>>>>> >>>>>>>>>>> Thanks for the proposal, Panagiotis. A lot of good points >>> have >>>>> been >>>>>>>>> already >>>>>>>>>>> shared. I just want to add my view on some of the items: >>>>>>>>>>> >>>>>>>>>>> - independent execution vs ordered execution: I prefer the >>>>> listeners >>>>>>>>> being >>>>>>>>>>> processed independently from each other because it adds less >>>>>>>> complexity >>>>>>>>>>> code-wise. The use case Piotr described (where you want to >>> reuse >>>>>>> some >>>>>>>>> other >>>>>>>>>>> classifier) is the only one I can think of where we actually >>>>> need >>>>>>>>>>> classifiers depending on each other. 
Supporting such a use >>> case >>>>>>> right >>>>>>>>> from >>>>>>>>>>> the start feels a bit over-engineered and could be covered >>> in a >>>>>>>>> follow-up >>>>>>>>>>> FLIP if we really come to that point where such a feature is >>>>>>> requested >>>>>>>>> by >>>>>>>>>>> users. >>>>>>>>>>> >>>>>>>>>>> - key/value pairs instead of plain labels: I think that's a >>> good >>>>>>> idea. >>>>>>>>>>> key/value pairs are more expressive. +1 >>>>>>>>>>> >>>>>>>>>>> - extending the FLIP to cover restart strategy: I understand >>>>> Gen's >>>>>>>>> concern >>>>>>>>>>> about introducing too many different types of plugins. But I >>>>> would >>>>>>>> still >>>>>>>>>>> favor not extending the FLIP in this regard. A pluggable >>> restart >>>>>>>>> strategy >>>>>>>>>>> sounds reasonable. But an error classifier and a restart >>>>> strategy >>>>>>> are >>>>>>>>> still >>>>>>>>>>> different enough to justify separate plugins, IMHO. And >>>>> therefore, I >>>>>>>>> would >>>>>>>>>>> think that covering the restart strategy in a separate FLIP >>> is >>>>> the >>>>>>>>> better >>>>>>>>>>> option for the sake of simplicity. >>>>>>>>>>> >>>>>>>>>>> - immutable context: Passing in an immutable context and >>>>> returning >>>>>>>> data >>>>>>>>>>> through the interface method's return value sounds like a >>> better >>>>>>>>> approach >>>>>>>>>>> to harden the contract of the interface. +1 for that >>> proposal >>>>>>>>>>> >>>>>>>>>>> - async operation: I think David is right. An async >>> interface >>>>> makes >>>>>>>> the >>>>>>>>>>> listener implementations more robust when it comes to heavy >>> IO >>>>>>>>> operations. >>>>>>>>>>> The ioExecutor can be passed through the context object. 
+1 >>>>>>>>>>> >>>>>>>>>>> Matthias >>>>>>>>>>> >>>>>>>>>>> On Tue, Mar 21, 2023 at 2:09 PM David Morávek < >>>>>>>> david.mora...@gmail.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> *@Piotr* >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> I was thinking about actually defining the order of the >>>>>>>>>>>>> classifiers/handlers and not allowing them to be >>> asynchronous. >>>>>>>>>>>>> Asynchronousity would create some problems: when to >>> actually >>>>>>> return >>>>>>>>> the >>>>>>>>>>>>> error to the user? After all async responses will get >>> back? >>>>>>> Before, >>>>>>>>> but >>>>>>>>>>>>> without classified exception? It would also add >>> implementation >>>>>>>>>>> complexity >>>>>>>>>>>>> and I think we can always expand the API with async >>> version >>>>> in the >>>>>>>>>>> future >>>>>>>>>>>>> if needed. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> As long as the classifiers need to talk to an external >>> system, >>>>> we >>>>>>> by >>>>>>>>>>>> definition need to allow them to be asynchronous to >>> unblock the >>>>>>> main >>>>>>>>>>> thread >>>>>>>>>>>> for handling other RPCs. Exposing ioExecutor via the >>> context >>>>>>> proposed >>>>>>>>>>> above >>>>>>>>>>>> would be great. >>>>>>>>>>>> >>>>>>>>>>>> After all async responses will get back >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> This would be the same if we trigger them synchronously >>> one by >>>>> one, >>>>>>>>> with >>>>>>>>>>> a >>>>>>>>>>>> caveat that synchronous execution might take significantly >>>>> longer >>>>>>> and >>>>>>>>>>>> introduce unnecessary downtime to a job. >>>>>>>>>>>> >>>>>>>>>>>> D. >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Mar 21, 2023 at 1:12 PM Zhu Zhu <reed...@gmail.com >>>> >>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Piotr, >>>>>>>>>>>>> >>>>>>>>>>>>> It's fine to me to have a separate FLIP to extend this >>>>>>>>>>> `FailureListener` >>>>>>>>>>>>> to support custom restart strategy. 
>>>>>>>>>>>>> >>>>>>>>>>>>> What I was a bit concerned about is that if we just treat the `FailureListener` >>>>>>>>>>>>> as an error classifier which is not crucial to the Flink framework process, >>>>>>>>>>>>> we may design it to run asynchronously and not trigger Flink failures. >>>>>>>>>>>>> This may be a blocker if later we want to enable it to support custom >>>>>>>>>>>>> restart strategy. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> Zhu >>>>>>>>>>>>> >>>>>>>>>>>>> On Tue, Mar 21, 2023 at 19:53 Dian Fu <dian0511...@gmail.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Panagiotis, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for the proposal. This is a very valuable feature and will be a good >>>>>>>>>>>>>> add-on for Flink. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I also think that it will be great if we can consider how to make it >>>>>>>>>>>>>> possible for users to customize the failure handling in this FLIP. It's >>>>>>>>>>>>>> highly related to the problem we want to address in this FLIP and could >>>>>>>>>>>>>> avoid refactoring the interfaces proposed in this FLIP too quickly. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Currently it treats all kinds of exceptions the same. However, some kinds >>>>>>>>>>>>>> of exceptions are actually not recoverable at all. It could let users >>>>>>>>>>>>>> customize the failure handling logic to fail fast for certain known >>>>>>>>>>>>>> unrecoverable exceptions and finally make these kinds of jobs get noticed >>>>>>>>>>>>>> and recovered more quickly.
>>>>>>>>>>>>>> >>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>> Dian >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Tue, Mar 21, 2023 at 4:36 PM Gen Luo < >>> luogen...@gmail.com >>>>>> >>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Panagiotis, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for the proposal. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> It's useful to enrich the information so that users can >>> be >>>>> more >>>>>>>>>>>>>>> clear why the job is failing, especially platform >>>>> developers who >>>>>>>>>>>>>>> need to provide the information to their end users. >>>>>>>>>>>>>>> And for the very FLIP, I'd prefer the naming >>>>> `FailureEnricher` >>>>>>>>>>>>>>> proposed by David, as the plugin doesn't really handle >>> the >>>>>>>> failure. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> However, like Zhu and Lijie said, I also joined a >>> discussion >>>>>>>>>>>>>>> recently about customized failure handling, e.g. >>> counting >>>>> the >>>>>>>>>>>>>>> failure rate of pipeline regions separately, and failing >>>>> the job >>>>>>>>>>>>>>> when a specific error occurs, and so on. >>>>>>>>>>>>>>> I suppose a custom restart strategy, or I'd call it a >>> custom >>>>>>>>>>>>>>> failure "handler", is indeed necessary. It can also >>> enrich >>>>> the >>>>>>>>>>>>>>> information as the current proposed handler does. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> To avoid adding too many plugin interfaces which may >>> confuse >>>>>>> users >>>>>>>>>>>>>>> and make the ExecutionFailureHandler more complex, >>>>>>>>>>>>>>> I think it'd be better to consider the requirements at >>> the >>>>> same >>>>>>>>>>> time. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> IMO, we can add a handler interface, then make the >>> current >>>>>>> restart >>>>>>>>>>>>>>> strategy and the enricher both types of the handler. The >>>>>>> handlers >>>>>>>>>>>>>>> execute in sequence, and the failure is considered >>>>> unrecoverable >>>>>>>> if >>>>>>>>>>>>>>> any of the handlers decides. 
>>>>>>>>>>>>>>> In this way, users can also implement a handler using the enriched >>>>>>>>>>>>>>> information provided by the previous handlers, e.g. fail the job and >>>>>>>>>>>>>>> send a notification if too many failures are caused by the end users. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>> Gen >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Tue, Mar 21, 2023 at 11:38 AM Weihua Hu <huweihua....@gmail.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Panagiotis, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for your proposal. It is valuable to analyze the reason for >>>>>>>>>>>>>>>> failure with the user plug-in. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Making the context immutable could make the contract stronger. >>>>>>>>>>>>>>>> Letting the listener return an enriching result may be a better way. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> IIUC, listeners could do two things, enrich more information (tags/labels) >>>>>>>>>>>>>>>> to FailureHandlingResult, and push data out of Flink (metrics or >>>>>>>>>>>>>>>> something). >>>>>>>>>>>>>>>> IMO, we could split these two types into Listener and Advisor (maybe >>>>>>>>>>>>>>>> other names). The Listener just pushes the data out and returns nothing to >>>>>>>>>>>>>>>> Flink, so we can run these async and don't have to wait for the Listener's >>>>>>>>>>>>>>>> result. >>>>>>>>>>>>>>>> The Advisor returns rich information to the FailureHandlingResult, and it >>>>>>>>>>>>>>>> should have a lighter logic. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Supporting a custom restart strategy is also valuable.
In this design, we use >>>>>>>>>>>>>>>> RestartStrategy to construct a FailureHandlingResult, and then pass it to >>>>>>>>>>>>>>>> Listener. >>>>>>>>>>>>>>>> My question is, should we change the restart strategy interface to support the >>>>>>>>>>>>>>>> custom restart strategy, or keep the current restart strategy and let the later >>>>>>>>>>>>>>>> Listener enrich the restartable information to FailureHandlingResult? The latter >>>>>>>>>>>>>>>> may cause some confusion when we use a custom restart strategy. >>>>>>>>>>>>>>>> The default Flink restart strategy also runs but does not take effect. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>> Weihua >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Mon, Mar 20, 2023 at 11:42 PM Lijie Wang <wangdachui9...@gmail.com> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi Panagiotis, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks for driving this. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> +1 for supporting custom restart strategy, we did receive such requests >>>>>>>>>>>>>>>>> from the user mailing list [1][2]. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Besides, in the current design, the plugin will only do some statistical and >>>>>>>>>>>>>>>>> classification work, and will not affect the *FailureHandlingResult*. Just >>>>>>>>>>>>>>>>> listening, no handling, it doesn't quite match the title.
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> [1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1 >>>>>>>>>>>>>>>>> [2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>> Lijie >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Mon, Mar 20, 2023 at 21:39 Zhu Zhu <reed...@gmail.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi Panagiotis, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for creating this proposal! It's good to enable Flink to handle >>>>>>>>>>>>>>>>>> different errors in different ways, through a pluggable way. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> There are requests for flexible restart strategies from time to time, for >>>>>>>>>>>>>>>>>> different strategies of restart backoff time, or to suppress restarting >>>>>>>>>>>>>>>>>> on certain errors. Therefore, I think it's better that the proposed >>>>>>>>>>>>>>>>>> failure handling plugin can also support custom restart strategies. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Maybe we can call it FailureHandlingAdvisor which provides more >>>>>>>>>>>>>>>>>> information (labels) and gives advice (restart backoff time, whether >>>>>>>>>>>>>>>>>> to restart)? I do not have a strong opinion though, any explanatory >>>>>>>>>>>>>>>>>> name would be good. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> To avoid unexpected mutation, how about making the context immutable >>>>>>>>>>>>>>>>>> and letting the plugin return an immutable result? i.e. remove the setters >>>>>>>>>>>>>>>>>> from the context, and let the plugin method return a result which >>>>>>>>>>>>>>>>>> contains `labels`, `canRestart` and `restartBackoffTime`.
Flink should >>>>>>>>>>>>>>>>>> apply the result to the context before invoking the next plugin, so >>>>>>>>>>>>>>>>>> that the next plugin will see the updated context. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The plugin should avoid taking too much time to return the result, because >>>>>>>>>>>>>>>>>> it will block the RPC and result in instability. However, it can still >>>>>>>>>>>>>>>>>> perform heavy actions in a different thread. The context can provide an >>>>>>>>>>>>>>>>>> `ioExecutor` to the plugins for reuse. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>> Zhu >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Mon, Mar 20, 2023 at 20:21 Shammon FY <zjur...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi Panagiotis >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thank you for your answer. I agree that `FailureListener` could be >>>>>>>>>>>>>>>>>>> stateless, then I have some thoughts as follows >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. I see that listeners and tag collections are associated. When JobManager >>>>>>>>>>>>>>>>>>> fails and restarts, how can the new listener be associated with the tag >>>>>>>>>>>>>>>>>>> collection before failover? Is it by the listener loading order? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 2. The tag collection may be too large, resulting in the JobManager OOM, do >>>>>>>>>>>>>>>>>>> we need to provide a management class that supports some obsolescence >>>>>>>>>>>>>>>>>>> strategies instead of a direct Collection? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 3.
Is it possible to provide a more complex data structure than a simple >>>>>>>>>>>>>>>>>>> string collection for tags in listeners, such as key-value? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>>> Shammon FY >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu <xbjt...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi, Panagiotis >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thank you for kicking off this discussion. Overall, the proposed feature of >>>>>>>>>>>>>>>>>>>> this FLIP makes sense to me. We have also discussed similar requirements >>>>>>>>>>>>>>>>>>>> with our users and developers, and I believe it will help many users. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> In terms of FLIP content, I have some thoughts: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> (1) For the FailureListenerContext interface, the methods >>>>>>>>>>>>>>>>>>>> FailureListenerContext#addTag and FailureListenerContext#getTags look very >>>>>>>>>>>>>>>>>>>> inconsistent because they imply specific implementation details, and not >>>>>>>>>>>>>>>>>>>> all FailureListeners need to handle them, we shouldn't put them in the >>>>>>>>>>>>>>>>>>>> interface. Minor: The comment "UDF loading" in the getUserClassLoader() >>>>>>>>>>>>>>>>>>>> method looks like a typo, IIUC it should return the classloader of the >>>>>>>>>>>>>>>>>>>> current job.
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> (2) Regarding the implementation in ExecutionFailureHandler#handleFailure, >>>>>>>>>>>>>>>>>>>> some custom listeners may have heavy IO operations, such as reporting to >>>>>>>>>>>>>>>>>>>> their monitoring system. The current logic appears to be processing in the >>>>>>>>>>>>>>>>>>>> JobMaster's main thread, and it is recommended not to do this kind of >>>>>>>>>>>>>>>>>>>> processing in the main thread. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> (3) The results of FailureListener's processing and the >>>>>>>>>>>>>>>>>>>> FailureHandlingResult returned by ExecutionFailureHandler are not related. >>>>>>>>>>>>>>>>>>>> I think these two are closely related, the motivation of this FLIP is to >>>>>>>>>>>>>>>>>>>> make current failure handling more flexible. From this perspective, >>>>>>>>>>>>>>>>>>>> different listeners should have the opportunity to affect the job's failure >>>>>>>>>>>>>>>>>>>> handling flow. For example, a Flink job is configured with a >>>>>>>>>>>>>>>>>>>> RestartStrategy with a huge number of retries, but the Kafka topic of the Source has >>>>>>>>>>>>>>>>>>>> been deleted, so the job will fail over continuously.
In this case, the user should have their listener determine whether this failure is recoverable or unrecoverable, and then wrap the processing result into FailureHandlingResult.unrecoverable(xx) and pass it to the JobMaster. This approach will be more flexible.

(4) All FLIPs have an important section named Public Interfaces. The current FLIP mixes the interface section and the implementation section together. It is better for us to refer to the FLIP template [1] and separate them; this will make the entire FLIP clearer.

In addition, regarding the FLIP process, there is a small suggestion: the community generally creates a JIRA issue after the FLIP vote has passed, instead of during the FLIP preparation phase, because the FLIP may be rejected. Although this FLIP is very reasonable, it's better to follow the process.

Best,
Leonard

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template

On Mon, Mar 20, 2023 at 7:04 PM David Morávek <d...@apache.org> wrote:

> however listeners can use previous state (tags/labels) to make decisions

That sounds like a very fragile contract. We should either allow passing tags between listeners, and then we need to define ordering, or make all of them independent. I prefer the latter because it allows us to parallelize things if needed (if all listeners trigger an RPC to the external system, for example).

Can you expand on why we need more than one classifier to be able to output the same tag?

> system ones come first and then the ones loaded from the plugin manager

Since they're returned as a Set, the order is completely non-deterministic, no matter in which order they're loaded.

> just communicating with external monitoring/alerting systems

That makes the need for pushing things out of the main thread even stronger. This almost sounds like we need to return a CompletableFuture for the per-throwable classification because an external system might take a significant time to respond. We need to unblock the main thread for other RPCs.

Also, in the proposal, this happens in the failure handler. If that's the case, this might block the job from being restarted (if the restart strategy allows for another restart), which would be great to avoid because it can introduce extra downtime.

This raises another question: what should happen if the classification fails? Crashing the job (which is what's currently proposed) seems very dangerous if this might depend on an external system.
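
To make the points above concrete, here is a minimal Java sketch of per-throwable classification that returns a CompletableFuture. It is not code from the FLIP: the `Enricher` interface, the `enrichAll` method, and the 200 ms timeout are hypothetical names and values picked for illustration only. Each enricher runs on a separate I/O executor, independently of the others, and an enricher that throws or times out simply contributes no labels instead of crashing the job.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncEnrichmentSketch {

    /** Hypothetical enricher: maps a failure cause to key=value labels. */
    interface Enricher {
        Map<String, String> enrich(Throwable cause) throws Exception;
    }

    /**
     * Runs all enrichers in parallel on ioExecutor, so the calling (main) thread is not blocked.
     * Assumes each enricher emits its own unique keys, so merge order does not matter.
     */
    static CompletableFuture<Map<String, String>> enrichAll(
            Throwable cause, List<Enricher> enrichers, ExecutorService ioExecutor, Duration timeout) {
        Map<String, String> merged = new ConcurrentHashMap<>();
        CompletableFuture<?>[] futures = enrichers.stream()
                .map(e -> CompletableFuture
                        .supplyAsync(() -> {
                            try {
                                return e.enrich(cause);
                            } catch (Exception ex) {
                                throw new CompletionException(ex);
                            }
                        }, ioExecutor)
                        // A slow enricher yields no labels; note its task is not cancelled.
                        .completeOnTimeout(Map.of(), timeout.toMillis(), TimeUnit.MILLISECONDS)
                        // A failing enricher yields no labels instead of failing the whole result.
                        .exceptionally(t -> Map.of())
                        .thenAccept(merged::putAll))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures).thenApply(v -> Map.copyOf(merged));
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newFixedThreadPool(3);
        List<Enricher> enrichers = List.of(
                cause -> Map.of("type", "USER"),
                cause -> { throw new IllegalStateException("monitoring system down"); },
                cause -> { Thread.sleep(5_000); return Map.of("slow", "true"); });
        Map<String, String> labels = enrichAll(
                new RuntimeException("boom"), enrichers, io, Duration.ofMillis(200)).join();
        System.out.println(labels); // prints {type=USER}
        io.shutdownNow();
    }
}
```

In this sketch the restart decision would not have to wait on `join()`; the caller could attach the labels whenever the returned future completes. Whether that fits the actual failure handler is an open design question in this thread, not something the sketch settles.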

> That's a valid point, passing the JobGraph containing all the above information is also something to consider

We should avoid passing the JobGraph around because it's mutable (which we must fix in the long term), and letting users change it might have consequences.

Best,
D.

On Mon, Mar 20, 2023 at 7:23 AM Panagiotis Garefalakis <pga...@apache.org> wrote:

Hey David, Shammon,

Thanks for the valuable comments!
I am glad you find this proposal useful. Some thoughts:

@Shammon

> 1. How about adding more job information in FailureListenerContext? For example, job vertex, subtask, taskmanager location. And then user can do more statistics according to different dimensions.

That's a valid point; passing the JobGraph containing all the above information is also something to consider. I was mostly trying to be conservative, i.e., passing only the information we need and extending as we see fit.

> 2. Users may want to save results in listener, and then they can get the historical results even after jobmanager failover. Can we provide a unified implementation for data storage requirements?

The idea is to store only the output of the Listeners (tags) and treat them as stateless. Tags are stored along with HistoryEntries and will be available through the HistoryServer even after a JM dies.

@David

> 1) Should we also consider adding labels?
> The combination of tags and labels seems to be what most systems offer; sometimes, they offer labels only (key=value pairs) because tags can be implemented using those, but not the other way around.

Indeed, changing tags to k:v labels could be more expressive. I like it! Let's see what others think.

> 2) Since we can not predict how heavy user-defined models ("listeners") are going to be, it would be great to keep the interfaces/data structures immutable so we can push things over to the I/O threads. Also, it sounds off to call the main interface a Listener since it's supposed to enhance the original throwable with additional metadata.

The idea was for the name to be generic, as there could be Listener implementations just communicating with external monitoring/alerting systems and producing no metadata output -- but let's rethink that.
For immutability, see below:

> 3) You're proposing to support a set of listeners. Since you're passing the mutable context around, which includes tags set by the previous listener, do you expect users to make any assumptions about the order in which listeners are executed?

In the existing proposal we are not making any assumptions about the order of listeners (system ones come first and then the ones loaded from the plugin manager); however, listeners can use previous state (tags/labels) to make decisions: e.g., we won't assign the *UNKNOWN* failureType when we have already seen *USER*, or the other way around -- when we have seen *UNKNOWN*, remove it in favor of *USER*.

Cheers,
Panagiotis

On Sun, Mar 19, 2023 at 10:42 AM David Morávek <d...@apache.org> wrote:

Hi Panagiotis,

This is an excellent proposal and something everyone trying to provide "Flink as a service" needs to solve at some point. I have a couple of questions:

If I understand the proposal correctly, this is just about adding tags to the Throwable by running a tuple of (Throwable, FailureContext) through a user-defined model.

1) Should we also consider adding labels? The combination of tags and labels seems to be what most systems offer; sometimes, they offer labels only (key=value pairs) because tags can be implemented using those, but not the other way around.

2) Since we can not predict how heavy user-defined models ("listeners") are going to be, it would be great to keep the interfaces/data structures immutable so we can push things over to the I/O threads. Also, it sounds off to call the main interface a Listener since it's supposed to enhance the original throwable with additional metadata.

I'd propose something along the lines of (we should have better names, this is just to outline the idea):

    interface FailureEnricher {

        ThrowableWithTagsAndLabels enrichFailure(
            Throwable cause,
            ImmutableContextualMetadataAboutTheThrowable context);
    }

The names should change; this is just to outline the idea.

3) You're proposing to support a set of listeners. Since you're passing the mutable context around, which includes tags set by the previous listener, do you expect users to make any assumptions about the order in which listeners are executed?

*@Shammon*

> Users may want to save results in listener, and then they can get the historical results even after jobmanager failover. Can we provide a unified implementation for data storage requirements?

I think we should explicitly state that all "listeners" are treated as stateless.
I don't see any strong reason for snapshotting them.

Best,
D.

On Sat, Mar 18, 2023 at 1:00 AM Shammon FY <zjur...@gmail.com> wrote:

Hi Panagiotis,

Thank you for starting this discussion. I think this FLIP is valuable and can help users better analyze the causes of job failover!

I have two comments, as follows:

1. How about adding more job information in FailureListenerContext? For example, job vertex, subtask, taskmanager location. Users could then compute more statistics along different dimensions.

2. Users may want to save results in the listener so that they can get the historical results even after a jobmanager failover. Can we provide a unified implementation for data storage requirements?

Best,
Shammon FY

On Saturday, March 18, 2023, Panagiotis Garefalakis <pga...@apache.org> wrote:

Hi everyone,

This FLIP [1] proposes a pluggable interface for failure handling, allowing users to implement custom failure logic using the plugin framework. Motivated by existing proposals [2] and tickets [3], this enables use-cases like: assigning particular types to failures (e.g., User or System), emitting custom metrics per type (e.g., application or platform), even exposing errors to downstream consumers (e.g., notification systems).

Thanks to Piotr and Anton for the initial reviews and discussions!

For anyone interested, the starting point would be the FLIP [1] that I created, describing the motivation and the proposed changes (part of the core, runtime and web).

The intuition behind this FLIP is being able to execute custom logic on failures by exposing a FailureListener interface. User implementations can simply be loaded into the system as JAR files. FailureListeners may also decide to assign failure tags to errors (expressed as strings), which will then be exposed as metadata by the UI/REST interfaces.

Feedback is always appreciated! Looking forward to your thoughts!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+failure+handling+for+Apache+Flink
[2] https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4
[3] https://issues.apache.org/jira/browse/FLINK-20833

Cheers,
Panagiotis