Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread kant kodali
I see! I get the logic now!



Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread naresh Goud
Hi Kant,

TD's explanation makes a lot of sense. Refer to this Stack Overflow answer,
where it is explained with program output. Hope this helps.

https://stackoverflow.com/questions/45579100/structured-streaming-watermark-vs-exactly-once-semantics




Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/




Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread Tathagata Das
Let me answer the original question directly, that is, how do we determine
that an event is late. We simply track the maximum event time the engine
has seen in the data it has processed so far, and any data whose event time
is less than that max is basically "late" (it is out-of-order). Now, in a
distributed setting, it is very hard to define whether each record is late
or not, because it is hard to have a consistent definition of
max-event-time-seen. Fortunately, we don't have to do this precisely,
because we don't really care whether a record is "late"; we only care
whether a record is "too late", that is, older than the watermark
(= max-event-time-seen - watermark-delay). As the programming guide says,
if data is "late" but not "too late", we process it in the same way as
non-late data. Only when the data is "too late" do we drop it.
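
For a concrete example, with a 10-minute watermark delay: if the max event
time seen so far is 12:30, the watermark is 12:20, so a record stamped
12:25 is "late" but still processed, while a record stamped 12:15 is "too
late" and dropped. In code this is just the withWatermark call described in
the programming guide. A rough sketch (the column names "eventTime" and
"word" and the input Dataset "events" are placeholders, not from any real
query):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// "events" is any streaming Dataset<Row> with an "eventTime" timestamp column.
// Tolerate records up to 10 minutes older than the max event time seen;
// anything older than (max-event-time-seen - 10 min) is dropped, everything
// else updates its 5-minute window as usual.
Dataset<Row> windowedCounts = events
    .withWatermark("eventTime", "10 minutes")
    .groupBy(window(col("eventTime"), "5 minutes"), col("word"))
    .count();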

To further clarify, we do not in any way correlate processing-time with
event-time. The definition of lateness is only based on event-time and has
nothing to do with processing-time. This allows us to do event-time
processing with old data streams as well. For example, you may replay
1-week old data as a stream, and the processing will be exactly the same as
it would have been if you had processed the stream in real-time a week ago.
This is fundamentally necessary for achieving the deterministic processing
that Structured Streaming guarantees.
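
As a sketch of that replay (assuming the old events sit in JSON files; the
paths and schema here are made up, "spark" is an existing SparkSession, and
the file source's maxFilesPerTrigger option feeds the files in gradually):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

// File streams need the schema up front; this one is hypothetical.
StructType eventSchema = new StructType()
    .add("eventTime", "timestamp")
    .add("word", "string");

// Replay week-old files as a stream, one file per micro-batch. Lateness is
// judged only on the eventTime column, so the result matches a live run.
Dataset<Row> replayed = spark
    .readStream()
    .schema(eventSchema)
    .option("maxFilesPerTrigger", 1)
    .json("/archive/events/2018-02-20/");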

Regarding the picture, the "time" is actually "event-time". My apologies
for not making this clear in the picture. In hindsight, the picture can be
made much better.  :)

Hope this explanation helps!

TD



How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread kant kodali
I read through the Spark Structured Streaming documentation, and I wonder:
how does Spark Structured Streaming determine that an event has arrived
late? Does it compare the event-time with the processing time?

[image] <https://i.stack.imgur.com/CXH4i.png>

Taking the above picture as an example: does the bold right-arrow line
labeled "Time" represent processing time? If so,

1) Where does this processing time come from? Since it's streaming, is it
assumed that an upstream source provides a processing timestamp, or does
Spark add a processing-timestamp field? For example, when reading messages
from Kafka we do something like:

Dataset<Row> kafkaDf = spark.readStream().format("kafka").load();

This dataframe has a timestamp column by default, which I am assuming is
the processing time. Correct? If so, does Kafka or Spark add this timestamp?

2) I can see there is a time comparison between the bold right-arrow line
and the time in the message. Is that how Spark determines an event is late?