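I think you basically need two filters on the same input stream, each with
its own sink, rather than a sink embedded in a filter. Something like this: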

DataStream input = ...
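// Both filters read the same input; each one keeps one side of the split.
DataStream withErrors = input.filter(new MyErrorFilter());
DataStream withoutErrors = input.filter(new MyWithoutErrorFilter());

// Each filtered stream gets its own sink.
withErrors.addSink(...);
withoutErrors.addSink(...);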
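
In case it helps to see the two filters spelled out, here is a minimal sketch of the logic they would carry. It is plain Java using java.util.stream in place of Flink's DataStream, so it runs without a Flink dependency. The class name SplitSketch, the predicate IS_ERROR, and the "blank record is an error" rule are all made up for illustration; your own validation goes there. The point is only that the two filters are the same predicate and its negation, applied to the same input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SplitSketch {
    // Hypothetical validation rule (an assumption): a blank record is an error.
    static final Predicate<String> IS_ERROR = s -> s == null || s.trim().isEmpty();

    // Plays the role of input.filter(new MyErrorFilter()): keeps only error records.
    static List<String> withErrors(List<String> input) {
        return input.stream().filter(IS_ERROR).collect(Collectors.toList());
    }

    // Plays the role of input.filter(new MyWithoutErrorFilter()): the negated predicate.
    static List<String> withoutErrors(List<String> input) {
        return input.stream().filter(IS_ERROR.negate()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "", "b", "  ");
        // The two results together cover every input record exactly once.
        System.out.println("errors: " + withErrors(input).size());
        System.out.println("valid:  " + withoutErrors(input).size());
    }
}
```

In Flink, the same boolean check would go into the filter(...) method of each FilterFunction, one returning the predicate and the other its negation.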

Does that help?

On Mon, 20 Feb 2017 at 13:44 Chet Masterson <chet.master...@yandex.com>
wrote:

>
> A while back on the mailing list, there was a discussion on validating a
> stream, and splitting the stream into two sinks, depending on how the
> validation went:
>
> (operator generating errors)
>     --> (filter) --> stream without errors --> sink
>     --> (filter) --> error stream  --> sink
>
> Is there an example of this implemented in (scala) code anywhere? I'm not
> sure how to code this up. Do I embed the error sink in the filter? The
> compiler hated everything I tried.
>
