Re: Handling non-transient exceptions

2022-04-18 Thread Jose Brandao
Hello,

Thank you for your answer. Yes, we are using the DataStream API.

I agree that exceptions are the developer’s responsibility, but errors can 
still happen, and when they do I would prefer a graceful approach over a 
blocking one.

I will take a look at your suggestion. Wouldn’t it make sense to optionally 
allow moving on to the next message when an unexpected exception happens, 
instead of only killing the task and waiting for a restart? I know that in 
some cases such exceptions might cause irreparable damage to an application, 
but this could be configured per exception type.


Regards,
José Brandão

From: Guowei Ma 
Date: Friday, 15 April 2022 at 11:04
To: Jose Brandao 
Cc: user@flink.apache.org 
Subject: Re: Handling non-transient exceptions

Hi, Jose

I assume you are using the DataStream API.

In general, for any exception thrown by a UDF in a DataStream job, only the 
developer of the job knows whether the exception can be tolerated, because in 
some cases tolerating an exception can corrupt the final result. So you still 
have to handle each UDF’s exceptions yourself.

However, there are some points that can be optimized:
1. If you have a lot of DataStream jobs, you can use some Java lambda tricks 
to simplify these things. For example, define a generic wrapper like 
`sideOutputTheElementCausedTheException(processFunctionX, ...other parameters)`: 
whenever processFunctionX throws an exception, the wrapper emits the offending 
element to a side output (see the first sketch below).
2. As for the differing operator output types you mentioned, I tend to 
normalize all the failed elements into a common JSON or Avro format (see the 
second sketch below).
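
To make that concrete, here is a minimal sketch of such a wrapper. The names 
(CatchingProcessFunction, SerializableFunction) are illustrative, not an 
existing Flink API:

import java.io.Serializable;
import java.util.function.Function;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Applies the given logic to each element; any element whose processing
// throws is emitted unchanged to a side output instead of failing the task.
public class CatchingProcessFunction<IN, OUT> extends ProcessFunction<IN, OUT> {

    // Flink ships user functions to the cluster, so the lambda must be
    // Serializable.
    public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}

    private final SerializableFunction<IN, OUT> logic;
    private final OutputTag<IN> deadLetterTag;

    public CatchingProcessFunction(SerializableFunction<IN, OUT> logic,
                                   OutputTag<IN> deadLetterTag) {
        this.logic = logic;
        this.deadLetterTag = deadLetterTag;
    }

    @Override
    public void processElement(IN value, Context ctx, Collector<OUT> out) {
        try {
            out.collect(logic.apply(value));
        } catch (Exception e) {
            // The input element itself goes to the side output, so the
            // original message can later be shipped to a dead-letter topic.
            ctx.output(deadLetterTag, value);
        }
    }
}

Note that the OutputTag passed in has to be created as an anonymous subclass, 
e.g. `new OutputTag<String>("dead-letters") {}`, so Flink can extract the 
element type.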
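
For the normalization in point 2, a hedged sketch of a dead-letter envelope, 
assuming Jackson for the JSON serialization (the class and field names are 
illustrative):

import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative envelope: every failed element is normalized to this shape
// regardless of the throwing operator's own output type.
public class DeadLetterEnvelope {
    public String operatorName;    // where the failure happened
    public String exceptionClass;  // e.g. java.lang.NumberFormatException
    public String payload;         // the offending element, stringified
    public long failedAtMillis;    // wall-clock time of the failure

    public static String toJson(String operatorName, Exception e, Object element)
            throws Exception {
        DeadLetterEnvelope env = new DeadLetterEnvelope();
        env.operatorName = operatorName;
        env.exceptionClass = e.getClass().getName();
        env.payload = String.valueOf(element);
        env.failedAtMillis = System.currentTimeMillis();
        // Jackson serializes public fields by default.
        return new ObjectMapper().writeValueAsString(env);
    }
}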

But I think replaying all the failed elements is not easy work; a replay is 
only straightforward if you capture the original source element rather than 
some intermediate result.
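
Putting it together, wiring the side output into a dead-letter Kafka topic 
could look roughly like the sketch below. It assumes the CatchingProcessFunction 
sketched above, Flink's KafkaSink from the unified Kafka connector (1.14+; older 
versions would use FlinkKafkaProducer), and placeholder broker/topic names:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;

public class DeadLetterExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("1", "2", "not-a-number");

        // Must be an anonymous subclass so Flink keeps the element type.
        OutputTag<String> deadLetters = new OutputTag<String>("dead-letters") {};

        SingleOutputStreamOperator<Long> parsed = input
                .process(new CatchingProcessFunction<String, Long>(
                        Long::parseLong, deadLetters))
                // The generic wrapper erases its output type, so declare it.
                .returns(Types.LONG);

        // Failed elements, unchanged, ready to be replayed after a fix.
        DataStream<String> failed = parsed.getSideOutput(deadLetters);

        failed.sinkTo(KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")              // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("my-app-dead-letters")         // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build());

        env.execute("dead-letter-example");
    }
}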

Best,
Guowei


On Fri, Apr 15, 2022 at 12:33 AM Jose Brandao <jose.bran...@blip.pt> wrote:
Hello,

We are looking for some expertise on exception handling with checkpointing and 
streaming. Let’s say some bad data flows into your Flink application and 
causes an exception you are not expecting. That exception bubbles up and ends 
up killing the respective task, so the app cannot make progress. Eventually 
the topology restarts (if configured to do so) from the previous successful 
checkpoint/savepoint, hits that broken message again, and ends up in a loop.

If we don’t know how to process a given message, we would like our topology to 
keep progressing and sink that message into some sort of dead-letter Kafka 
topic.

We have seen some recommendations on using Side Outputs
(https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/side_output/)
for that, but it looks like things can easily get messy. We would need to 
extend all our operators with try-catch blocks and side-output messages within 
the catch. Then we would need to aggregate all those side outputs and decide 
what to do with them. If we want to output exactly the inbound message that 
originated the exception, it requires some extra logic as well, since our 
operators have different output types. On top of that, the set of operators 
that allow side outputs looks limited:
https://stackoverflow.com/questions/52411705/flink-whats-the-best-way-to-handle-exceptions-inside-flink-jobs

Wondering if there is a better way to do it? We would like to avoid our 
topology getting stuck because one message triggers some unexpected exception, 
and we would also like the possibility of replaying that message once we put a 
fix in place, hence the dead-letter topic idea.

Regards,
José Brandão
