Re: Time of derived records in Kafka Streams
Hi Elias,

- Out-of-order records: the timestamp is that of the out-of-order record, i.e., time sometimes goes backwards.
- Joins: the same; the timestamp could be that of either record.

We'll update the docs. Thanks for your question.

Eno

> On 17 Sep 2016, at 00:43, Elias Levy wrote:
>
> On Sat, Sep 10, 2016 at 9:17 AM, Eno Thereska wrote:
>
>> For aggregations, the timestamp will be that of the latest record being
>> aggregated.
>
> How does that account for out-of-order records?
>
> What about kstream-kstream joins? The output from the join could be
> triggered by a record received from either stream, depending on the order
> in which they are received and processed. If the timestamp of the output is
> just the timestamp of the latest received record, then it seems that the
> timestamp could be that of either record. Although I suppose that the
> best-effort stream synchronization that Kafka Streams attempts means that
> usually the timestamp will be that of the later record.
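To make the two points in Eno's reply concrete, here is a minimal plain-Java model of the behavior as described in this thread. It does not use the Kafka Streams API: `JoinTimestampModel`, `Rec`, and both methods are hypothetical names invented for this sketch, the join ignores windowing for brevity, and actual behavior may differ between Kafka Streams releases.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JoinTimestampModel {
    /** Hypothetical record type for this sketch; not a Kafka Streams class. */
    static final class Rec {
        final String side;     // "left" or "right" input stream
        final String key;
        final long timestamp;  // event time assigned at the source

        Rec(String side, String key, long timestamp) {
            this.side = side;
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /**
     * Processes records in arrival order. Each arrival is matched against
     * earlier records from the other side with the same key, and every match
     * is emitted with the timestamp of the arriving (triggering) record.
     */
    static List<Long> joinOutputTimestamps(List<Rec> arrivals) {
        List<Rec> seen = new ArrayList<>();
        List<Long> out = new ArrayList<>();
        for (Rec r : arrivals) {
            for (Rec other : seen) {
                if (!other.side.equals(r.side) && other.key.equals(r.key)) {
                    out.add(r.timestamp);
                }
            }
            seen.add(r);
        }
        return out;
    }

    /**
     * Running aggregate: one output per input, stamped with the timestamp of
     * the input that caused the update, so out-of-order inputs produce
     * output timestamps that go backwards.
     */
    static List<Long> aggregateOutputTimestamps(long... inputTimestamps) {
        List<Long> out = new ArrayList<>();
        for (long t : inputTimestamps) {
            out.add(t);
        }
        return out;
    }

    public static void main(String[] args) {
        // Same two records, two arrival orders: the output timestamp follows
        // whichever record arrived second.
        List<Rec> leftFirst = Arrays.asList(new Rec("left", "k", 10L), new Rec("right", "k", 5L));
        List<Rec> rightFirst = Arrays.asList(new Rec("right", "k", 5L), new Rec("left", "k", 10L));
        System.out.println("left arrives first:  " + joinOutputTimestamps(leftFirst));
        System.out.println("right arrives first: " + joinOutputTimestamps(rightFirst));

        // The record with timestamp 90 arrives late, after 120.
        System.out.println("aggregate outputs:   " + aggregateOutputTimestamps(100L, 120L, 90L));
    }
}
```

In this model the join output carries the earlier timestamp whenever the earlier record happens to arrive second, which is the case Elias asks about.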
Re: Time of derived records in Kafka Streams
On Sat, Sep 10, 2016 at 9:17 AM, Eno Thereska wrote:

> For aggregations, the timestamp will be that of the latest record being
> aggregated.

How does that account for out-of-order records?

What about kstream-kstream joins? The output from the join could be triggered by a record received from either stream, depending on the order in which they are received and processed. If the timestamp of the output is just the timestamp of the latest received record, then it seems that the timestamp could be that of either record. Although I suppose that the best-effort stream synchronization that Kafka Streams attempts means that usually the timestamp will be that of the later record.
Re: Time of derived records in Kafka Streams
Hi Elias,

Good question. The general answer is that each time a record is output, its timestamp is the current time of the Kafka Streams task that processes it, i.e., the internal Kafka Streams time. If the task is processing records with event time, its timestamp at any point is the smallest among its input stream partition timestamps (see ProcessorContext.java, the timestamp() definition).

This might sound complicated, but some examples should help: for output from stateless transformations, the timestamp will be that of the record being transformed. For aggregations, the timestamp will be that of the latest record being aggregated.

Cheers
Eno

> On 10 Sep 2016, at 00:19, Elias Levy wrote:
>
> The Kafka Streams documentation discusses how to assign timestamps to
> records received from a source topic via TimestampExtractor. But neither the
> Kafka nor the Confluent documentation on Kafka Streams explains what
> timestamp is associated with a record that has been transformed.
>
> What timestamp is associated with records that are output by stateless
> transformations like map or flatMap?
>
> What timestamp is associated with records that are output by stateful
> transformations like aggregations or joins?
>
> What about transformations on windows?
>
> What timestamp does the Kafka publisher use, if any, when writing to an
> intermediate topic via through() or a sink via to()?
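The three rules in Eno's reply above can be sketched as a small plain-Java model. This is only an illustration of the description in this message, not Kafka Streams code: `DerivedTimestamps`, `Rec`, `mapValue`, `sumByKey`, and `taskTime` are hypothetical names made up for the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DerivedTimestamps {
    /** Hypothetical timestamped record; not a Kafka Streams class. */
    static final class Rec {
        final String key;
        final long value;
        final long timestamp;

        Rec(String key, long value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Stateless transformation: the output keeps the timestamp of the record being transformed. */
    static Rec mapValue(Rec in, long factor) {
        return new Rec(in.key, in.value * factor, in.timestamp);
    }

    /** Running sum per key: each update carries the timestamp of the latest record aggregated. */
    static List<Rec> sumByKey(List<Rec> input) {
        Map<String, Long> sums = new HashMap<>();
        List<Rec> out = new ArrayList<>();
        for (Rec r : input) {
            long sum = sums.merge(r.key, r.value, Long::sum);
            out.add(new Rec(r.key, sum, r.timestamp));
        }
        return out;
    }

    /** Task time under event time: the smallest timestamp among the input partitions. */
    static long taskTime(long... partitionTimestamps) {
        long min = Long.MAX_VALUE;
        for (long t : partitionTimestamps) {
            min = Math.min(min, t);
        }
        return min;
    }

    public static void main(String[] args) {
        Rec mapped = mapValue(new Rec("a", 2L, 100L), 3L);
        System.out.println("map output: value=" + mapped.value + " ts=" + mapped.timestamp);

        List<Rec> input = new ArrayList<>();
        input.add(new Rec("a", 1L, 100L));
        input.add(new Rec("a", 4L, 130L));
        for (Rec r : sumByKey(input)) {
            System.out.println("aggregate update: sum=" + r.value + " ts=" + r.timestamp);
        }

        System.out.println("task time for partitions at 120 and 95: " + taskTime(120L, 95L));
    }
}
```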
Time of derived records in Kafka Streams
The Kafka Streams documentation discusses how to assign timestamps to records received from a source topic via TimestampExtractor. But neither the Kafka nor the Confluent documentation on Kafka Streams explains what timestamp is associated with a record that has been transformed.

What timestamp is associated with records that are output by stateless transformations like map or flatMap?

What timestamp is associated with records that are output by stateful transformations like aggregations or joins?

What about transformations on windows?

What timestamp does the Kafka publisher use, if any, when writing to an intermediate topic via through() or a sink via to()?