I think you basically need something like this:

DataStream input = ...

DataStream withErrors = input.filter(new MyErrorFilter());
DataStream withoutErrors = input.filter(new MyWithoutErrorFilter());

withErrors.addSink(...)
withoutErrors.addSink(...)

Does that help?

On Mon, 20 Feb 2017 at 13:44 Chet Masterson <chet.master...@yandex.com> wrote:

> A while back on the mailing list, there was a discussion on validating a
> stream, and splitting the stream into two sinks, depending on how the
> validation went:
>
> (operator generating errors)
>     --> (filter) --> stream without errors --> sink
>     --> (filter) --> error stream --> sink
>
> Is there an example of this implemented in (scala) code anywhere? I'm not
> sure how to code this up. Do I embed the error sink in the filter? The
> compiler hated everything I tried.
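
For the Scala side of the question, the same two-filter pattern might look roughly like the sketch below. It assumes the Flink Scala DataStream API (`org.apache.flink.streaming.api.scala._`) is on the classpath. The `Validated` case class, the `validate` function, and the sample input are hypothetical stand-ins for the real validation operator, and `print()` stands in for the real sinks. The error sink is not embedded in the filter: each filter only selects records, and the sinks are attached to the two filtered streams.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical record type: `error` is None when validation passed.
case class Validated(payload: String, error: Option[String])

object SplitByValidation {

  // Hypothetical validation rule standing in for the
  // "operator generating errors" from the question.
  def validate(line: String): Validated =
    if (line.nonEmpty && line.forall(_.isDigit)) Validated(line, None)
    else Validated(line, Some("not a number"))

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Sample input, assumed purely for illustration.
    val input: DataStream[String] = env.fromElements("1", "two", "3")

    val validated: DataStream[Validated] = input.map(line => validate(line))

    // Two filters over the same stream; the predicates are complementary,
    // so every record lands in exactly one branch.
    val withoutErrors: DataStream[Validated] = validated.filter(_.error.isEmpty)
    val withErrors: DataStream[Validated] = validated.filter(_.error.isDefined)

    // Attach one sink per branch; replace print() with addSink(...) as needed.
    withoutErrors.print()
    withErrors.print()

    env.execute("split-by-validation")
  }
}
```

Keeping the validation result in the record (here as an `Option`) lets both filters stay one-line predicates, so there is no need for separate filter classes unless the checks themselves are more involved.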