Hi Kant, TD's explanation makes a lot of sense. Refer to this Stack Overflow question, where it was explained with program output. Hope this helps.
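TD's rule quoted below (watermark = max-event-time-seen - watermark-delay; a record is only dropped when it is older than the watermark) can be sketched in a few lines of plain Python. This is a toy model of the semantics, not Spark's API; the `process` function and its event times are made up for illustration:

```python
def process(events, delay):
    """Toy model of Structured Streaming's watermark rule (not Spark API).

    events: iterable of event times; delay: allowed watermark delay.
    Returns (kept, dropped). A record is "late" if older than the max
    event time seen so far, but only dropped when it is "too late",
    i.e. older than the watermark = max-event-time-seen - delay.
    """
    max_event_time = float("-inf")
    kept, dropped = [], []
    for t in events:
        watermark = max_event_time - delay
        if t < watermark:
            dropped.append(t)   # "too late": older than the watermark
        else:
            kept.append(t)      # on time, or late but within the delay
        max_event_time = max(max_event_time, t)
    return kept, dropped

kept, dropped = process([10, 12, 11, 20, 16, 5], delay=5)
# 11 and 16 arrive out of order ("late") but are newer than the
# watermark at the time, so they are kept; 5 arrives after the max
# has reached 20, so it is older than watermark 20 - 5 = 15 and dropped.
```

Note that processing time never appears anywhere in this logic, which is exactly TD's point: lateness is defined purely in terms of event time, so replaying old data gives the same result as processing it live.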
https://stackoverflow.com/questions/45579100/structured-streaming-watermark-vs-exactly-once-semantics

Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/

On Tue, Feb 27, 2018 at 7:45 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Let me answer the original question directly, that is, how we determine
> that an event is late. We simply track the maximum event time the engine
> has seen in the data it has processed so far. Any data whose event time is
> less than that max is basically "late" (as it is out of order). Now, in a
> distributed setting, it is very hard to define whether each record is late
> or not, because it is hard to have a consistent definition of
> max-event-time-seen. Fortunately, we don't have to do this precisely,
> because we don't really care whether a record is "late"; we only care
> whether a record is "too late", that is, older than the watermark
> (= max-event-time-seen - watermark-delay). As the programming guide says,
> if data is "late" but not "too late", we process it in the same way as
> non-late data. Only when the data is "too late" do we drop it.
>
> To further clarify, we do not in any way correlate processing time with
> event time. The definition of lateness is based only on event time and has
> nothing to do with processing time. This allows us to do event-time
> processing with old data streams as well. For example, you may replay
> 1-week-old data as a stream, and the processing will be exactly the same
> as it would have been if you had processed the stream in real time a week
> ago. This is fundamentally necessary for achieving the deterministic
> processing that Structured Streaming guarantees.
>
> Regarding the picture, the "time" is actually "event-time". My apologies
> for not making this clear in the picture. In hindsight, the picture could
> be made much better. :)
>
> Hope this explanation helps!
>
> TD
>
> On Tue, Feb 27, 2018 at 2:26 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> I read through the Spark Structured Streaming documentation and I wonder:
>> how does Spark Structured Streaming determine that an event has arrived
>> late? Does it compare the event time with the processing time?
>>
>> [image: enter image description here]
>> <https://i.stack.imgur.com/CXH4i.png>
>>
>> Taking the above picture as an example, does the bold right-arrow line
>> "Time" represent processing time? If so:
>>
>> 1) Where does this processing time come from? Since it's streaming, is
>> it assuming someone is likely using an upstream source that has a
>> processing timestamp in it, or does Spark add a processing-timestamp
>> field? For example, when reading messages from Kafka we do something like
>>
>> Dataset<Row> kafkadf = spark.readStream().format("kafka").load();
>>
>> This dataframe has a timestamp column by default, which I am assuming is
>> the processing time. Correct? If so, does Kafka or Spark add this
>> timestamp?
>>
>> 2) I can see there is a time comparison between the bold right-arrow line
>> and the time in the message. Is that how Spark determines an event is
>> late?