Let me answer the original question directly, that is, how do we determine
that an event is late. We simply track the maximum event time the engine
has seen in the data it has processed so far, and any data whose event
time is less than that max is basically "late" (as it is out-of-order). Now, in
a distributed setting, it is very hard to define whether each record is
late or not, because it is hard to have a consistent definition of
max-event-time-seen. Fortunately, we don't have to do this precisely, because
we don't really care whether a record is "late"; we only care whether a
record is "too late", that is, older than the watermark (=
max-event-time-seen - watermark-delay). As the programming guide says, if
data is "late" but not "too late", we process it in the same way as non-late
data. Only when the data is "too late" do we drop it.
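
To make this concrete, here is a minimal sketch in Java using the DataFrame
API (the column names "eventTime" and "key", the 10-minute watermark delay,
and the 5-minute window are just illustrative, not from the original
question):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

// events: a streaming Dataset with an "eventTime" timestamp column and a "key" column
Dataset<Row> events = ...;

// Watermark = max("eventTime" seen so far) - 10 minutes.
// Rows whose "eventTime" is older than the watermark are "too late" and are
// dropped; merely "late" (out-of-order) rows still update their windows.
Dataset<Row> counts = events
    .withWatermark("eventTime", "10 minutes")
    .groupBy(window(col("eventTime"), "5 minutes"), col("key"))
    .count();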

To further clarify, we do not in any way correlate processing-time with
event-time. The definition of lateness is based only on event-time and has
nothing to do with processing-time. This allows us to do event-time
processing with old data streams as well. For example, you may replay
1-week-old data as a stream, and the processing will be exactly the same as
it would have been if you had processed the stream in real-time a week ago.
This is fundamentally necessary for achieving the deterministic processing
that Structured Streaming guarantees.
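
As a hypothetical illustration of the replay case, you could stream week-old
archived JSON files (the path, the eventSchema variable, and the option value
are made up for the example); because the watermark and windows are computed
purely from the "eventTime" values inside the data, the same query produces
the same results as it would have live:

// Replay archived events as a stream; lateness is judged only from the
// "eventTime" values in the files, not from when this job actually runs.
Dataset<Row> replayed = spark.readStream()
    .schema(eventSchema)                 // schema of the archived events
    .option("maxFilesPerTrigger", 1)     // feed the files in gradually
    .json("/archive/events/last-week/");
// ...then apply the same withWatermark(...) + window(...) aggregation as above.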

Regarding the picture, the "time" is actually "event-time". My apologies
for not making this clear in the picture. In hindsight, the picture can be
made much better.  :)

Hope this explanation helps!

TD

On Tue, Feb 27, 2018 at 2:26 AM, kant kodali <kanth...@gmail.com> wrote:

> I read through the Spark Structured Streaming documentation and I wonder
> how Spark Structured Streaming determines that an event has arrived late.
> Does it compare the event-time with the processing time?
>
> [image] <https://i.stack.imgur.com/CXH4i.png>
>
> Taking the above picture as an example, does the bold right arrow line
> "Time" represent processing time? If so,
>
> 1) Where does this processing time come from? Since it's streaming, is it
> assuming that someone is using an upstream source that has a processing
> timestamp in it, or does Spark add a processing timestamp field? For example,
> when reading messages from Kafka we do something like
>
> Dataset<Row> kafkadf = spark.readStream().format("kafka").load();
>
> This dataframe has a "timestamp" column by default, which I am assuming is
> the processing time. Correct? If so, does Kafka or Spark add this timestamp?
>
> 2) I can see there is a time comparison between the bold right arrow line
> and the time in the message. Is that how Spark determines an event is late?
>
