Hi Prasanna,

1.

> The object probably contains or references non serializable fields.

That should speak for itself: Flink was not able to distribute your code to the worker nodes, because the lambda you used as the filter function turned out to be non-serializable. You should unit test your code, and in this case add a serialization/deserialization round-trip unit test for the filter function. For starters I would suggest not using a lambda, but a full, properly named class, and working from there.
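A minimal round-trip test could look something like the sketch below. It is untested; `EventTypeFilter` is a hypothetical named replacement for your lambda, and plain Java serialization is used because that is what Flink's ClosureCleaner requires of user functions:

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.flink.api.common.functions.FilterFunction;
import org.junit.Test;

public class EventTypeFilterTest {

    // A proper named class instead of the lambda: it captures only a
    // String field, so there is nothing non-serializable to drag along.
    public static class EventTypeFilter implements FilterFunction<String> {
        private final String eventType;

        public EventTypeFilter(String eventType) {
            this.eventType = eventType;
        }

        @Override
        public boolean filter(String msg) {
            return msg.equals(eventType);
        }
    }

    @Test
    public void filterSurvivesSerializationRoundTrip() throws Exception {
        EventTypeFilter original = new EventTypeFilter("billing");

        // serialize ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }

        // ... deserialize again ...
        FilterFunction<String> copy;
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy = (FilterFunction<String>) in.readObject();
        }

        // ... and check that the copy still behaves like the original
        assertTrue(copy.filter("billing"));
        assertFalse(copy.filter("cost"));
    }
}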
2.

Can't you create an array/map/collection of OutputTags corresponding to the sink/topic combinations, i.e. one OutputTag per sink (/topic), and use that map inside your process function?
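For example, something along these lines. This is a rough, untested sketch: it reuses the eventRouterRegistry type, the BOOTSTRAP_SERVER constant and the String events from your snippets, and it assumes (as your filter version did) that the message payload itself is the event type; adapt the lookup to your real message format.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// inside your GenericStreamRouter class:
public static void setupRouterJobsSideOutputs(
    List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {

  Properties props = new Properties();
  props.put("bootstrap.servers", BOOTSTRAP_SERVER);
  props.put("client.id", "flink-example1");

  DataStream<String> inputStream =
      env.addSource(new FlinkKafkaConsumer011<>(
              "EVENTTOPIC", new SimpleStringSchema(), props))
          .name("EVENTTOPIC");

  // One OutputTag per sink/topic. The anonymous subclass ({}) lets Flink
  // derive the tag's element type, and OutputTag is Serializable, so the
  // whole map can safely be captured by the process function below.
  final Map<String, OutputTag<String>> tags = new HashMap<>();
  for (eventRouterRegistry record : registryList) {
    tags.put(record.getEventType(),
        new OutputTag<String>(record.getOutputTopic()) {});
  }

  SingleOutputStreamOperator<String> mainDataStream =
      inputStream.process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
          // Look up the tag for this event type and dispatch; events with
          // no registered sink are simply dropped here.
          OutputTag<String> tag = tags.get(value);
          if (tag != null) {
            ctx.output(tag, value);
          }
        }
      });

  // One Kafka producer per registered topic, attached to its side output.
  for (eventRouterRegistry record : registryList) {
    mainDataStream
        .getSideOutput(tags.get(record.getEventType()))
        .addSink(new FlinkKafkaProducer011<>(
            record.getOutputTopic(), new SimpleStringSchema(), props))
        .name(record.getOutputTopic());
  }
}

If one event type should later fan out to several sinks, the natural extension is to make the map value a List<OutputTag<String>> and loop over it in processElement.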
Piotrek

> On 2 Jun 2020, at 13:49, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>
> Hi,
>
> I have an event router registry like the following. Reading this as input, I need to create a job that redirects messages to the correct sink based on a condition.
>
> {
>   "eventRouterRegistry": [
>     { "eventType": "billing",   "outputTopic": "billing" },
>     { "eventType": "cost",      "outputTopic": "cost" },
>     { "eventType": "marketing", "outputTopic": "marketing" }
>   ]
> }
>
> I tried the following two approaches.
>
> 1) Using the filter method:
>
> public static void setupRouterJobsFilter(
>     List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {
>
>   Properties props = new Properties();
>   props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>   props.put("client.id", "flink-example1");
>
>   FlinkKafkaConsumer011<String> fkC =
>       new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);
>
>   DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
>
>   for (eventRouterRegistry record : registryList) {
>     System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());
>
>     FlinkKafkaProducer011<String> fkp =
>         new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);
>
>     inputStream.filter(msg -> msg.equals(record.getEventType()));
>     inputStream.addSink(fkp).name(record.getOutputTopic());
>   }
> }
>
> Here I am getting the following error:
>
> ./bin/flink run -c firstflinkpackage.GenericStreamRouter ../../myrepository/flink001/target/flink001-1.0.jar
> Starting execution of program
> ---------------------------
> The program finished with the following exception:
>
> The implementation of the FilterFunction is not serializable. The object probably contains or references non serializable fields.
>     org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>     org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
>     org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>     org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)
>     firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)
>     firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)
>
> It looks like I should not use record.getEventType(), as it is a variable from outside the stream.
>
> Is there any way to use an external variable here, mainly to generalise the process?
>
> 2) Using the side output method:
>
> The following code is my attempt at creating side outputs in a generic way.
>
> I am able to create the sink streams based on the list (eventRouterRegistry), but I could not generalise the OutputTag creation. The issue is that the output tag is fixed: my output tag needs to be the event type, and that needs to be available inside the ProcessFunction too.
>
> How do I implement this? Should I write my own process function?
>
> public static void setupRouterJobs(
>     List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {
>
>   Properties props = new Properties();
>   props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>   props.put("client.id", "flink-example1");
>
>   FlinkKafkaConsumer011<String> fkC =
>       new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);
>
>   DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
>
>   // Even if I generalise the OutputTag here, how do I do it inside the ProcessFunction?
>   final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
>
>   SingleOutputStreamOperator<String> mainDataStream =
>       inputStream.process(
>           new ProcessFunction<String, String>() {
>             @Override
>             public void processElement(String value, Context ctx, Collector<String> out)
>                 throws Exception {
>               // emit data to side output
>               ctx.output(outputTag, value);
>             }
>           });
>
>   for (eventRouterRegistry record : registryList) {
>     System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());
>
>     FlinkKafkaProducer011<String> fkp =
>         new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);
>
>     DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
>     sideOutputStream.print();
>     sideOutputStream.addSink(fkp).name(record.getOutputTopic());
>   }
> }
>
> Thanks,
> Prasanna.
>
>
> On Thu, May 28, 2020 at 8:24 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:
> Alexander,
>
> Thanks for the reply. I will implement it and come back if I have any questions.
>
> Prasanna.
>
> On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <alexan...@ververica.com> wrote:
> Hi Prasanna,
>
> if the set of all possible sinks is known in advance, side outputs will be generic enough to express your requirements. A side output produces a stream: create all of the side output tags, associate each of them with one sink, add conditional logic around `ctx.output(outputTag, ...);` to decide where to dispatch the messages (see [1]), and collect to none or many side outputs, depending on your logic.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
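> For illustration, the dispatch could take roughly this shape (just a sketch: `Event`, the is*() predicates and the tag variables are hypothetical placeholders for your own types, routing conditions and tags):
>
> @Override
> public void processElement(Event event, Context ctx, Collector<Event> out) {
>   // each condition is checked independently, so one element can be
>   // dispatched to zero, one, or several side outputs
>   if (event.isBilling()) {
>     ctx.output(billingTag, event);
>   }
>   if (event.isMarketing()) {
>     ctx.output(marketingTag, event);
>   }
>   // elements matching no condition are dropped here; call
>   // out.collect(event) if you want a default/main output instead
> }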
> --
> Alexander Fedulov | Solutions Architect
> https://www.ververica.com/
>
> On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:
> Piotr,
>
> There is an event and subscriber registry as a JSON file, which holds the table-event mapping and the event-subscriber mapping, as shown below.
>
> Based on this JSON, the job needs to go through the table updates, create events, and for each event there is a configured way to sink it. The sink streams have to be added based on this JSON; that is what I meant earlier by "no predefined sinks in code". You can see that each event has a different set of sinks.
>
> Just checking how generic side-output streams can be.
>
> Source -> generate events -> (find out sinks dynamically in code) -> write to the respective sinks.
>
> {
>   "tablename": "source.table1",
>   "events": [
>     {
>       "operation": "update",
>       "eventstobecreated": [
>         {
>           "eventname": "USERUPDATE",
>           "Columnoperation": "and",
>           "ColumnChanges": [
>             { "columnname": "name" },
>             { "columnname": "loginenabled", "value": "Y" }
>           ],
>           "Subscribers": [
>             {
>               "customername": "c1",
>               "method": "Kafka",
>               "methodparams": { "topicname": "USERTOPIC" }
>             },
>             {
>               "customername": "c2",
>               "method": "S3",
>               "methodparams": { "folder": "aws://folderC2" }
>             }
>           ]
>         }
>       ]
>     },
>     {
>       "operation": "insert",
>       "eventstobecreated": [
>         {
>           "eventname": "USERINSERT",
>           "operation": "insert",
>           "Subscribers": [
>             {
>               "teamname": "General",
>               "method": "Kafka",
>               "methodparams": { "topicname": "new_users" }
>             },
>             {
>               "teamname": "General",
>               "method": "kinesis",
>               "methodparams": {
>                 "URL": "new_users",
>                 "username": "uname",
>                 "password": "pwd"
>               }
>             }
>           ]
>         }
>       ]
>     },
>     {
>       "operation": "delete",
>       "eventstobecreated": [
>         {
>           "eventname": "USERDELETE",
>           "Subscribers": [
>             {
>               "customername": "c1",
>               "method": "Kafka",
>               "methodparams": { "topicname": "USERTOPIC" }
>             },
>             {
>               "customername": "c4",
>               "method": "Kafka",
>               "methodparams": { "topicname": "deleterecords" }
>             }
>           ]
>         }
>       ]
>     }
>   ]
> }
>
> Please let me know your thoughts on this.
>
> Thanks,
> Prasanna.
>
> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
>
> I'm not sure I fully understand what you mean by
>
> > The point is the sinks are not predefined.
>
> You must know, before submitting the job, which sinks are going to be used in it. You can have some custom logic that filters out records before writing them to the sinks, as I proposed before, or perhaps side outputs [1] would be better suited to your use case?
>
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
>> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>
>> Thanks Piotr for the reply.
>>
>> I will explain my requirement in detail.
>>
>> Table updates -> generate business events -> subscribers
>>
>> Source side:
>> there are CDC streams from 100 tables that the framework needs to listen to.
>>
>> Event-table mapping:
>> events are associated with tables in an m:n fashion.
>>
>> Say there are tables TA, TB and TC:
>> EA, EA2 and EA3 are generated from TA (based on conditions),
>> EB is generated from TB (based on conditions),
>> EC is generated from TC (no conditions).
>>
>> Event-sink mapping:
>> EA has the following sinks: kafka topics SA, SA2 and SAC.
>> EB has the following sinks: kafka topic SB, an S3 sink, and a REST endpoint RB.
>> EC has only the REST endpoint RC.
>>
>> The point is that the sinks are not predefined. [I only see examples online where the Flink code has an explicit myStream.addSink(sink2);]
>>
>> We expect around 500 types of events on our platform within another two years.
>>
>> We are looking at writing one generic job for this, rather than writing a new one for each case.
>>
>> Let me know your thoughts, and Flink's suitability for this requirement.
>>
>> Thanks,
>> Prasanna.
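>> To make the mapping concrete, one registry entry could deserialize into something like this (a hypothetical sketch of the shape only, not code from our codebase):
>>
>> // one business event and the subscribers it fans out to
>> public class EventRoute {
>>   String eventName;                  // e.g. "USERUPDATE"
>>   List<SubscriberSpec> subscribers;  // none, one, or many sinks per event
>> }
>>
>> public class SubscriberSpec {
>>   String method;                     // "Kafka", "S3", "kinesis", "REST", ...
>>   Map<String, String> methodparams;  // topicname / folder / URL / credentials
>> }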
>> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> Hi,
>>
>> You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this should also work fine:
>>
>> DataStream myStream = env.addSource(…).foo().bar(); // a custom source here, but any source works
>>
>> myStream.baz().addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.qux().quuz().corge().addSink(sink3);
>>
>> where foo/bar/baz/qux/quuz/corge are any stream-processing functions you wish. `foo` and `bar` would be applied once to the stream, before it is split towards the different sinks, while `baz`, `qux`, `quuz` and `corge` would be applied to only one of the sinks, AFTER the splitting.
>>
>> In your case, it could be:
>>
>> myStream.filter(...).addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.addSink(sink3);
>>
>> so sink2 and sink3 would get all of the records, while sink1 would get only a portion of them.
>>
>> Piotrek
>>
>>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>>
>>> Piotr,
>>>
>>> Thanks for the reply.
>>>
>>> There is one other case, where some events have to be written to multiple sinks while others have to be written to just one sink.
>>>
>>> How could I have a common code flow / DAG for that? I do not want multiple jobs doing the same thing; I want to accomplish this in a single job.
>>>
>>> Could I put stream code such as "myStream.addSink(sink1)" under a conditional operator such as 'if' to decide?
>>>
>>> But I suppose the stream works differently here compared to normal code flow.
>>>
>>> Prasanna.
>>>
>>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com> wrote:
>>> Hi,
>>>
>>> To the best of my knowledge, the following pattern should work just fine:
>>>
>>> DataStream myStream = env.addSource(…).foo().bar(); // a custom source here, but any source works
>>> myStream.addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.addSink(sink3);
>>>
>>> All of the records from `myStream` would be passed to each of the sinks.
>>>
>>> Piotrek
>>>
>>> > On 24 May 2020, at 19:34, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > There is a single source of events in my system.
>>> >
>>> > I need to process the events and send them to multiple destinations/sinks at the same time [a Kafka topic, S3, Kinesis, and a POST to an HTTP endpoint].
>>> >
>>> > I am able to send to one sink.
>>> >
>>> > Could we achieve this by adding more sink streams to the source stream? Are there any shortcomings?
>>> >
>>> > Please let me know if anyone here has successfully implemented this.
>>> >
>>> > Thanks,
>>> > Prasanna.