Hi Prasanna,

1.

> The object probably contains or references non serializable fields.

That should speak for itself: Flink was not able to serialise your function 
in order to distribute it to the worker nodes. 

The lambda you pass to `filter` turned out to be non-serialisable, most likely 
because it captures `record`, and `eventRouterRegistry` is presumably not 
`Serializable`. You should unit test your code, and in this case add a 
serialisation/deserialisation round-trip unit test for the filter function. 
For starters, I would suggest not using a lambda but a proper named class, and 
working from there.
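
A minimal sketch of such a round-trip check, in plain Java so that it runs 
without Flink on the classpath. The `FilterFunction` interface below is only a 
stand-in for Flink's interface of the same name (which is also 
`Serializable`); `RegistryEntry`, `EventTypeFilter`, `roundTrip` and 
`isSerializable` are hypothetical names made up for this illustration, and 
`RegistryEntry` is assumed to be non-serialisable like your registry class 
seems to be.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class FilterSerializationCheck {

    /** Stand-in for Flink's FilterFunction (assumption: same shape, also Serializable). */
    interface FilterFunction<T> extends Serializable {
        boolean filter(T value) throws Exception;
    }

    /** Hypothetical registry entry; deliberately NOT Serializable. */
    static class RegistryEntry {
        private final String eventType;
        RegistryEntry(String eventType) { this.eventType = eventType; }
        String getEventType() { return eventType; }
    }

    /** Named filter class that keeps only the String it needs, not the whole entry. */
    static class EventTypeFilter implements FilterFunction<String> {
        private static final long serialVersionUID = 1L;
        private final String eventType;
        EventTypeFilter(String eventType) { this.eventType = eventType; }
        @Override
        public boolean filter(String msg) { return msg.equals(eventType); }
    }

    /** Serialises the object to bytes and reads it back. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    /** Returns true if the object survives a serialisation round trip. */
    static boolean isSerializable(Object candidate) {
        try {
            roundTrip(candidate);
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        RegistryEntry entry = new RegistryEntry("billing");

        // The lambda captures `entry`, so the entry must be serialised with it.
        FilterFunction<String> capturing = msg -> msg.equals(entry.getEventType());
        System.out.println("capturing lambda serialisable: " + isSerializable(capturing));

        // The named class only carries a String, which is serialisable.
        FilterFunction<String> copy = roundTrip(new EventTypeFilter(entry.getEventType()));
        System.out.println("named filter still matches: " + copy.filter("billing"));
    }
}
```

In a real test you would run the same round trip on the actual function 
object you hand to `filter(...)`. Extracting the event type into a local 
`String` before the lambda would probably also work, since the lambda would 
then capture only the `String`.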

2.

Can't you create an array/map/collection of OutputTags corresponding to the 
sink/topic combinations, one OutputTag per sink (/topic), and use this 
array/map/collection inside your process function?
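
The lookup part of that idea can be sketched in plain Java, without Flink, as 
below. Everything here is an assumption made for illustration: the class name 
`SideOutputRouting`, the `Emitter` interface (a stand-in for calling 
`ctx.output(tag, value)` inside a ProcessFunction), and the idea that the 
event type can be extracted from each message before routing. In a real job 
each tag id would back one `OutputTag<String>` created up front, and the same 
map would drive the `getSideOutput(...)`/`addSink(...)` loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SideOutputRouting {

    /** Stand-in for the side-output call (in Flink: ctx.output(tag, value)). */
    interface Emitter {
        void output(String tagId, String value);
    }

    // Event type -> ids of the tags (one per sink/topic) that should receive it.
    private final Map<String, List<String>> tagIdsByEventType = new LinkedHashMap<>();

    SideOutputRouting(Map<String, List<String>> registry) {
        tagIdsByEventType.putAll(registry);
    }

    /**
     * Emits the value once per tag registered for the event type.
     * Returns the number of emissions; 0 means no sink is registered.
     */
    int route(String eventType, String value, Emitter emitter) {
        List<String> tagIds =
            tagIdsByEventType.getOrDefault(eventType, Collections.emptyList());
        for (String tagId : tagIds) {
            emitter.output(tagId, value);
        }
        return tagIds.size();
    }

    public static void main(String[] args) {
        // Registry taken from the JSON in the question: event type -> output topic.
        Map<String, List<String>> registry = new LinkedHashMap<>();
        registry.put("billing", Arrays.asList("billing"));
        registry.put("cost", Arrays.asList("cost"));
        registry.put("marketing", Arrays.asList("marketing"));

        SideOutputRouting router = new SideOutputRouting(registry);
        List<String> emitted = new ArrayList<>();
        Emitter collect = (tagId, value) -> emitted.add(tagId + " <- " + value);

        router.route("cost", "some cost event", collect);
        router.route("unknown", "dropped event", collect); // no tag registered
        System.out.println(emitted);
    }
}
```

Keeping a list of tag ids per event type also covers the earlier case where 
one event has to go to several sinks.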

Piotrek 

> On 2 Jun 2020, at 13:49, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
> 
> Hi,
> 
> I have an event router registry like the one below. Reading it as input, I 
> need to create a job that redirects each message to the correct sink, as per 
> the condition.
> {
>   "eventRouterRegistry": [
>     { "eventType": "billing", "outputTopic": "billing" },
>     { "eventType": "cost", "outputTopic": "cost" },
>     { "eventType": "marketing", "outputTopic": "marketing" }
>   ]
> }
> I tried the following two approaches. 
> 1) Using the Filter method
> 
> public static void setupRouterJobsFilter(
>       List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {
> 
>    Properties props = new Properties();
>    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>    props.put("client.id", "flink-example1");
> 
>    FlinkKafkaConsumer011 fkC =
>          new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);
> 
>    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
> 
>    for (eventRouterRegistry record : registryList) {
>       System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());
> 
>       FlinkKafkaProducer011 fkp =
>             new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);
> 
>       inputStream.filter(msg -> msg.equals(record.getEventType()) );
>       //sideOutputStream.print();
>       inputStream.addSink(fkp).name(record.getOutputTopic());
>    }
> }
> Here I am getting the following error. 
>  ./bin/flink run -c firstflinkpackage.GenericStreamRouter ../../myrepository/flink001/target/flink001-1.0.jar
> Starting execution of program
> ---------------------------
>  The program finished with the following exception:
> 
> The implementation of the FilterFunction is not serializable. The object probably contains or references non serializable fields.
>       org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>       org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
>       org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>       org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)
>       firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)
>       firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)
> 
> It looks like I should not use record.getEventType() here, as it comes from 
> outside the stream. 
> 
> Is there any way to use an external variable here, mainly to generalise the 
> process?
> 
> 2) Using the Side Output method
> 
> The following code is my attempt at creating the side outputs in a generic 
> way. 
> 
> I am able to create the sink streams based on the list (eventRouterRegistry). 
> 
> But I could not generalise the output tag creation. 
> The issue here is that the output tag is fixed. 
> My output tag needs to be the event type, and it needs to be available in the 
> process function too. 
> 
> How do I implement this? Should I write my own process function?
>  public static void setupRouterJobs(
>      List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {
> 
>    Properties props = new Properties();
>    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>    props.put("client.id", "flink-example1");
> 
>    FlinkKafkaConsumer011 fkC =
>        new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);
> 
>    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
>    // Even if I try to generalise the output tag here, how do I do it inside the ProcessFunction?
>    final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
>    SingleOutputStreamOperator<String> mainDataStream =
>        inputStream.process(
>            new ProcessFunction<String, String>() {
> 
>              @Override
>              public void processElement(String value, Context ctx, Collector<String> out)
>                  throws Exception {
>                // emit data to side output
>                ctx.output(outputTag, value);
>              }
>            });
> 
>    for (eventRouterRegistry record : registryList) {
>      System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());
> 
>      FlinkKafkaProducer011 fkp =
>          new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);
> 
>      DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
>      sideOutputStream.print();
>      sideOutputStream.addSink(fkp).name(record.getOutputTopic());
>    }
> }
> 
> Thanks,
> Prasanna.
> 
> 
> 
> On Thu, May 28, 2020 at 8:24 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:
> Alexander, 
> 
> Thanks for the reply. Will implement and come back in case of any questions. 
> 
> Prasanna.
> 
> On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <alexan...@ververica.com> wrote:
> Hi Prasanna,
> 
> If the set of all possible sinks is known in advance, side outputs will be 
> generic enough to express your requirements. Each side output produces a 
> stream. Create all of the side output tags, associate each of them with one 
> sink, and add conditional logic around `ctx.output(outputTag, ... );` to 
> decide where to dispatch the messages (see [1]), collecting to none or many 
> side outputs depending on your logic.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
> --
> Alexander Fedulov | Solutions Architect
> 
> 
> 
> On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:
> Piotr, 
> 
> There is an event and subscriber registry, kept as a JSON file, which holds 
> the table-event mapping and the event-subscriber mapping shown below.
> 
> Based on this JSON, we need the job to go through the table updates and 
> create events; for each event, the JSON specifies how to sink it.
> 
> The sink streams have to be added based on this JSON. That is what I meant 
> earlier by "no predefined sink in code".
> 
> You can see that each event has a different set of sinks. 
> 
> I am just checking how generic side-output streams can be.
> 
> Source -> generate events -> (find out sinks dynamically in code) -> write 
> to the respective sinks. 
> 
> {
>   "tablename": "source.table1",
>   "events": [
>     {
>       "operation": "update",
>       "eventstobecreated": [
>         {
>           "eventname": "USERUPDATE",
>           "Columnoperation": "and",
>           "ColumnChanges": [
>             { "columnname": "name" },
>             { "columnname": "loginenabled", "value": "Y" }
>           ],
>           "Subscribers": [
>             {
>               "customername": "c1",
>               "method": "Kafka",
>               "methodparams": { "topicname": "USERTOPIC" }
>             },
>             {
>               "customername": "c2",
>               "method": "S3",
>               "methodparams": { "folder": "aws://folderC2" }
>             }
>           ]
>         }
>       ]
>     },
>     {
>       "operation": "insert",
>       "eventstobecreated": [
>         {
>           "eventname": "USERINSERT",
>           "operation": "insert",
>           "Subscribers": [
>             {
>               "teamname": "General",
>               "method": "Kafka",
>               "methodparams": { "topicname": "new_users" }
>             },
>             {
>               "teamname": "General",
>               "method": "kinesis",
>               "methodparams": {
>                 "URL": "new_users",
>                 "username": "uname",
>                 "password": "pwd"
>               }
>             }
>           ]
>         }
>       ]
>     },
>     {
>       "operation": "delete",
>       "eventstobecreated": [
>         {
>           "eventname": "USERDELETE",
>           "Subscribers": [
>             {
>               "customername": "c1",
>               "method": "Kafka",
>               "methodparams": { "topicname": "USERTOPIC" }
>             },
>             {
>               "customername": "c4",
>               "method": "Kafka",
>               "methodparams": { "topicname": "deleterecords" }
>             }
>           ]
>         }
>       ]
>     }
>   ]
> }
> 
> Please let me know your thoughts on this.
> 
> Thanks,
> Prasanna.
> 
> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
> 
> I’m not sure if I fully understand what you mean by
> 
> > The point is the sink are not predefined.
> 
> You must know, before submitting the job, what sinks are going to be used in 
> the job. You can have some custom logic that filters out records before 
> writing them to the sinks, as I proposed before, or maybe side outputs [1] 
> would be better suited to your use case? 
> 
> Piotrek
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
> 
>> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>> 
>> Thanks Piotr for the Reply. 
>> 
>> I will explain my requirement in detail. 
>> 
>> Table Updates -> Generate Business Events -> Subscribers 
>> 
>> Source Side
>> The framework needs to listen to CDC for 100 tables. 
>> 
>> Event Table Mapping
>> 
>> Events would be associated with tables in an m:n fashion. 
>> 
>> Say there are tables TA, TB, TC. 
>> 
>> EA, EA2 and EA3 are generated from TA (based on conditions)
>> EB generated from TB (based on conditions)
>> EC generated from TC (no conditions.)
>> 
>> So the events EA, EB and EC are generated from the tables TA, TB and TC. 
>> 
>> Event Sink Mapping
>> 
>> EA has following sinks. kafka topic SA,SA2,SAC. 
>> EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
>> EC has only rest endpoint RC. 
>> 
>> The point is the sink are not predefined. [But I only see examples online 
>> where the Flink code has an explicit myStream.addSink(sink2);]
>> 
>> We expect around 500 types of events in our platform in another 2 years' 
>> time. 
>> 
>> We are looking at writing a generic job for this, rather than writing a new 
>> one for each new case.
>> 
>> Let me know your thoughts on Flink's suitability for this requirement.
>> 
>> Thanks
>> Prasanna.
>> 
>> 
>> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> Hi,
>> 
>> You could easily filter/map/process the streams differently before writing 
>> them to the sinks. Building on top of my previous example, this also should 
>> work fine:
>> 
>> 
>> DataStream myStream = env.addSource(…).foo().bar(); // for custom source, but any
>> 
>> myStream.baz().addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.qux().quuz().corge().addSink(sink3);
>>  
>> Where foo/bar/baz/qux/quuz/corge are any stream processing functions that 
>> you wish. `foo` and `bar` would be applied once to the stream, before it is 
>> split to the different sinks, while `baz`, `qux`, `quuz` and `corge` would 
>> each be applied to only one of the sinks AFTER splitting.
>> 
>> In your case, it could be:
>> 
>> myStream.filter(...).addSink(sink1);
>> myStream.addSink(sink2);
>> myStream.addSink(sink3);
>> 
>> So sink2 and sink3 would get all of the records, while sink1 only a portion 
>> of them.
>> 
>> Piotrek 
>> 
>> 
>>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>> 
>>> Piotr, 
>>> 
>>> Thanks for the reply. 
>>> 
>>> There is one other case, where some events have to be written to multiple 
>>> sinks while others have to be written to just one sink. 
>>> 
>>> How could I have a common code flow/DAG for this?
>>> 
>>> I do not want multiple jobs doing the same thing; I want to accomplish it 
>>> in a single job.
>>> 
>>> Could I put stream code such as "myStream.addSink(sink1)" under a 
>>> conditional such as 'if' to decide this? 
>>> 
>>> But I suppose the stream works differently here compared to normal code 
>>> processing.
>>> 
>>> Prasanna.
>>> 
>>> 
>>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com> wrote:
>>> Hi,
>>> 
>>> To the best of my knowledge the following pattern should work just fine:
>>> 
>>> DataStream myStream = env.addSource(…).foo().bar(); // for custom source, but any
>>> myStream.addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.addSink(sink3);
>>> 
>>> All of the records from `myStream` would be passed to each of the sinks.
>>> 
>>> Piotrek
>>> 
>>> > On 24 May 2020, at 19:34, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>> > 
>>> > Hi,
>>> > 
>>> > There is a single source of events in my system. 
>>> > 
>>> > I need to process the events and send them to multiple destinations/sinks 
>>> > at the same time [Kafka topic, S3, Kinesis and POST to an HTTP endpoint].
>>> > 
>>> > I am able to send to one sink.
>>> > 
>>> > Could we achieve this by adding more sinks to the source stream? Are 
>>> > there any shortcomings?  
>>> > 
>>> > Please let me know if anyone here has successfully implemented this.
>>> > 
>>> > Thanks,
>>> > Prasanna.
