Hi Pooja,

My main confusion is: if two events have the same transaction_id, how can
we tell whether the second one is a wanted value update or an unwanted
duplicate?

MapState with a TTL can be used for deduplication, assuming that a
duplicate event will not arrive too long after the original event was
processed.
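
For illustration, a minimal (untested) sketch of that idea: if you key the
stream by transaction_id, the keyed state itself acts as the dedup set, so
a per-key ValueState with a TTL is enough. The one-hour window and the
Event getter are assumptions, not taken from your job:

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;

// Drops events whose transaction_id has been seen within the TTL window.
// Usage: events.keyBy(Event::getTransactionId).filter(new DedupeFilter())
public class DedupeFilter extends RichFilterFunction<Event> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttl = StateTtlConfig
                .newBuilder(Time.hours(1)) // assumed max lateness of a duplicate
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("seen", Boolean.class);
        descriptor.enableTimeToLive(ttl);
        seen = getRuntimeContext().getState(descriptor);
    }

    @Override
    public boolean filter(Event event) throws Exception {
        if (seen.value() != null) {
            return false; // already saw this transaction_id, drop the event
        }
        seen.update(true); // first occurrence: remember it and let it through
        return true;
    }
}

Since the state is per transaction_id and expired entries are cleaned up,
the "set" stays bounded without any external store or per-event remote
lookup.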

Thanks,
Zhu Zhu

Rafi Aroch <rafi.ar...@gmail.com> wrote on Wed, Dec 18, 2019, at 3:50 PM:

> Hi Pooja,
>
> Here's an implementation by Jamie Grier of de-duplication using an
> in-memory cache with an expiration time:
>
> https://github.com/jgrier/FilteringExample/blob/master/src/main/java/com/dataartisans/DedupeFilteringJob.java
>
> If, for your use case, you can bound the time window in which duplicates
> may occur, you can use this approach.
>
> Thanks,
> Rafi
>
>
> On Wed, Dec 18, 2019 at 8:16 AM Pooja Agrawal <pooja.ag...@gmail.com>
> wrote:
>
>> Hey,
>>
>> I am sorry for the confusion. The value is not already present in the
>> event; we read it from a static table (a kind of data enrichment in the
>> Flink job), so the event above is an enriched event (a rough sketch of
>> the enrichment step is below). Say this is a transaction event: the user
>> performed the transaction once, so the transaction_id is unique. But the
>> table from which we read the value may occasionally contain a wrong
>> value (because of a bug). In that case we may want to reprocess that
>> transaction event with the new value; here the transaction_id will be
>> the same as before, but the value will change. I hope this clarifies the
>> scenario. Let me know if you have any other questions.
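>>
>> To make it concrete, a rough, illustrative sketch of the enrichment step
>> (this is not our actual code; the names and the table-loading stub are
>> made up):
>>
>> import java.util.HashMap;
>> import java.util.Map;
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.configuration.Configuration;
>>
>> // Fills in the event's value from a static table. A bad entry in this
>> // table is exactly how a wrong value ends up in an enriched event that
>> // later needs to be reprocessed.
>> public class EnrichmentFunction extends RichMapFunction<Event, Event> {
>>
>>     private transient Map<String, Long> staticTable;
>>
>>     @Override
>>     public void open(Configuration parameters) {
>>         staticTable = loadStaticTable(); // loaded once per task
>>     }
>>
>>     @Override
>>     public Event map(Event event) {
>>         event.setValue(staticTable.get(event.getKey1()));
>>         return event;
>>     }
>>
>>     private Map<String, Long> loadStaticTable() {
>>         return new HashMap<>(); // placeholder for the real table source
>>     }
>> }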
>>
>> To solve the idempotency problem, you suggested maintaining a set
>> recording the transaction_id(s). Since we are aggregating over all
>> events seen till now, the number of events, and hence of ids, will be
>> very large. I assume we would need some external store here and a lookup
>> every time we process an event, which may increase latency. Can you
>> suggest an efficient way to solve this? And if we do need an external
>> store, what would be the best candidate?
>>
>> Thanks
>> Pooja
>>
>>
>>
>> On Wed, Dec 18, 2019 at 8:19 AM Zhu Zhu <reed...@gmail.com> wrote:
>>
>>> Hi Pooja,
>>>
>>> I'm a bit confused, since 1) says "If two events have same
>>> transaction_id, we can say that they are duplicates", while 2) says
>>> "Since this is just a value change, the transaction_id will be same".
>>> These look contradictory to me. Usually, in scenario 2), the
>>> value-update event is considered a new event that does not share its
>>> unique id with prior events.
>>>
>>> If each event has a unique transaction_id, you can use it to
>>> de-duplicate the events via a set recording the transaction_id(s) that
>>> have already been processed. Then 2) would not be a problem under the
>>> unique-transaction_id assumption.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Pooja Agrawal <pooja.ag...@gmail.com> wrote on Tue, Dec 17, 2019, at 9:17 PM:
>>>
>>>>
>>>> Hi,
>>>>
>>>> I have a use case where we are reading events from a Kinesis stream.
>>>> An event can look like this:
>>>> Event {
>>>>   event_id,
>>>>   transaction_id,
>>>>   key1,
>>>>   key2,
>>>>   value,
>>>>   timestamp,
>>>>   some other fields...
>>>> }
>>>>
>>>> We want to aggregate the values per key over all events we have seen
>>>> till now (as simple as "select key1, key2, sum(value) from events
>>>> group by key1, key2"). For this I have created a simple Flink job
>>>> which uses the flink-kinesis connector and applies keyBy() and sum()
>>>> on the incoming events; a simplified sketch is below.
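>>>>
>>>> Roughly, the relevant part looks like this (illustrative only: env is
>>>> the StreamExecutionEnvironment, consumerConfig a Properties object,
>>>> and EventSchema our deserialization schema):
>>>>
>>>> DataStream<Event> events = env.addSource(
>>>>     new FlinkKinesisConsumer<>("events-stream", new EventSchema(),
>>>>         consumerConfig));
>>>>
>>>> events
>>>>     .keyBy("key1", "key2") // Event is a POJO, so field names work here
>>>>     .sum("value")          // running sum of value per (key1, key2)
>>>>     .print();
>>>>
>>>> I am facing two challenges here: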
>>>>
>>>> 1) The incoming events can have duplicates. How do I maintain
>>>> exactly-once processing here, since processing duplicate events can
>>>> give me erroneous results? The field transaction_id will be unique for
>>>> each event. If two events have the same transaction_id, we can say
>>>> that they are duplicates. (By duplicates I don't just mean retried
>>>> events: the same message can be present in Kinesis with different
>>>> sequence numbers. I am not sure whether the flink-kinesis connector
>>>> can handle that, as it uses the KCL underneath, which I assume doesn't
>>>> take care of it.)
>>>>
>>>> 2) There can be cases where the value has been updated for a key after
>>>> the event was processed, and we may want to reprocess those events
>>>> with the new value. Since this is just a value change, the
>>>> transaction_id will be the same, so the idempotency logic will not
>>>> allow the events to be reprocessed. What are the ways to handle such
>>>> scenarios in Flink?
>>>>
>>>> Thanks
>>>> Pooja
>>>>
>>>>
>>>> --
>>>> Warm Regards,
>>>> Pooja Agrawal
>>>>
>>>
>>
>> --
>> Warm Regards,
>> Pooja Agrawal
>>
>
