Hi Konstantin,

Thank you for your answer.

Yes, we have timestamps in the subscription stream.

> the disadvantage that you do not make any progress until you see fresh
> subscription data. Is this the desired behavior for your use case?
No, this is not acceptable, because the subscription data can be slow
changing. Say it does not get updated for 6 hrs; during that time the click
stream events keep flowing, and we still want to enrich them against the
slow-moving stream.

For event-time joins / low-level joins, I am assuming that the watermarks
will still make progress because of
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources.
Or do we still have to handle it in the timestamp assigner and emit a
watermark ourselves when no elements have arrived for a while? (I am not sure
how this would work in the case of low-level joins.)
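
To make this concrete, here is a minimal sketch of what I mean by "handle it
in the assigner": a periodic watermark assigner that advances the watermark
based on processing time if the subscription stream has been silent for a
configurable amount of time. SubscriptionEvent and getEventTimestamp() are
placeholders for our own types. (I believe WatermarkStrategy#withIdleness on
newer Flink versions would cover the same case, but I have not tried it.)

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner
        implements AssignerWithPeriodicWatermarks<SubscriptionEvent> {

    private final long maxOutOfOrdernessMillis;
    private final long maxIdleMillis;

    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long lastElementProcessingTime = System.currentTimeMillis();

    public IdleAwareWatermarkAssigner(long maxOutOfOrdernessMillis, long maxIdleMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.maxIdleMillis = maxIdleMillis;
    }

    @Override
    public long extractTimestamp(SubscriptionEvent element, long previousElementTimestamp) {
        // Track the highest event timestamp and the processing time of the last element.
        maxSeenTimestamp = Math.max(maxSeenTimestamp, element.getEventTimestamp());
        lastElementProcessingTime = System.currentTimeMillis();
        return element.getEventTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastElementProcessingTime > maxIdleMillis) {
            // Stream has been idle too long: advance the watermark based on
            // processing time so joins against the other stream can make progress.
            return new Watermark(now - maxOutOfOrdernessMillis);
        }
        long eventTimeWatermark = (maxSeenTimestamp == Long.MIN_VALUE)
                ? Long.MIN_VALUE
                : maxSeenTimestamp - maxOutOfOrdernessMillis;
        return new Watermark(eventTimeWatermark);
    }
}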

I am considering the low-level join approach using connected streams, where I
keep the reference data in state (processElement1) and join the click stream
events against it (processElement2). In this approach I will buffer the click
stream events for a configurable period of time and then delete them (this is
to handle late records).
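
Roughly something like the following sketch (SubscriptionEvent, ClickEvent and
EnrichedClick are placeholders for our own types, and the cleanup in onTimer
is simplified):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class SubscriptionEnrichmentFunction
        extends KeyedCoProcessFunction<String, SubscriptionEvent, ClickEvent, EnrichedClick> {

    private final long bufferRetentionMillis;

    private transient ValueState<SubscriptionEvent> latestSubscription;
    private transient ListState<ClickEvent> bufferedClicks;

    public SubscriptionEnrichmentFunction(long bufferRetentionMillis) {
        this.bufferRetentionMillis = bufferRetentionMillis;
    }

    @Override
    public void open(Configuration parameters) {
        latestSubscription = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-subscription", SubscriptionEvent.class));
        bufferedClicks = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-clicks", ClickEvent.class));
    }

    @Override
    public void processElement1(SubscriptionEvent subscription, Context ctx,
                                Collector<EnrichedClick> out) throws Exception {
        // Keep only the latest subscription status per user.
        latestSubscription.update(subscription);

        // Flush any click events buffered while no reference data was known.
        Iterable<ClickEvent> pending = bufferedClicks.get();
        if (pending != null) {
            for (ClickEvent click : pending) {
                out.collect(new EnrichedClick(click, subscription));
            }
            bufferedClicks.clear();
        }
    }

    @Override
    public void processElement2(ClickEvent click, Context ctx,
                                Collector<EnrichedClick> out) throws Exception {
        SubscriptionEvent subscription = latestSubscription.value();
        if (subscription != null) {
            out.collect(new EnrichedClick(click, subscription));
        } else {
            // No reference data yet: buffer the click and schedule cleanup.
            bufferedClicks.add(click);
            ctx.timerService().registerEventTimeTimer(
                    click.getEventTimestamp() + bufferRetentionMillis);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<EnrichedClick> out) throws Exception {
        // Retention period expired: drop click events that were never enriched.
        bufferedClicks.clear();
    }
}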

I think the downstream consumer of the enriched data will have to deduplicate
records, or else we will end up with stale enrichment.
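
If that consumer is itself a Flink job, I am thinking of a dedup along these
lines: keyed by the click event id, keep the subscription timestamp of the
last record forwarded and drop anything that is not newer, so only the
freshest enrichment survives (EnrichedClick and getSubscriptionTimestamp() are
again placeholders):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EnrichmentDeduplicator
        extends KeyedProcessFunction<String, EnrichedClick, EnrichedClick> {

    private transient ValueState<Long> lastForwardedSubscriptionTs;

    @Override
    public void open(Configuration parameters) {
        lastForwardedSubscriptionTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-subscription-ts", Long.class));
    }

    @Override
    public void processElement(EnrichedClick record, Context ctx,
                               Collector<EnrichedClick> out) throws Exception {
        Long lastTs = lastForwardedSubscriptionTs.value();
        long currentTs = record.getSubscriptionTimestamp();
        if (lastTs == null || currentTs > lastTs) {
            lastForwardedSubscriptionTs.update(currentTs);
            out.collect(record);
        }
        // Otherwise drop it: an enrichment at least as fresh was already emitted.
    }
}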

Regards,
Vinay Patil


On Fri, Apr 24, 2020 at 12:14 PM Konstantin Knauf <kna...@apache.org> wrote:

> Hi Vinay,
>
> I assume your subscription updates also have a timestamp and a watermark.
> Otherwise, there is no way for Flink to tell that the subscription updates
> are late.
>
> If you use a "temporal table"-style join to join the two streams, and you
> do not receive any subscription updates for 2 hours, the watermark will not
> advance (it is the minimum of the two input streams) and hence all click
> events will be buffered. No output. This has the advantage of not sending
> out duplicate records, but the disadvantage that you do not make any
> progress until you see fresh subscription data. Is this the desired
> behavior for your use case?
>
> Best,
>
> Konstantin
>
>
> On Thu, Apr 23, 2020 at 1:29 PM Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I went through Konstantin's webinar on the 99 ways you can do enrichment. One
>> thing I am failing to understand is how we can efficiently handle stale data
>> enrichment.
>>
>> Context: Let's say I want to enrich user data with subscription data. Here the
>> subscription data acts as reference data and will be used for joining the two
>> streams based on event time. Consider the following scenario:
>>
>>
>>    1. We are going to enrich the click stream events (containing user_info)
>>    with the subscription details.
>>    2. The subscription status for the user Alice is FREE.
>>    3. The current internal state contains Alice with subscription status
>>    FREE.
>>    4. Reference data is not flowing for 2 hrs because of some issue.
>>    5. Alice upgrades the subscription to PREMIUM at 10:30 AM.
>>    6. A watched-video event comes in for Alice at 10:40 AM:
>>       - The Flink pipeline looks up the internal state and writes to the
>>       enrichment topic.
>>       - The enrichment topic now contains Alice -> FREE.
>>    7. Reference data starts flowing in at 11 AM:
>>       - Let's assume we consider late elements up to 2 hours, so the click
>>       stream event for Alice is still buffered in the state.
>>       - The enrichment topic will now contain duplicate records for Alice
>>       because of multiple firings of the window:
>>          1. Alice -> FREE -> 10 AM
>>          2. Alice -> PREMIUM -> 11 AM
>>
>> The question is: how do I avoid sending duplicate records? I am not able to
>> figure this out. I can think of low-level joins, but I am not sure how we would
>> know whether the data is stale based on the timestamp (watermark), since it can
>> happen that a particular enriched record is not updated for 6 hrs.
>>
>> Regards,
>> Vinay Patil
>>
>
>
> --
>
> Konstantin Knauf
>
