Re: What is the best way to have a cache of an external database in Flink?

2021-01-22 Thread Selvaraj chennappan
Hi,
Perhaps broadcast state is a natural fit for this scenario:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
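
A minimal sketch of the idea, assuming a hypothetical Rule POJO, a RuleUpdate
event published on the status-change topic, and an Event type for the main
stream (the names are illustrative, not from your code):

    // State that acts as the local cache of rules, replicated to every parallel instance.
    MapStateDescriptor<String, Rule> rulesDescriptor =
            new MapStateDescriptor<>("rules", Types.STRING, TypeInformation.of(Rule.class));

    // The status-change / rule-update events are broadcast to all instances.
    BroadcastStream<RuleUpdate> ruleUpdates = ruleUpdateStream.broadcast(rulesDescriptor);

    DataStream<Result> results = events
            .connect(ruleUpdates)
            .process(new BroadcastProcessFunction<Event, RuleUpdate, Result>() {

                @Override
                public void processElement(Event event, ReadOnlyContext ctx,
                                           Collector<Result> out) throws Exception {
                    // Look up the cached rules for this element's key and apply them.
                    Rule rule = ctx.getBroadcastState(rulesDescriptor).get(event.getKey());
                    out.collect(applyRule(rule, event)); // applyRule() is application-specific
                }

                @Override
                public void processBroadcastElement(RuleUpdate update, Context ctx,
                                                    Collector<Result> out) throws Exception {
                    // Refresh the cache whenever a status-change event arrives,
                    // e.g. after re-reading the affected rules from the database.
                    ctx.getBroadcastState(rulesDescriptor).put(update.getKey(), update.getRule());
                }
            });

The initial load at job start-up could be done by reading the database in
open() into a transient fallback map, since broadcast state itself is only
writable from processBroadcastElement().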


Thanks,
Selvaraj C

On Fri, 22 Jan 2021 at 8:45 PM, Kumar Bolar, Harshith 
wrote:

> Hi all,
>
> The external database consists of a set of rules for each key, and these
> rules should be applied to each stream element in the Flink job. Because it
> is very expensive to make a DB call for each element and retrieve the rules,
> I want to fetch the rules from the database at initialization and store them
> in a local cache.
>
> When rules are updated in the external database, a status change event is
> published to the Flink job which should be used to fetch the rules and
> refresh this cache.
>
> What is the best way to achieve what I've described? I looked into keyed
> state but initializing all keys and refreshing the keys on update doesn't
> seem possible.
>
> Thanks,
>
> Harshith
>
-- 
Regards,
Selvaraj C


connecting two streams flink

2019-01-29 Thread Selvaraj chennappan
Team,

I have two Kafka consumers (C1 and C2) on the same topic (KT). I want to join
the second stream to the first after a couple of subtasks of computation in the
first stream, and then validate the record.

KT - C1 - Transformation (FlatMap) - Dedup - Validate -- if valid, save it to DB
   - C2 - Process --

If a record is invalid, save it to an error topic.

How do I merge these two streams when one (C2) is fast and the other (C1) is a
little slow (two levels of computation)? The same record flows through
C1 - FlatMap - FlatMap and through the other consumer C2, and I have to
validate that record based on the rules.
Please find the attached image for reference.
[image: two-stream.png]

-- 
Regards,
Selvaraj C


Re: Forking a stream with Flink

2019-01-29 Thread Selvaraj chennappan
UseCase: We have a Kafka consumer that reads messages (JSON); a flatMap then
applies the transformation rules (the rules are complex) and converts each
message to a POJO.
We want to verify that the record (POJO) is valid by checking it field by
field. If a record is invalid due to the transformation rules, it should go to
an error topic; otherwise it is sent to the DB.

I thought of implementing this by adding another consumer to read the JSON
messages and comparing the JSON message attributes with the transformed
record's attributes.

Hence I need to join/coprocess these two streams to validate the record and
then decide whether to persist it to the DB or send it to the error topic.
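
A keyed connect + CoProcessFunction with a side output is one way to wire this
up. A rough sketch under assumptions: hypothetical RawJson and TransformedRecord
types that share a record id, and an application-specific isValid() comparison:

    final OutputTag<TransformedRecord> errorTag = new OutputTag<TransformedRecord>("invalid") {};

    SingleOutputStreamOperator<TransformedRecord> validated = transformedStream
            .keyBy(TransformedRecord::getId)
            .connect(rawJsonStream.keyBy(RawJson::getId))
            .process(new CoProcessFunction<TransformedRecord, RawJson, TransformedRecord>() {

                // Buffer whichever side arrives first (C2 is usually ahead of C1).
                private transient ValueState<TransformedRecord> pendingRecord;
                private transient ValueState<RawJson> pendingRaw;

                @Override
                public void open(Configuration parameters) {
                    pendingRecord = getRuntimeContext().getState(
                            new ValueStateDescriptor<>("pendingRecord", TransformedRecord.class));
                    pendingRaw = getRuntimeContext().getState(
                            new ValueStateDescriptor<>("pendingRaw", RawJson.class));
                }

                @Override
                public void processElement1(TransformedRecord record, Context ctx,
                                            Collector<TransformedRecord> out) throws Exception {
                    RawJson raw = pendingRaw.value();
                    if (raw != null) {
                        validate(record, raw, ctx, out);
                    } else {
                        pendingRecord.update(record);
                    }
                }

                @Override
                public void processElement2(RawJson raw, Context ctx,
                                            Collector<TransformedRecord> out) throws Exception {
                    TransformedRecord record = pendingRecord.value();
                    if (record != null) {
                        validate(record, raw, ctx, out);
                    } else {
                        pendingRaw.update(raw);
                    }
                }

                private void validate(TransformedRecord record, RawJson raw, Context ctx,
                                      Collector<TransformedRecord> out) throws Exception {
                    if (isValid(record, raw)) {        // field-by-field comparison, application-specific
                        out.collect(record);           // main output -> DB sink
                    } else {
                        ctx.output(errorTag, record);  // side output -> error topic
                    }
                    pendingRecord.clear();
                    pendingRaw.clear();
                }
            });

    validated.addSink(dbSink);                               // valid records
    validated.getSideOutput(errorTag).addSink(errorSink);    // invalid records -> error topic

In practice you would also register a timer to clear the buffered state if one
side never shows up for a given key.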

Please let me know if you need more information.

On Tue, Jan 29, 2019 at 6:21 PM miki haiat  wrote:

> I'm not sure I got your question correctly; can you elaborate more on
> your use case?
>


-- 
Regards,
Selvaraj C


Re: connecting two streams flink

2019-01-29 Thread Selvaraj chennappan
I have a pipeline defined. The pipeline does source (Kafka), transformation,
dedup, and persisting to the DB.
[image: image.png]

Before reaching the DB task, a lot of transformation is applied in the
pipeline. Therefore I want to validate the transformed record against the raw
JSON message that is available in Kafka.

Hence I want to know how to do that in Flink.



On Tue, Jan 29, 2019 at 6:43 PM miki haiat  wrote:

> If C1 and C2 are listening to the same topic, they will consume the same
> data,
> so I can't understand this
>
>>  these two streams one(c2) is fast and other(c1)
>
>
>
>
>
> On Tue, Jan 29, 2019 at 2:44 PM Selvaraj chennappan <
> selvarajchennap...@gmail.com> wrote:
>
>> Team,
>>
>> I have two Kafka consumers (C1 and C2) on the same topic (KT). I want to
>> join the second stream to the first after a couple of subtasks of
>> computation in the first stream, and then validate the record.
>>
>> KT - C1 - Transformation (FlatMap) - Dedup - Validate -- if valid, save it
>> to DB
>>    - C2 - Process --
>>
>> If a record is invalid, save it to an error topic.
>>
>> How do I merge these two streams when one (C2) is fast and the other (C1)
>> is a little slow (two levels of computation)? The same record flows through
>> C1 - FlatMap - FlatMap and through the other consumer C2, and I have to
>> validate that record based on the rules.
>> Please find the attached image for reference.
>> [image: two-stream.png]
>>
>> --
>>
>>
>>
>>
>>
>> Regards,
>> Selvaraj C
>>
>

-- 
Regards,
Selvaraj C


Re: Forking a stream with Flink

2019-01-29 Thread Selvaraj chennappan
I think there is a misunderstanding. I want to compare the raw JSON and the
transformed record.
Hence I need two consumers and to merge the streams for the comparison.
I have a pipeline defined. The pipeline does source (Kafka), transformation,
dedup, and persisting to the DB.
[image: image.png]

Before reaching the DB task, a lot of transformation is applied in the
pipeline. Therefore I want to validate the transformed record against the raw
JSON message that is available in Kafka.

Hence I want to know how to do that in Flink.


On Tue, Jan 29, 2019 at 8:54 PM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi Selvaraj
>
> In your POJO, add a data member such as a status field and set it to
> error in case the record is invalid. Pass the output of the flatMap
> to the split operator; there you can split the stream.
>
> On Tue, Jan 29, 2019 at 6:39 PM Selvaraj chennappan <
> selvarajchennap...@gmail.com> wrote:
>
>> UseCase: We have a Kafka consumer that reads messages (JSON); a flatMap then
>> applies the transformation rules (the rules are complex) and converts each
>> message to a POJO.
>> We want to verify that the record (POJO) is valid by checking it field by
>> field. If a record is invalid due to the transformation rules, it should go
>> to an error topic; otherwise it is sent to the DB.
>>
>> I thought of implementing this by adding another consumer to read the JSON
>> messages and comparing the JSON message attributes with the transformed
>> record's attributes.
>>
>> Hence I need to join/coprocess these two streams to validate the record and
>> then decide whether to persist it to the DB or send it to the error topic.
>>
>> Please let me know if you need more information.
>>
>> On Tue, Jan 29, 2019 at 6:21 PM miki haiat  wrote:
>>
>>> I'm not sure I got your question correctly; can you elaborate more on
>>> your use case?
>>>
>>
>>
>> --
>>
>>
>>
>>
>>
>> Regards,
>> Selvaraj C
>>
>
>
> --
> Cheers,
>
> Puneet Kinra
>
> Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com
>
> e-mail: puneet.ki...@customercentria.com
>
>
>

-- 
Regards,
Selvaraj C


Re: getting duplicate messages from duplicate jobs

2019-01-30 Thread Selvaraj chennappan
I have faced the same problem:
https://stackoverflow.com/questions/54286486/two-kafka-consumer-in-same-group-and-one-partition
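
The short version (as Gordon explains below): the group.id passed to Flink's
consumer only affects offset fetching/committing, not partition assignment. A
sketch, assuming the universal FlinkKafkaConsumer and a String-valued topic:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "my-group");  // only used to fetch/commit offsets back to Kafka

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

    // Flink assigns the topic's partitions statically across its own parallel subtasks,
    // so two separate Flink jobs with the same group.id will each read every partition
    // (and each produce its own output), unlike two members of a plain Kafka consumer group.
    DataStream<String> stream = env.addSource(consumer);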


On Wed, Jan 30, 2019 at 6:11 PM Avi Levi  wrote:

> Ok, if you guys think it should be like that then so be it. All I am
> saying is that it is not the standard behaviour of a Kafka consumer, at
> least according to the documentation. I understand
> that Flink implements things differently; all I am saying is that this is
> not in line with Kafka's standard consumer-group behaviour.
>
>
> On Tue, Jan 29, 2019 at 9:47 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> Yes, Dawid is correct.
>>
>> The "group.id" setting in Flink's Kafka Consumer is only used for group
>> offset fetching and committing offsets back to Kafka (only for exposure
>> purposes, not used for processing guarantees).
>> The Flink Kafka Consumer uses static partition assignment on the
>> KafkaConsumer API, and not consumer group-based automatic partition
>> assignments.
>>
>> Cheers,
>> Gordon
>>
>> On Sun, Jan 27, 2019 at 12:28 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Forgot to cc Gordon :)
>>>
>>> On 23/01/2019 18:02, Avi Levi wrote:
>>> > Hi,
>>> > This is quite confusing.
>>> > I submitted the same stateless job twice (actually I uploaded it once).
>>> > However, when I place a message on Kafka, it seems that both jobs
>>> > consume it and publish the same result (we publish the result to
>>> > another Kafka topic, so I actually see the message duplicated on Kafka).
>>> > How can that be? Both jobs are using the same group id (the group id is
>>> > fixed and not generated).
>>> >
>>> > Kind regards
>>> > Avi
>>>
>>>

-- 
Regards,
Selvaraj C


Re: Flink Standalone cluster - logging problem

2019-02-11 Thread Selvaraj chennappan
Could you please try modifying conf/logback.xml?
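
For example, something along these lines (a sketch; the logger name and level
are placeholders for your job's package, and "file" is assumed to be the
appender name from the default conf/logback.xml):

    <!-- conf/logback.xml: route the job's own loggers to the log file -->
    <logger name="com.mycompany.xyz" level="DEBUG" additivity="false">
        <appender-ref ref="file"/>
    </logger>

This only takes effect if the cluster is actually using logback; Flink ships
both log4j and logback configuration files, and which one is active depends on
the logging jars on the classpath.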

Regards,
Selvaraj C

On Mon, Feb 11, 2019 at 4:32 PM simpleusr  wrote:

> Hi Gary,
>
> By "job logs" I mean all the loggers under a subpackage of
> com.mycompany.xyz.
>
> We are using the ./bin/flink run command for job execution; that's why I
> modified log4j-cli.properties. Modifying log4j.properties also did not
> help...
>
> Regards
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Regards,
Selvaraj C


update the existing Keyed value state

2019-05-03 Thread Selvaraj chennappan
Hi Users,
We want to have real-time aggregation (KPIs).
We are maintaining aggregation counters in keyed value state.
The key could be customer activation date and type.
A lot of counters are maintained against that key.

We now want to add one more counter for the existing keys that are already in
the state backend:

1. Compute the new counter value using database data.
2. Group the counter values based on the key.

How do we update the newly computed counter in all the existing keyed state?
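
One option (a rough sketch, not verified end to end): broadcast the freshly
computed per-key counters into the job and patch every existing key via
KeyedBroadcastProcessFunction#applyToKeyedState. CounterRefresh and Counters
are hypothetical types here; countersDescriptor is assumed to be the
ValueStateDescriptor the existing counters were registered with:

    // Inside a KeyedBroadcastProcessFunction<KeyType, Event, CounterRefresh, Output>:
    @Override
    public void processBroadcastElement(CounterRefresh refresh, Context ctx,
                                        Collector<Output> out) throws Exception {
        // refresh.getValues() is a Map<KeyType, Long> built from the database
        // (step 1: compute the new counter value, step 2: group it by key).
        ctx.applyToKeyedState(countersDescriptor, (KeyType key, ValueState<Counters> state) -> {
            Counters counters = state.value();
            Long newValue = refresh.getValues().get(key);
            if (counters != null && newValue != null) {
                counters.setNewCounter(newValue);  // hypothetical setter for the added counter
                state.update(counters);
            }
        });
    }

applyToKeyedState iterates over every key that already has state registered
under that descriptor on the local parallel instance, which matches "update all
the existing keyed state".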





Regards,
Selvaraj C