Hi,

You could easily filter/map/process the streams differently before writing them 
to the sinks. Building on top of my previous example, this also should work 
fine:


DataStream myStream = env.addSource(…).foo().bar(); // shown with a custom source, but any source works

myStream.baz().addSink(sink1);
myStream.addSink(sink2);
myStream.qux().quuz().corge().addSink(sink3);
 
Where foo/bar/baz/qux/quuz/corge are any stream processing functions that you 
wish. `foo` and `bar` would be applied once to the stream, before it is split 
to the different sinks, while `baz`, `qux`, `quuz` and `corge` would be 
applied only to their respective sinks AFTER splitting.
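If it helps to see the record flow concretely, here is a plain-Java toy model of the topology above. This is not Flink code: the `foo`/`baz` bodies and the integer records are arbitrary stand-ins, and the "sinks" are just lists. The point it demonstrates is that the pre-split transform runs once per record regardless of how many sinks are attached, while the post-split transform touches only its own branch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (plain Java, not Flink) of the fan-out topology:
// `foo` runs once per record on the shared upstream, `baz` runs
// only on the branch feeding sink1.
public class FanOutSketch {
    static int fooCalls = 0;
    static List<Integer> sink1 = new ArrayList<>();
    static List<Integer> sink2 = new ArrayList<>();

    static int foo(int x) { fooCalls++; return x + 1; } // shared, pre-split transform
    static int baz(int x) { return x * 10; }            // branch-only transform

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3);

        // foo is applied once, before the stream fans out to the sinks
        List<Integer> myStream = new ArrayList<>();
        for (int x : source) myStream.add(foo(x));

        for (int x : myStream) {
            sink1.add(baz(x)); // sink1's branch gets baz(...) applied
            sink2.add(x);      // sink2 sees each record unchanged
        }
        System.out.println(fooCalls + " " + sink1 + " " + sink2);
    }
}
```

Running it prints `3 [20, 30, 40] [2, 3, 4]`: `foo` ran three times (once per record, not once per sink), and only sink1's branch saw `baz`.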

In your case, it could be:

myStream.filter(...).addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

So sink2 and sink3 would get all of the records, while sink1 would get only a 
portion of them.
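The same toy-model style (plain Java, not Flink; the even-number predicate is an arbitrary stand-in for whatever filter condition you need) makes the routing visible:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model (plain Java, not Flink) of the filtered fan-out:
// sink2 and sink3 receive every record, sink1 only those that
// pass the filter predicate.
public class FilterFanOut {
    static List<Integer> sink1 = new ArrayList<>();
    static List<Integer> sink2 = new ArrayList<>();
    static List<Integer> sink3 = new ArrayList<>();

    public static void main(String[] args) {
        List<Integer> myStream = List.of(1, 2, 3, 4, 5);
        Predicate<Integer> filter = x -> x % 2 == 0; // stand-in condition

        for (int x : myStream) {
            if (filter.test(x)) sink1.add(x); // filtered branch
            sink2.add(x);                     // unfiltered: all records
            sink3.add(x);                     // unfiltered: all records
        }
        System.out.println(sink1 + " " + sink2 + " " + sink3);
    }
}
```

This prints `[2, 4] [1, 2, 3, 4, 5] [1, 2, 3, 4, 5]`: each `addSink` on `myStream` is an independent branch, so a filter on one branch never affects the others.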

Piotrek 


> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com> 
> wrote:
> 
> Piotr, 
> 
> Thanks for the reply. 
> 
> There is one other case, where some events have to be written to multiple 
> sinks while others have to be written to just one sink. 
> 
> How could I have a common code flow/DAG for the same?
> 
> I do not want multiple jobs doing the same work; I want to accomplish this 
> in a single job.
> 
> Could I add the stream code "myStream.addSink(sink1)" under a conditional 
> operator such as 'if' to determine this?
> 
> But I suppose streams here work differently compared to normal code 
> execution.
> 
> Prasanna.
> 
> 
> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com 
> <mailto:pi...@ververica.com>> wrote:
> Hi,
> 
> To the best of my knowledge the following pattern should work just fine:
> 
> DataStream myStream = env.addSource(…).foo().bar(); // shown with a custom source, but any source works
> myStream.addSink(sink1);
> myStream.addSink(sink2);
> myStream.addSink(sink3);
> 
> All of the records from `myStream` would be passed to each of the sinks.
> 
> Piotrek
> 
> > On 24 May 2020, at 19:34, Prasanna kumar <prasannakumarram...@gmail.com 
> > <mailto:prasannakumarram...@gmail.com>> wrote:
> > 
> > Hi,
> > 
> > There is a single source of events for me in my system. 
> > 
> > I need to process and send the events to multiple destination/sink at the 
> > same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
> > 
> > I am able to send to one sink.
> > 
> > By adding more sink streams to the source stream, could we achieve it? Are 
> > there any shortcomings?  
> > 
> > Please let me know if anyone here has successfully implemented one.
> > 
> > Thanks,
> > Prasanna.
> 
