Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-30 Thread Koert Kuipers
thanks, that's helpful.



Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-30 Thread Lalwani, Jayesh
A few things:


  1.  Append mode outputs a window's result only once that window has fallen
behind the watermark.
  2.  Structured Streaming isn’t driven by the clock. It reacts only when it
sees input data. If no data appears in the input, it will not move the
aggregation window.
  3.  Clock time is irrelevant to Structured Streaming. As far as Structured
Streaming is concerned, “current time” is the maximum value seen so far in the
timestamp column used by the window, and the watermark trails it by the
configured delay.

So, what is happening in your case is that you posted 1 and 2 within 2 seconds
of each other. Since neither of them had fallen outside the watermark, it
didn’t output anything. Until the point you posted 3, time was frozen for
Structured Streaming. The max time of the timestamp column was the timestamp of
message 2, so current time was the timestamp of message 2. When you posted 3,
the time advanced to the timestamp of 3, which caused 1 to fall out, so it
output 1.

Note that it will not output 1 exactly 1 second after 1 arrives. The clock
time means nothing.
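
The sequence described above can be reproduced with a small toy model. This is
only a sketch, not Spark code: WatermarkModel, Event and run are hypothetical
names, and it assumes one event per micro-batch, event times in milliseconds,
tumbling windows, and (as the Structured Streaming programming guide describes)
that the watermark computed during one trigger only takes effect in the next
one.

```scala
// Toy model of append-mode windowed aggregation with a watermark.
// Hypothetical names throughout; this does not use or mirror Spark's internals.
object WatermarkModel {
  final case class Event(timeMs: Long, value: String)

  /** Feeds one event per micro-batch; returns the values emitted by each batch. */
  def run(events: Seq[Event], windowMs: Long, delayMs: Long): Seq[Seq[String]] = {
    var watermark = 0L                          // event-time watermark, starts at 0
    var open = Map.empty[Long, Vector[String]]  // window start -> buffered values

    events.map { e =>
      // 1. Add the event to its tumbling window.
      val start = e.timeMs - (e.timeMs % windowMs)
      open = open.updated(start, open.getOrElse(start, Vector.empty) :+ e.value)

      // 2. Emit only windows that end at or before the watermark carried over
      //    from the previous batches (append mode emits each window once).
      val (closed, stillOpen) = open.partition { case (s, _) => s + windowMs <= watermark }
      open = stillOpen

      // 3. Advance the watermark; it takes effect in the NEXT batch.
      watermark = math.max(watermark, e.timeMs - delayMs)

      closed.toSeq.sortBy(_._1).flatMap(_._2)
    }
  }

  def main(args: Array[String]): Unit = {
    // Inputs "1".."5" typed 20 seconds apart, 1 second window, 1 second delay.
    val events = (0 until 5).map(i => Event(i * 20000L, (i + 1).toString))
    run(events, windowMs = 1000L, delayMs = 1000L).zipWithIndex.foreach {
      case (values, batch) => println(s"Batch $batch: [${values.mkString(", ")}]")
    }
    // Batches 0 and 1 emit nothing; batches 2, 3 and 4 emit 1, 2 and 3.
  }
}
```

In this model nothing but a later event can advance the watermark, so a single
input on its own never produces a non-empty batch, no matter how much clock
time passes.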



Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-29 Thread Koert Kuipers
let me ask this another way: if i run this program and then feed it a
single value (on nc), it returns a single result, which is an empty batch.
it will not return anything else after that, no matter how long i wait.

this only happens with watermarking and append output mode.

what do i do to correct this behavior?




trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-28 Thread Koert Kuipers
hello all,
just playing with structured streaming aggregations for the first time.
this is my little program i run inside sbt:

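import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import spark.implicits._  // needed for the $"..." column syntax

// read lines of text from the local nc server
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

val query = lines
  .withColumn("time", current_timestamp())  // stamp each line on arrival
  .withWatermark("time", "1 second")
  .groupBy(window($"time", "1 second"))
  .agg(collect_list("value") as "value")
  // note: `as` only aliases the column; .cast("string") would convert it
  .withColumn("windowstring", $"window" as "string")
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append)
  .start()

query.awaitTermination()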

before i start it i create a little server with nc:
$ nc -lk 

after it starts i simply type in a single character every 20 seconds or so
inside nc and hit enter. my characters are 1, 2, 3, etc.

the thing i don't understand is that it comes back with the correct responses,
but with delays in terms of entries (not time). after the first 2
characters it comes back with empty aggregations, and then for every subsequent
character it comes back with the response for 2 characters ago. so when i
hit 3 it comes back with the response for 1.

not very realtime :(

any idea why?

i would like it to respond to my input 1 with the relevant response
for that input (after the window and watermark have expired, of course, so
within 2 seconds).

i tried adding a trigger of 1 second but that didn't help either.

below is the output with my inputs inserted using '<= ', so '<= 1'
means i hit 1 and then enter.


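<= 1
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+------------+
|window|value|windowstring|
+------+-----+------------+
+------+-----+------------+

<= 2
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+------------+
|window|value|windowstring|
+------+-----+------------+
+------+-----+------------+

<= 3
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
+--------------------+-----+--------------------+

<= 4
-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
+--------------------+-----+--------------------+

<= 5
-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
+--------------------+-----+--------------------+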