Re: KStream-KTable join with the KTable given a "head start"

2016-04-01 Thread Guozhang Wang
Hi Jeff,

Yes, record timestamps are extracted as soon as the records are polled by
the consumer, and they are used in multiple places, including stream
synchronization.

For details you can read this part of the docs:

http://docs.confluent.io/2.1.0-alpha1/streams/architecture.html#flow-control-with-timestamps

Note that this flow control mechanism is not perfect, i.e. if you have a
much later record in one of your streams, it may still be processed out of
order.
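
As a rough illustration of the flow control idea, here is a minimal sketch
in plain Java. It is a simplified model, not the Kafka Streams
implementation: the class FlowControlSketch, the method pickNext and the
sample timestamps are all hypothetical. It assumes that each input has a
buffer of already-fetched record timestamps and that the next record is
taken from the buffer whose head has the smallest timestamp.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class FlowControlSketch {

    /**
     * Returns the index of the buffer whose head record has the smallest
     * timestamp, or -1 if every buffer is empty.
     */
    static int pickNext(List<Deque<Long>> buffers) {
        int best = -1;
        for (int i = 0; i < buffers.size(); i++) {
            Long head = buffers.get(i).peekFirst();
            if (head == null) {
                continue; // nothing buffered for this input right now
            }
            if (best == -1 || head < buffers.get(best).peekFirst()) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Hypothetical record timestamps (ms) already fetched per input.
        Deque<Long> table = new ArrayDeque<>(Arrays.asList(100L, 900L));
        Deque<Long> stream = new ArrayDeque<>(Arrays.asList(200L, 300L));
        List<Deque<Long>> buffers = Arrays.asList(table, stream);

        int i;
        while ((i = pickNext(buffers)) != -1) {
            String name = (i == 0) ? "table" : "stream";
            System.out.println(name + " @ " + buffers.get(i).pollFirst());
        }
    }
}
```

In this model only buffered records are compared, so a record that has not
been fetched yet cannot hold back the others. That is one way to see why the
ordering is only best effort.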

Guozhang



On Thu, Mar 31, 2016 at 6:50 AM, Jeff Klukas wrote:

>
>
> On Thu, Mar 31, 2016 at 9:03 AM, Jeff Klukas wrote:
>
>>
>>
>> On Wed, Mar 30, 2016 at 5:21 PM, Guozhang Wang wrote:
>>
>>> Hi Jeff,
>>>
>>> If you are using "KStream.leftJoin(KTable..)" then you cannot specify a
>>> join window, and the stream-table join would be implemented just as a table
>>> query upon each arrival of the stream record and a table update upon each
>>> arrival of the table record.
>>>
>>> You can use a join window if you call "KStream.join(KStream...)". Could
>>> you tell me which join operator you are planning to use?
>>>
>>
>> KStream.leftJoin(KTable..) makes sense, but then how do timestamps play
>> into it? The docs and your explanation both make it sound like the join is
>> going to happen as soon as the stream record arrives, so it's unclear to me
>> how modifying timestamps helps in this case.
>>
>
> I read over your reply a few more times, and I can see now that you're
> pointing out that Kafka Streams uses timestamps for more than just
> windowing. And indeed the Confluent docs mention that "timestamps are used
> to control the progress of streams". Does this mean that the application is
> monitoring timestamp progress and is throttling consumers to make better
> progress on the ones that are further behind?
>
> That's exciting, and goes beyond what I had expected Kafka Streams to do
> by default. Stream processing is still pretty new to me, so perhaps this is
> an assumed responsibility of a stream processing framework, but it seems
> like it could be a great selling point.
>
> Is there an obvious class implementation to look at to better understand
> how Kafka Streams is coordinating stream progress within a topology?
>
> I'll plan to post a resolution back to the user mailing list once I feel
> like I have a reasonable handle on how this works.
>
> And thank you so much for your help.
>
>

Re: KStream-KTable join with the KTable given a "head start"

2016-03-30 Thread Jeff Klukas
-- Forwarded message --
> From: Jeff Klukas 
> To: users@kafka.apache.org
> Cc:
> Date: Wed, 30 Mar 2016 11:14:53 -0400
> Subject: KStream-KTable join with the KTable given a "head start"
> I have a KStream that I want to enrich with some values from a lookup
> table. When a new key enters the KStream, there's likely to be a
> corresponding entry arriving on the KTable at the same time, so we end up
> with a race condition. If the KTable record arrives first, then its value
> is available for joining when the corresponding record arrives on the
> KStream. If the KStream record arrives first, however, we'll get a null
> join even if the KTable gets the corresponding record only milliseconds
> later.
>
> I'd like to give the KTable a "head start" of ~10 seconds, so that it gets
> a chance to get updated before the corresponding records arrive on the
> KStream. Could I achieve this using one of the existing Windowing
> strategies?
>
>
> -- Forwarded message --
> From: Guozhang Wang 
> To: "users@kafka.apache.org" 
> Cc:
> Date: Wed, 30 Mar 2016 13:51:03 -0700
> Subject: Re: KStream-KTable join with the KTable given a "head start"
> Hi Jeff,
>
> This is a common case of stream-table join, in that the join results
> depend on the arrival order of records from these two sources.
>
> In Kafka Streams you can try to "synchronize" multiple input streams
> through the "TimestampExtractor" interface, which is used to assign a
> timestamp to each record polled from Kafka to start the processing. You
> can, for example, set the timestamps for your KStream within a later time
> interval and the timestamps for your KTable stream with an earlier time
> interval, so that the records from table are likely to be processed first.
> Note that this is a best-effort approach, in that we cannot guarantee
> global ordering across streams while processing: if you have a much later
> record coming from the KTable, it will not block earlier records from the
> KStream from being processed first. But we think this mechanism should be
> sufficient in practice.
>
> Let me know if it fits your scenario, and if not we can talk about how
> it could be improved.
>


What would I pass in for a window in this case? Or would I not pass in a
window?

I don't want to put any lower limit on the KTable timestamps I'd be willing
to join on (the corresponding entry in the KTable could have been from
weeks ago, or it could be fired right at the same time as the KStream
event).

Could I use JoinWindows.before() and pass in an arbitrarily long interval?


Re: KStream-KTable join with the KTable given a "head start"

2016-03-30 Thread Guozhang Wang
Hi Jeff,

This is a common case of stream-table join, in that the join results
depend on the arrival order of records from these two sources.

In Kafka Streams you can try to "synchronize" multiple input streams
through the "TimestampExtractor" interface, which is used to assign a
timestamp to each record polled from Kafka to start the processing. You
can, for example, set the timestamps for your KStream within a later time
interval and the timestamps for your KTable stream with an earlier time
interval, so that the records from table are likely to be processed first.
Note that this is a best-effort approach, in that we cannot guarantee
global ordering across streams while processing: if you have a much later
record coming from the KTable, it will not block earlier records from the
KStream from being processed first. But we think this mechanism should be
sufficient in practice.
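
To make the timestamp-shifting idea concrete, here is a minimal sketch in
plain Java. It deliberately does not use the Kafka Streams API: the class
HeadStartSketch, the Rec type, the HEAD_START_MS constant and the sample
values are hypothetical, and sorting by assigned timestamp is an idealized
stand-in for the best-effort synchronization described above. In a real
application the shift would presumably live in a "TimestampExtractor"
implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class HeadStartSketch {
    // The ~10 second head start requested for the table (assumed value).
    static final long HEAD_START_MS = 10_000L;

    /** Hypothetical stand-in for a consumed record; not a Kafka class. */
    static final class Rec {
        final String source; // "table" or "stream"
        final String key;
        final long eventTimeMs;

        Rec(String source, String key, long eventTimeMs) {
            this.source = source;
            this.key = key;
            this.eventTimeMs = eventTimeMs;
        }
    }

    /** Stream records get a later timestamp; table records keep theirs. */
    static long assignedTimestamp(Rec r) {
        return "stream".equals(r.source)
                ? r.eventTimeMs + HEAD_START_MS
                : r.eventTimeMs;
    }

    /** Idealized processing order: ascending assigned timestamp. */
    static List<Rec> processingOrder(List<Rec> buffered) {
        List<Rec> ordered = new ArrayList<>(buffered);
        ordered.sort(Comparator.comparingLong(HeadStartSketch::assignedTimestamp));
        return ordered;
    }

    public static void main(String[] args) {
        // The stream record was produced 5 ms before the table record.
        List<Rec> buffered = Arrays.asList(
                new Rec("stream", "user-1", 1_000L),
                new Rec("table", "user-1", 1_005L));
        for (Rec r : processingOrder(buffered)) {
            System.out.println(r.source + " " + r.key + " @ " + assignedTimestamp(r));
        }
    }
}
```

With the shift applied, the table record sorts ahead of the stream record
even though it was produced slightly later.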

Let me know if it fits your scenario, and if not we can talk about how
it could be improved.

Guozhang

On Wed, Mar 30, 2016 at 8:14 AM, Jeff Klukas wrote:

> I have a KStream that I want to enrich with some values from a lookup
> table. When a new key enters the KStream, there's likely to be a
> corresponding entry arriving on the KTable at the same time, so we end up
> with a race condition. If the KTable record arrives first, then its value
> is available for joining when the corresponding record arrives on the
> KStream. If the KStream record arrives first, however, we'll get a null
> join even if the KTable gets the corresponding record only milliseconds
> later.
>
> I'd like to give the KTable a "head start" of ~10 seconds, so that it gets
> a chance to get updated before the corresponding records arrive on the
> KStream. Could I achieve this using one of the existing Windowing
> strategies?
>



-- 
-- Guozhang


KStream-KTable join with the KTable given a "head start"

2016-03-30 Thread Jeff Klukas
I have a KStream that I want to enrich with some values from a lookup
table. When a new key enters the KStream, there's likely to be a
corresponding entry arriving on the KTable at the same time, so we end up
with a race condition. If the KTable record arrives first, then its value
is available for joining when the corresponding record arrives on the
KStream. If the KStream record arrives first, however, we'll get a null
join even if the KTable gets the corresponding record only milliseconds
later.

I'd like to give the KTable a "head start" of ~10 seconds, so that it gets
a chance to get updated before the corresponding records arrive on the
KStream. Could I achieve this using one of the existing Windowing
strategies?
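
The race described above can be reproduced with a small simulation. This is
a sketch in plain Java, not Kafka Streams code: the class
StreamTableJoinSketch and its methods are hypothetical, and it assumes the
join behaves as a table lookup on each stream record and a table update on
each table record.

```java
import java.util.HashMap;
import java.util.Map;

final class StreamTableJoinSketch {
    // Latest value per key, standing in for the lookup table.
    private final Map<String, String> table = new HashMap<>();

    /** A table record arrives: update the latest value for the key. */
    void onTableRecord(String key, String value) {
        table.put(key, value);
    }

    /** A stream record arrives: join with whatever the table holds now. */
    String onStreamRecord(String key, String value) {
        String tableValue = table.get(key); // null if the table is behind
        return value + "|" + tableValue;
    }

    public static void main(String[] args) {
        StreamTableJoinSketch join = new StreamTableJoinSketch();

        // Stream record first: the lookup misses and the join side is null.
        System.out.println(join.onStreamRecord("user-1", "click"));

        // The table record shows up only afterwards.
        join.onTableRecord("user-1", "profile");

        // A later stream record for the same key now finds the table value.
        System.out.println(join.onStreamRecord("user-1", "click"));
    }
}
```

The first stream record is joined against an empty table, so its table side
is null even though the matching table record arrives right after it.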