Hi Jacob,

one of the contracts Flink has is that if a UDF throws an exception, the UDF is considered to have failed and the job needs to recover. Hence, it is the responsibility of the user to make sure that tolerable exceptions do not bubble up. If you have dirty input data, it might make sense to put a sanitization operator directly after the sources that filters out invalid records, so that downstream operators can assume the data is correct.
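As a rough illustration of such a sanitization step, here is a plain-Java sketch of a validation predicate. The record format ("id,amount", where amount must parse as a long) and the class name `RecordSanitizer` are hypothetical, chosen only for this example. In a Flink job a predicate like this would typically be handed to `DataStream#filter` right after the source (for example `source.filter(RecordSanitizer::isValid)`); here a `java.util.stream` pipeline stands in for the stream so the snippet runs without Flink on the classpath.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical validation logic for a sanitization step placed right after the source.
 * Assumes, for illustration only, that each raw record looks like "id,amount",
 * where id is non-empty and amount parses as a long.
 */
final class RecordSanitizer {

    private RecordSanitizer() {}

    /** Returns true if the record is well-formed; never throws on bad input. */
    static boolean isValid(String raw) {
        if (raw == null) {
            return false;
        }
        // limit -1 keeps trailing empty fields, so "a," yields two parts
        String[] parts = raw.split(",", -1);
        if (parts.length != 2 || parts[0].isEmpty()) {
            return false;
        }
        try {
            Long.parseLong(parts[1].trim());
            return true;
        } catch (NumberFormatException e) {
            // Expected, tolerable failure: report "invalid" instead of throwing
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> raw = List.of("a,1", "b,oops", ",3", "c,42");
        // Stand-in for source.filter(RecordSanitizer::isValid) in a Flink job
        List<String> clean = raw.stream()
                .filter(RecordSanitizer::isValid)
                .collect(Collectors.toList());
        System.out.println(clean); // [a,1, c,42]
    }
}
```

The design point is that the predicate returns `false` for malformed input rather than throwing, so a bad record is dropped instead of triggering job recovery, and every downstream operator can rely on the format.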
For the question about Map, you can either convert it to a FlatMap operation, which can output arbitrarily many elements (including zero), or introduce something like an Optional type that can represent both a null and a non-null value. This is something you can do in the user code.

I hope this helps a bit.

Cheers,
Till

On Tue, Apr 27, 2021 at 7:30 PM Jacob Sevart <[email protected]> wrote:

> How do we get uncaught exceptions in operators to skip the problematic
> messages, rather than crash the entire job? Is there an easier or less
> mistake-prone way to do this than wrapping every operator method in
> try/catch?
>
> And what to do about Map? Since it has to return something, we're either
> returning null and then catching it with a *.filter(Objects.nonNull)* in
> the next operator, or converting it to FlatMap. FlatMap conversion is
> annoying, because then we need to mock the Collector for testing.
>
> Obviously it would be best to sanitize inputs so that exceptions don't
> occur, but we've recently encountered some setbacks in the game of
> whack-a-mole with pathological messages, and are hoping to mitigate the
> losses when these do occur.
>
> Jacob
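The two Map alternatives from the reply above (a FlatMap that emits zero or one elements, or an Optional-like result) can be sketched in plain Java. `parseAmount` is a hypothetical piece of user logic that throws on bad input, and `java.util.function.Consumer` stands in for Flink's `Collector` so the example runs without Flink; inside a real `FlatMapFunction` the body of `flatMapAmount` would call `out.collect(...)` instead. `java.util.Optional` is used only to illustrate "something like an Optional type"; this sketch does not check how well it behaves as a Flink record type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

/**
 * Plain-Java sketch of the two Map alternatives. Consumer stands in for Flink's
 * Collector so the example runs without Flink on the classpath.
 */
final class MapAlternatives {

    /** Hypothetical user logic that throws on pathological input. */
    static int parseAmount(String raw) {
        return Integer.parseInt(raw.trim());
    }

    /** FlatMap style: emit one element on success, zero elements on a tolerable failure. */
    static void flatMapAmount(String raw, Consumer<Integer> out) {
        try {
            out.accept(parseAmount(raw));
        } catch (NumberFormatException e) {
            // Tolerable failure: swallow it (ideally count or log it) so the job keeps running
        }
    }

    /** Optional style: keep a one-to-one Map and represent "no value" explicitly. */
    static Optional<Integer> mapAmount(String raw) {
        try {
            return Optional.of(parseAmount(raw));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        List<String> input = List.of("1", "x", " 3 ");

        // A List-backed consumer lets the FlatMap variant be tested without mocks
        List<Integer> viaFlatMap = new ArrayList<>();
        for (String s : input) {
            flatMapAmount(s, viaFlatMap::add);
        }

        // Downstream, empty results are dropped and present values unwrapped
        List<Integer> viaOptional = new ArrayList<>();
        for (String s : input) {
            mapAmount(s).ifPresent(viaOptional::add);
        }

        System.out.println(viaFlatMap);  // [1, 3]
        System.out.println(viaOptional); // [1, 3]
    }
}
```

Backing the output with a `List`, as in `main`, is one way to unit-test the FlatMap variant without mocking a collector. For the Optional variant, a downstream step such as `.filter(Optional::isPresent)` followed by `.map(Optional::get)` would unwrap the values; in Flink, lambdas producing generic types like `Optional<Integer>` may also need an explicit type hint via `returns(...)`.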
