Let me answer the original question directly: how do we determine that an event is late? We simply track the maximum event time the engine has seen in the data it has processed so far. Any record whose event time is less than that maximum is "late" (that is, it arrived out of order). Now, in a distributed setting it is hard to decide precisely whether each record is late, because it is hard to maintain a consistent definition of max-event-time-seen. Fortunately, we do not have to be precise, because we do not really care whether a record is "late"; we only care whether it is "too late", that is, older than the watermark (watermark = max-event-time-seen - watermark-delay). As the programming guide says, if data is "late" but not "too late", we process it the same way as non-late data. Only when data is "too late" do we drop it.
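The rule above can be sketched as a small toy class. This is illustrative only, not Spark internals: the names are made up, and real Structured Streaming advances the watermark only between micro-batches, whereas this sketch updates max-event-time-seen on every record.

```java
import java.time.Duration;
import java.time.Instant;

// Toy sketch of the lateness rule described above (hypothetical names,
// not Spark's actual implementation).
public class WatermarkTracker {
    private Instant maxEventTime = Instant.EPOCH;
    private final Duration delay;

    public WatermarkTracker(Duration delay) {
        this.delay = delay;
    }

    /** watermark = max-event-time-seen - watermark-delay */
    public Instant watermark() {
        return maxEventTime.minus(delay);
    }

    /** Returns true if the record is "too late" and would be dropped. */
    public boolean isTooLate(Instant eventTime) {
        boolean tooLate = eventTime.isBefore(watermark());
        if (eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime; // advance max event time seen
        }
        return tooLate;
    }

    public static void main(String[] args) {
        WatermarkTracker t = new WatermarkTracker(Duration.ofMinutes(10));
        Instant base = Instant.parse("2018-02-27T12:00:00Z");

        t.isTooLate(base); // advances max-event-time-seen to 12:00

        // 5 min out of order: "late" but not "too late" -> still processed
        System.out.println(t.isTooLate(base.minus(Duration.ofMinutes(5))));  // false

        // 15 min out of order: older than the watermark (11:50) -> dropped
        System.out.println(t.isTooLate(base.minus(Duration.ofMinutes(15)))); // true
    }
}
```

Note that only event times appear here: the wall clock (processing time) is never consulted, which is exactly why replaying an old stream gives the same results.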
To further clarify: we do not in any way correlate processing time with event time. The definition of lateness is based only on event time and has nothing to do with processing time. This allows us to do event-time processing on old data streams as well. For example, you may replay week-old data as a stream, and the processing will be exactly the same as it would have been had you processed the stream in real time a week ago. This is fundamentally necessary for the deterministic processing that Structured Streaming guarantees.

Regarding the picture: the "time" axis is actually event time. My apologies for not making this clear in the picture; in hindsight, it could be made much better. :)

Hope this explanation helps!

TD

On Tue, Feb 27, 2018 at 2:26 AM, kant kodali <kanth...@gmail.com> wrote:
> I read through the Spark Structured Streaming documentation and I wonder
> how Spark Structured Streaming determines that an event has arrived late.
> Does it compare the event time with the processing time?
>
> [image: enter image description here]
> <https://i.stack.imgur.com/CXH4i.png>
>
> Taking the above picture as an example, does the bold right-arrow line "Time"
> represent processing time? If so:
>
> 1) Where does this processing time come from? Since it is streaming, is it
> assumed that an upstream source supplies a processing timestamp, or does
> Spark add a processing timestamp field? For example, when reading messages
> from Kafka we do something like
>
> Dataset<Row> kafkadf = spark.readStream().format("kafka").load();
>
> This dataframe has a timestamp column by default, which I am assuming is the
> processing time. Correct? If so, does Kafka or Spark add this timestamp?
>
> 2) I can see there is a time comparison between the bold right-arrow line and
> the time in the message. Is that how Spark determines an event is late?