Hi Reuven,
Thank you for your response.

Yes, I've tested session windows with a gap of 10 minutes, as I thought they
should work in this scenario.
However, I found that I still had articles/assets where the watermark might
have been wrong. As I am relying on the assets being processed first
(inserted into BigTable, followed by a fetch from BigTable for the whole
history), I tried the following workaround, which works (it ran over the
weekend for 72 hours of testing):

As I'm reading from KafkaIO, I am using a custom
.withTimestampPolicyFactory(withEventTs) for the assets, in which I simply
set a timestamp that is 1 minute earlier than the element's event
timestamp (this is my allowed gap).
The rest of the pipeline stays as-is, so the operational and logical overhead
is kept to a minimum.
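To illustrate what the 1-minute shift does under the 10-minute session windows, here is a minimal plain-Java sketch (no Beam dependency) of a simplified session-window model: every element gets a proto-window [ts, ts + gap), and overlapping windows are merged, roughly as Beam's Sessions do (exact boundary handling may differ). The gap and shift are the values from this mail, the timestamps are the ones from the original question below, and the class and method names are hypothetical. In the real pipeline the shift happens inside the custom TimestampPolicy handed to KafkaIO via .withTimestampPolicyFactory(...), which this sketch does not reproduce.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, Beam-free model of session windows: each element gets a
// proto-window [ts, ts + GAP) and overlapping windows are merged.
final class SessionShiftSketch {
  static final Duration GAP = Duration.ofMinutes(10);        // session gap from the test run
  static final Duration ASSET_SHIFT = Duration.ofMinutes(1); // the "allowed gap" for assets

  // Half-open interval [start, end).
  record Window(Instant start, Instant end) {
    boolean overlaps(Window other) {
      return start.isBefore(other.end) && other.start.isBefore(end);
    }

    Window span(Window other) {
      Instant s = start.isBefore(other.start) ? start : other.start;
      Instant e = end.isAfter(other.end) ? end : other.end;
      return new Window(s, e);
    }
  }

  // Models what the custom timestamp policy does to asset records: 1 minute earlier.
  static Instant shiftedAssetTimestamp(Instant eventTs) {
    return eventTs.minus(ASSET_SHIFT);
  }

  static Window protoSession(Instant ts) {
    return new Window(ts, ts.plus(GAP));
  }

  // Sort by start, then fold each overlapping window into the previous one.
  static List<Window> merge(List<Window> windows) {
    List<Window> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparing(Window::start));
    List<Window> merged = new ArrayList<>();
    for (Window w : sorted) {
      int last = merged.size() - 1;
      if (last >= 0 && merged.get(last).overlaps(w)) {
        merged.set(last, merged.get(last).span(w));
      } else {
        merged.add(w);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    Instant article = Instant.parse("2020-10-02T00:30:29.997Z");
    Instant asset = shiftedAssetTimestamp(Instant.parse("2020-10-02T00:30:30.001Z"));
    List<Window> sessions = merge(List.of(protoSession(article), protoSession(asset)));
    System.out.println("shifted asset timestamp: " + asset);
    System.out.println("sessions: " + sessions);
  }
}
```

Under this model, running main yields a single merged session that starts at the shifted asset timestamp and covers the article as well. An article and asset whose (shifted) timestamps are a full gap or more apart would end up in separate sessions, so the gap bounds how far apart the two records may be.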

On Fri, Oct 2, 2020 at 9:40 PM Reuven Lax <re...@google.com> wrote:

> Have you considered using Session windows? The window would start at the
> timestamp of the article, and the Session gap duration would be the
> (event-time) timeout after which you stop waiting for assets to join that
> article.
>
> On Fri, Oct 2, 2020 at 3:05 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
> wrote:
>
>> Hello,
>>
>> In Chapter 4 of the Streaming Systems book by Tyler, Slava and Reuven,
>> Example 4-6 on page 111 covers custom windowing with
>> UnalignedFixedWindows:
>>
>> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
>>
>> Unfortunately, that example is abbreviated, and the full source code is not
>> published in this repo:
>> https://github.com/takidau/streamingbook
>>
>> I am joining two Kafka streams and am currently windowing them into fixed
>> time intervals. However, the elements in stream one ("articles") are
>> published first, and then the assets for those articles are published in
>> the "assets" topic. The articles' event timestamps are therefore slightly
>> earlier than those of the assets.
>>
>> Now, when doing a CoGroupByKey, this can lead to a situation where an
>> article is not processed together with its assets, as
>>
>> - the article has a timestamp of 2020-10-02T00:30:29.997Z
>> - the assets have a timestamp of 2020-10-02T00:30:30.001Z
>>
>> This is a must in my pipeline, as I am relying on them being processed
>> together; otherwise I am publishing an article without its assets.
>>
>> My idea was therefore to apply UnalignedFixedWindows instead of fixed
>> ones to the streams to circumvent this. What I am currently missing is the
>> mergeWindows() implementation or the full source code to understand it.
>> I am currently facing a java.lang.IllegalStateException:
>>
>> TimestampCombiner moved element from 2020-10-02T09:32:36.079Z to earlier
>> time 2020-10-02T09:32:03.365Z for window
>> [2020-10-02T09:31:03.366Z..2020-10-02T09:32:03.366Z)
>>
>> Which gives me the impression that I am doing something wrong or have not
>> fully understood the custom windowing topic.
>>
>> Am I on the wrong track here?
>>
>>
>>
>>
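For anyone finding this thread later, the window arithmetic behind the original question can be sketched in plain Java (no Beam dependency). alignedStart assumes, hypothetically, 30-second fixed windows aligned to the epoch; with that size the article (00:30:29.997Z) and its asset (00:30:30.001Z) land in windows starting at 00:30:00Z and 00:30:30Z, so a CoGroupByKey sees them separately. unalignedStart is my own illustration of a per-key shifted window in the spirit of the book's UnalignedFixedWindows, not the book's code; the property it is built to keep is that the returned window always contains the element's timestamp. The numbers in the exception appear to violate exactly that: 09:32:03.365Z is the last millisecond (max timestamp) of [09:31:03.366Z..09:32:03.366Z), while the element's timestamp 09:32:36.079Z lies after that window, so holding the output at the end of the window (Beam's default TimestampCombiner is END_OF_WINDOW) would move the element earlier.

```java
import java.time.Instant;

// Hypothetical sketch of window arithmetic; not Beam code and not the
// book's UnalignedFixedWindows implementation.
final class FixedWindowSketch {
  // Start of the epoch-aligned fixed window [start, start + sizeMillis) containing ts.
  static long alignedStart(long tsMillis, long sizeMillis) {
    return tsMillis - Math.floorMod(tsMillis, sizeMillis);
  }

  // Start of a window shifted by a key-derived offset in [0, sizeMillis).
  // floorMod keeps (ts - start) in [0, size), so the window always contains ts.
  static long unalignedStart(long tsMillis, long sizeMillis, String key) {
    long shift = Math.floorMod((long) key.hashCode(), sizeMillis);
    return tsMillis - Math.floorMod(tsMillis - shift, sizeMillis);
  }

  // Half-open containment check for [start, start + sizeMillis).
  static boolean contains(long start, long sizeMillis, long tsMillis) {
    return start <= tsMillis && tsMillis < start + sizeMillis;
  }

  public static void main(String[] args) {
    long size = 30_000L; // hypothetical 30-second windows
    long article = Instant.parse("2020-10-02T00:30:29.997Z").toEpochMilli();
    long asset = Instant.parse("2020-10-02T00:30:30.001Z").toEpochMilli();
    System.out.println("article window start: " + Instant.ofEpochMilli(alignedStart(article, size)));
    System.out.println("asset window start:   " + Instant.ofEpochMilli(alignedStart(asset, size)));

    // Values taken from the IllegalStateException in the question.
    long winStart = Instant.parse("2020-10-02T09:31:03.366Z").toEpochMilli();
    long winSize = 60_000L;
    long element = Instant.parse("2020-10-02T09:32:36.079Z").toEpochMilli();
    System.out.println("window contains element: " + contains(winStart, winSize, element));
    System.out.println("window max timestamp:    " + Instant.ofEpochMilli(winStart + winSize - 1));
  }
}
```

One caveat of a key-derived shift: both sides of a CoGroupByKey share the same key, so they receive the same shift, and a window boundary can still fall between an article and its asset.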
