Hi,

I’m not sure if I fully understand what do you mean by

> The point is the sink are not predefined.

You must know before submitting the job, what sinks are going to be used in the 
job. You can have some custom logic, that would filter out records before 
writing them to the sinks, as I proposed before, or you could use side outputs 
[1] would be better suited to your use case? 

Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
 
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html>

> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com> 
> wrote:
> 
> Thanks Piotr for the Reply. 
> 
> I will explain my requirement in detail. 
> 
> Table Updates -> Generate Business Events -> Subscribers 
> 
> Source Side
> There are CDC of 100 tables which the framework needs to listen to. 
> 
> Event Table Mapping
> 
> There would be Event associated with table in a m:n fashion. 
> 
> say there are tables TA, TB, TC. 
> 
> EA, EA2 and EA3 are generated from TA (based on conditions)
> EB generated from TB (based on conditions)
> EC generated from TC (no conditions.)
> 
> Say there are events EA,EB,EC generated from the tables TA, TB, TC 
> 
> Event Sink Mapping
> 
> EA has following sinks. kafka topic SA,SA2,SAC. 
> EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
> EC has only rest endpoint RC. 
> 
> The point is the sink are not predefined. [. But i only see the example 
> online where , flink code having explicit myStream.addSink(sink2);   ]
> 
> We expect around 500 types of events in our platform in another 2 years time. 
> 
> We are looking at writing a generic job for the same , rather than writing 
> one for new case.
> 
> Let me know your thoughts and flink suitability to this requirement.
> 
> Thanks
> Prasanna.
> 
> 
> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com 
> <mailto:pi...@ververica.com>> wrote:
> Hi,
> 
> You could easily filter/map/process the streams differently before writing 
> them to the sinks. Building on top of my previous example, this also should 
> work fine:
> 
> 
> DataStream myStream = env.addSource(…).foo().bar() // for custom source, but 
> any ;
> 
> myStream.baz().addSink(sink1);
> myStream.addSink(sink2);
> myStream.qux().quuz().corge().addSink(sink3);
>  
> Where foo/bar/baz/quz/quuz/corge are any stream processing functions that you 
> wish. `foo` and `bar` would be applied once to the stream, before it’s going 
> to be split to different sinks, while `baz`, `qux`, `quuz` and `corge` would 
> be applied to only of the sinks AFTER splitting.
> 
> In your case, it could be:
> 
> myStream.filter(...).addSink(sink1);
> myStream.addSink(sink2);
> myStream.addSink(sink3);
> 
> So sink2 and sink3 would get all of the records, while sink1 only a portion 
> of them.
> 
> Piotrek 
> 
> 
>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com 
>> <mailto:prasannakumarram...@gmail.com>> wrote:
>> 
>> Piotr, 
>> 
>> Thanks for the reply. 
>> 
>> There is one other case, where some events have to be written to multiple 
>> sinks and while other have to be written to just one sink. 
>> 
>> How could i have a common codeflow/DAG for the same ?
>> 
>> I do not want multiple jobs to do the same want to accomplish in a single 
>> job .
>> 
>> Could i add Stream code "myStream.addSink(sink1)" under a conditional 
>> operator such as 'if' to determine . 
>> 
>> But i suppose here the stream works differently compared to normal code 
>> processing.
>> 
>> Prasanna.
>> 
>> 
>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com 
>> <mailto:pi...@ververica.com>> wrote:
>> Hi,
>> 
>> To the best of my knowledge the following pattern should work just fine:
>> 
>> DataStream myStream = env.addSource(…).foo().bar() // for custom source, but 
>> any ;
>> myStream.addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.addSink(sink3);
>> 
>> All of the records from `myStream` would be passed to each of the sinks.
>> 
>> Piotrek
>> 
>> > On 24 May 2020, at 19:34, Prasanna kumar <prasannakumarram...@gmail.com 
>> > <mailto:prasannakumarram...@gmail.com>> wrote:
>> > 
>> > Hi,
>> > 
>> > There is a single source of events for me in my system. 
>> > 
>> > I need to process and send the events to multiple destination/sink at the 
>> > same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>> > 
>> > I am able send to one sink.
>> > 
>> > By adding more sink stream to the source stream could we achieve it . Are 
>> > there any shortcomings.  
>> > 
>> > Please let me know if any one here has successfully implemented one .
>> > 
>> > Thanks,
>> > Prasanna.
>> 
> 

Reply via email to