Hi Prasanna,

If the set of all possible sinks is known in advance, side outputs will be
generic enough to express your requirements. Each side output produces a
stream. Create all of the side output tags, associate each of them with one
sink, and add conditional logic around `ctx.output(outputTag, ...);` to
decide where to dispatch the messages (see [1]). A record can be collected
to none, one, or many side outputs, depending on your logic.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
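
A rough sketch of what this could look like (Event, the hasXxxSubscriber
predicates, kafkaSink and s3Sink are placeholders for your own types and
sinks, not actual API):

final OutputTag<Event> kafkaTag = new OutputTag<Event>("kafka-out") {};
final OutputTag<Event> s3Tag = new OutputTag<Event>("s3-out") {};

SingleOutputStreamOperator<Event> routed = events
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) {
            // conditional dispatch: collect to none, one or many side outputs
            if (event.hasKafkaSubscriber()) {
                ctx.output(kafkaTag, event);
            }
            if (event.hasS3Subscriber()) {
                ctx.output(s3Tag, event);
            }
        }
    });

// one side output per sink, all wired up when the job is built
routed.getSideOutput(kafkaTag).addSink(kafkaSink);
routed.getSideOutput(s3Tag).addSink(s3Sink);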

--

Alexander Fedulov | Solutions Architect

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Piotr,
>
> There is an event and subscriber registry as a JSON file which has the
> table-event mapping and the event-subscriber mapping, as shown below.
>
> Based on this JSON, we need the job to go through the table updates and
> create events, and for each event the JSON defines how to sink it.
>
> The sink streams have to be added based on this JSON. That's what I meant
> earlier by saying there is no predefined sink in the code.
>
> You can see that each event has a different set of sinks.
>
> Just checking how generic side-output streams can be.
>
> Source -> generate events -> (find out sinks dynamically in code) ->
> write to the respective sinks.
>
> {
>   "tablename": "source.table1",
>   "events": [
>     {
>       "operation": "update",
>       "eventstobecreated": [
>         {
>           "eventname": "USERUPDATE",
>           "Columnoperation": "and",
>           "ColumnChanges": [
>             {
>               "columnname": "name"
>             },
>             {
>               "columnname": "loginenabled",
>               "value": "Y"
>             }
>           ],
>           "Subscribers": [
>             {
>               "customername": "c1",
>               "method": "Kafka",
>               "methodparams": {
>                 "topicname": "USERTOPIC"
>               }
>             },
>             {
>               "customername": "c2",
>               "method": "S3",
>               "methodparams": {
>                 "folder": "aws://folderC2"
>               }
>             }
>           ]
>         }
>       ]
>     },
>     {
>       "operation": "insert",
>       "eventstobecreated": [
>         {
>           "eventname": "USERINSERT",
>           "operation": "insert",
>           "Subscribers": [
>             {
>               "teamname": "General",
>               "method": "Kafka",
>               "methodparams": {
>                 "topicname": "new_users"
>               }
>             },
>             {
>               "teamname": "General",
>               "method": "kinesis",
>               "methodparams": {
>                 "URL": "new_users",
>                 "username": "uname",
>                 "password":  "pwd"
>               }
>             }
>           ]
>         }
>       ]
>     },
>     {
>       "operation": "delete",
>       "eventstobecreated": [
>         {
>           "eventname": "USERDELETE",
>           "Subscribers": [
>             {
>               "customername": "c1",
>               "method": "Kafka",
>               "methodparams": {
>                 "topicname": "USERTOPIC"
>               }
>             },
>             {
>               "customername": "c4",
>               "method": "Kafka",
>               "methodparams": {
>                 "topicname": "deleterecords"
>               }
>             }
>           ]
>         }
>       ]
>     }
>   ]
> }
>
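> For illustration, here is a rough sketch of how I imagine the registry could
> drive side-output routing (SubscriberConfig, loadRegistry, the tags and the
> event methods are made-up names, not our actual code):
>
> // parse the registry once, e.g. with Jackson:
> // eventname -> list of subscriber configs (method + methodparams)
> Map<String, List<SubscriberConfig>> registry = loadRegistry("registry.json");
>
> // inside a ProcessFunction, look up the subscribers of the generated event
> // and collect it to the side output matching each subscriber's method
> for (SubscriberConfig sub : registry.get(event.getEventName())) {
>     switch (sub.getMethod()) {
>         case "Kafka":   ctx.output(kafkaTag, event.withParams(sub)); break;
>         case "S3":      ctx.output(s3Tag, event.withParams(sub)); break;
>         case "kinesis": ctx.output(kinesisTag, event.withParams(sub)); break;
>     }
> }
>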
> Please let me know your thoughts on this.
>
> Thanks,
> Prasanna.
>
> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
>> Hi,
>>
>> I’m not sure if I fully understand what you mean by
>>
>> > The point is the sink are not predefined.
>>
>> You must know, before submitting the job, which sinks are going to be used
>> in the job. You can have some custom logic that would filter out records
>> before writing them to the sinks, as I proposed before, or you could use
>> side outputs [1], which might be better suited to your use case.
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>
>> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com>
>> wrote:
>>
>> Thanks Piotr for the Reply.
>>
>> I will explain my requirement in detail.
>>
>> Table Updates -> Generate Business Events -> Subscribers
>>
>> *Source Side*
>> There are CDC streams of 100 tables which the framework needs to listen to.
>>
>> *Event Table Mapping*
>>
>> Events are associated with tables in an *m:n* fashion.
>>
>> Say there are tables TA, TB, TC.
>>
>> EA, EA2 and EA3 are generated from TA (based on conditions)
>> EB generated from TB (based on conditions)
>> EC generated from TC (no conditions.)
>>
>> Say there are events EA, EB, EC generated from the tables TA, TB, TC.
>>
>> *Event Sink Mapping*
>>
>> EA has the following sinks: Kafka topics SA, SA2, SAC.
>> EB has the following sinks: Kafka topic SB, an S3 sink and a REST endpoint RB.
>> EC has only the REST endpoint RC.
>>
>> The point is that the sinks are not predefined. [But I only see examples
>> online where the Flink code has an explicit myStream.addSink(sink2); ]
>>
>> We expect around 500 types of events in our platform in another 2 years'
>> time.
>>
>> We are looking at writing a generic job for this, rather than writing a
>> new one for each case.
>>
>> Let me know your thoughts and Flink's suitability for this requirement.
>>
>> Thanks
>> Prasanna.
>>
>>
>> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com>
>> wrote:
>>
>>> Hi,
>>>
>>> You could easily filter/map/process the streams differently before
>>> writing them to the sinks. Building on top of my previous example, this
>>> also should work fine:
>>>
>>>
>>> DataStream myStream = env.addSource(…).foo().bar(); // addSource for a
>>> custom source, but any source works
>>>
>>> myStream.baz().addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.qux().quuz().corge().addSink(sink3);
>>>
>>> Where foo/bar/baz/qux/quuz/corge are any stream processing functions
>>> that you wish. `foo` and `bar` would be applied once to the stream, before
>>> it’s split to the different sinks, while `baz`, `qux`, `quuz` and
>>> `corge` would be applied only to their respective sinks AFTER splitting.
>>>
>>> In your case, it could be:
>>>
>>> myStream.filter(...).addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.addSink(sink3);
>>>
>>> So sink2 and sink3 would get all of the records, while sink1 only a
>>> portion of them.
>>>
>>> Piotrek
>>>
>>>
>>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com>
>>> wrote:
>>>
>>> Piotr,
>>>
>>> Thanks for the reply.
>>>
>>> There is one other case, where some events have to be written to
>>> multiple sinks while others have to be written to just one sink.
>>>
>>> How could I have a common code flow/DAG for this?
>>>
>>> I do not want multiple jobs to do this; I want to accomplish it in a
>>> single job.
>>>
>>> Could I add the stream code "myStream.addSink(sink1)" under a conditional
>>> operator such as 'if' to determine this?
>>>
>>> But I suppose the stream here works differently compared to normal code
>>> processing.
>>>
>>> Prasanna.
>>>
>>>
>>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> To the best of my knowledge the following pattern should work just fine:
>>>>
>>>> DataStream myStream = env.addSource(…).foo().bar(); // addSource for a
>>>> custom source, but any source works
>>>> myStream.addSink(sink1);
>>>> myStream.addSink(sink2);
>>>> myStream.addSink(sink3);
>>>>
>>>> All of the records from `myStream` would be passed to each of the sinks.
>>>>
>>>> Piotrek
>>>>
>>>> > On 24 May 2020, at 19:34, Prasanna kumar <
>>>> prasannakumarram...@gmail.com> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > There is a single source of events for me in my system.
>>>> >
>>>> > I need to process and send the events to multiple destinations/sinks at
>>>> the same time [Kafka topic, S3, Kinesis and a POST to an HTTP endpoint].
>>>> >
>>>> > I am able to send to one sink.
>>>> >
>>>> > By adding more sinks to the source stream, could we achieve it?
>>>> Are there any shortcomings?
>>>> >
>>>> > Please let me know if anyone here has successfully implemented this.
>>>> >
>>>> > Thanks,
>>>> > Prasanna.
>>>>
>>>>
>>>
>>
