Hi Sachin,

I'd need more information to speculate about why your records are missing, but 
it sounds like you're suspecting something to do with the records' timestamps, 
so I'll just focus on answering your questions.

Streams always uses the same timestamp for all operations, which is the 
timestamp returned by the timestamp extractor. Whether this is event time or 
ingestion time is up to the timestamp extractor you're using.
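
To make that concrete for your join, here is a simplified model of the check a
5-minute JoinWindows implies: two records are candidates for joining when their
extracted timestamps differ by at most the window size (both bounds inclusive).
This is only a sketch, not the actual Streams implementation. It ignores keys,
retention, and out-of-order handling, and the names JoinWindowSketch and
withinJoinWindow are made up for illustration.

```java
import java.time.Duration;

class JoinWindowSketch {
    // Hypothetical helper: true if the two record timestamps (epoch millis, as
    // returned by the timestamp extractor) are within the join window of each other.
    static boolean withinJoinWindow(long leftTsMs, long rightTsMs, Duration window) {
        return Math.abs(leftTsMs - rightTsMs) <= window.toMillis();
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMinutes(5);
        // Exactly 5 minutes apart: still inside the window.
        System.out.println(withinJoinWindow(0L, 300_000L, window));
        // One millisecond more than 5 minutes apart: outside the window.
        System.out.println(withinJoinWindow(0L, 300_001L, window));
    }
}
```

Under this model, what matters is the difference between the two records'
timestamps, not when the records reach the broker or when Streams gets around
to processing them.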

If you're using the default timestamp extractor, then Streams will use the 
timestamp field on the ConsumerRecord that comes back from the broker. If 
you're using CreateTime, then it would hold the value of the timestamp written 
by the producer. If you're using LogAppendTime, then it's the timestamp 
representing when the broker actually adds the record to the topic.
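
In case it helps, here is roughly where the Streams-side setting lives, sketched
with plain java.util.Properties so it runs without Kafka on the classpath. The
config key and the FailOnInvalidTimestamp class name are the standard Streams
ones (that class is the default extractor); TimestampConfigSketch and
streamsProps are hypothetical names. Note that log.message.timestamp.type is a
broker setting (message.timestamp.type at the topic level), so it does not go
into your Streams properties.

```java
import java.util.Properties;

class TimestampConfigSketch {
    // Hypothetical helper: builds only the timestamp-related part of a Streams config.
    static Properties streamsProps() {
        Properties props = new Properties();
        // Which extractor turns each ConsumerRecord into a timestamp.
        // FailOnInvalidTimestamp returns the record's built-in timestamp field
        // and throws if that timestamp is negative.
        props.put("default.timestamp.extractor",
                  "org.apache.kafka.streams.processor.FailOnInvalidTimestamp");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
    }
}
```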

One potential point of confusion is that when we say a "record", we mean more 
than just the key and value that you typically manipulate using the Streams 
DSL. In addition to these fields, there is a separate timestamp field, which is 
part of the Consumer/Producer/Broker protocols. That's what we use for time 
tracking, so you do not need to worry about embedding and extracting the 
timestamp in your values.

Streams sets the timestamp field on the outgoing ProducerRecords it sends to 
the broker, so downstream stages in the pipeline pick up that timestamp by 
default. You don't need to add timestamp extractors further on.

The only usage of processing time (aka "wall-clock time") is in wall-clock 
based punctuation, if you're using the low-level Processor API. Also, the 
commit interval is defined in terms of wall-clock time. If all you're 
considering is the semantics of the Streams DSL, processing/wall-clock time 
would not play any part in those semantics.

I know that stream processing literature in general discusses event- vs. 
processing- vs. ingestion-time quite a bit, but for practical purposes, event 
time, meaning the timestamp stored on the record itself (either CreateTime or 
LogAppendTime), is the one that's useful for writing 
programs. Both ingestion time and processing time lead to non-deterministic 
programs with unclear semantics. That's why we pretty much stick to event time 
in the Streams DSL.

Finally, yes: if you just want to process records in the same order they 
appear in the topics, without depending on producer-set timestamps, then 
LogAppendTime is probably the better choice.

I hope this helps clear things up a bit.

Thanks,
-John

On Fri, Dec 6, 2019, at 22:20, Sachin Mittal wrote:
> Hi,
> I have noticed some issues when doing stream to stream windowed joins.
> Looks like my joined stream does not include all the records.
> 
> Say I am doing join like this:
> stream1.join(
>             stream2,
>             (lv, rv) -> ...,
>             JoinWindows.of(Duration.ofMinutes(5)),
>            ....)
> What I have checked from the docs is that it will join 2 records within the
> specified window.
> However its not clear as what time it would take for each record?
> Would it be
> 1.  event-time or
> 2. processing-time or
> 3. ingestion-time
> 
> I am right now using default configuration for
> log.message.timestamp.type = CreateTime and default.timestamp.extractor
> 
> From the docs I gather is that in default case it uses event-time.
> So does it mean that there has to be a timestamp field in the record which
> is to be extracted by custom timestamp extractor?
> 
> Also in downstream when streams application actually writes (produces) new
> record types, do we need to provide timestamp extractor for all such record
> types
> so the next process in the pipeline can pick up the timestamp to do the
> windowed operations?
> 
> Also when and how processing time is used at all by streams application?
> 
> Finally say I don't want to worry about if timestamp is set by the
> producers, is it better to simply set
> log.message.timestamp.type =  LogAppendTime
> 
> Thanks
> Sachin
>
