Thanks, that's helpful.

On Wed, May 30, 2018 at 5:05 PM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> A few things:
>
>    1. Append mode only outputs the result for a window once that window
>    has fallen behind the watermark, i.e. once it can no longer change.
>    2. Structured streaming isn’t time based. It reacts only when it sees
>    input data. If no data appears in the input, it will not move the
>    watermark or close any aggregation window.
>    3. Clock time is irrelevant to structured streaming. As far as
>    structured streaming is concerned, “current time” is the max value seen
>    so far in the timestamp column used by the window.
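Points 2 and 3 above can be sketched as a tiny model of the engine's clock. This is a simplified illustration in plain Scala, not Spark's implementation: `EventClock`, `observe` and `watermarkMs` are hypothetical names, times are epoch milliseconds, and the watermark is assumed to be the max event time seen so far minus the delay given to `withWatermark`.

```scala
// Simplified model of structured streaming's notion of "current time".
// Hypothetical names; this is not a Spark API.
final case class EventClock(delayMs: Long, maxEventTimeMs: Option[Long] = None) {

  // Fold in the event times of one batch of input rows.
  // An empty batch leaves the clock exactly where it was.
  def observe(eventTimesMs: Seq[Long]): EventClock =
    if (eventTimesMs.isEmpty) this
    else {
      val batchMax = eventTimesMs.max
      copy(maxEventTimeMs = Some(maxEventTimeMs.fold(batchMax)(m => math.max(m, batchMax))))
    }

  // Watermark = max event time seen so far minus the allowed delay.
  // In this model there is no watermark before any data has arrived.
  def watermarkMs: Option[Long] = maxEventTimeMs.map(_ - delayMs)
}

val clock0 = EventClock(delayMs = 1000L)
val clock1 = clock0.observe(Seq(20000L))
val clock2 = clock1.observe(Seq.empty) // no input: time stays frozen
val clock3 = clock2.observe(Seq(40000L))

println(clock1.watermarkMs) // Some(19000)
println(clock2.watermarkMs) // Some(19000)
println(clock3.watermarkMs) // Some(39000)
```

Wall-clock time never appears in the model; only the event times carried by input rows move the watermark.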
>
> So, what is happening in your case is that you posted 1 and 2 within 2
> seconds of each other. Since neither of them fell outside of the
> watermark, it didn't output anything. Until the point you posted 3, time
> was frozen for structured streaming: the max value of the timestamp column
> was the timestamp of message 2, so current time was the timestamp of
> message 2. When you posted 3, time advanced to the timestamp of 3, which
> caused 1 to fall out of the watermark, so it output 1.
>
> Note that it will not output 1 exactly 1 second after 1 arrives; the
> clock time means nothing.
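The explanation above can be replayed as a toy simulation of the append-mode trace. It is a sketch under stated assumptions, not Spark code: `AppendSim`, `windowStart` and `run` are hypothetical names; there is one input row per micro-batch; windows are 1-second tumbling windows aligned to the epoch; the watermark delay is 1 second; late-row dropping is ignored; and the watermark computed from one batch's data only takes effect in the next batch. That last assumption is what makes even inputs 20 seconds apart show up one batch late, matching the console output in the original message.

```scala
// Toy model of append-mode output for a windowed aggregation with a watermark.
// Hypothetical names; not Spark code. All times are epoch milliseconds.
object AppendSim {
  val windowMs = 1000L // like window($"time", "1 second")
  val delayMs  = 1000L // like withWatermark("time", "1 second")

  // Start of the tumbling window that contains timestamp t.
  def windowStart(t: Long): Long = t - Math.floorMod(t, windowMs)

  // Each element of `batches` is (event time, value) for one micro-batch.
  // Returns the values emitted by each batch, in order.
  def run(batches: Seq[(Long, String)]): Seq[Seq[String]] = {
    var watermark    = 0L                          // watermark in effect for the current batch
    var maxEventTime = Long.MinValue
    var state        = Map.empty[Long, Vector[String]] // window start -> collected values
    val out          = Seq.newBuilder[Seq[String]]
    for ((t, v) <- batches) {
      // 1. aggregate the new row into its window
      val w = windowStart(t)
      state = state.updated(w, state.getOrElse(w, Vector.empty) :+ v)
      // 2. append mode: emit and drop windows whose end <= current watermark
      val (done, open) = state.partition { case (start, _) => start + windowMs <= watermark }
      out += done.toSeq.sortBy(_._1).flatMap(_._2)
      state = open
      // 3. only now move the watermark; it applies from the next batch on
      maxEventTime = math.max(maxEventTime, t)
      watermark = maxEventTime - delayMs
    }
    out.result()
  }
}

// Inputs typed roughly 20 seconds apart, as in the original message.
val inputs = Seq(1, 2, 3, 4, 5).map(i => (i * 20000L + 100L, i.toString))
AppendSim.run(inputs).zipWithIndex.foreach { case (vals, batch) =>
  println(s"Batch $batch: ${vals.mkString("[", ",", "]")}")
}
```

Running it prints `Batch 0: []`, `Batch 1: []`, `Batch 2: [1]`, `Batch 3: [2]` and `Batch 4: [3]`, the same pattern as the console output below. With only the first two inputs, nothing is ever emitted, because no later row arrives to move the watermark.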
>
> *From: *Koert Kuipers <ko...@tresata.com>
> *Date: *Monday, May 28, 2018 at 6:17 PM
> *To: *user <user@spark.apache.org>
> *Subject: *trying to understand structured streaming aggregation with
> watermark and append outputmode
>
> Hello all,
>
> I'm just playing with structured streaming aggregations for the first
> time. This is my little program that I run inside sbt:
>
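>     import org.apache.spark.sql.SparkSession
>     import org.apache.spark.sql.functions._
>     import org.apache.spark.sql.streaming.OutputMode
>
>     // inside spark-shell `spark` already exists; otherwise create one
>     val spark = SparkSession.builder
>       .master("local[*]")
>       .appName("watermark-test")
>       .getOrCreate()
>     import spark.implicits._ // enables the $"..." column syntax
>
>     // read lines of text from the nc server below
>     val lines = spark.readStream
>       .format("socket")
>       .option("host", "localhost")
>       .option("port", 9999)
>       .load()
>
>     val query = lines
>       // stamp each row with the current (processing) time, used as event time
>       .withColumn("time", current_timestamp)
>       // allow data up to 1 second behind the max time seen so far
>       .withWatermark("time", "1 second")
>       // 1-second tumbling windows, collecting the typed values per window
>       .groupBy(window($"time", "1 second"))
>       .agg(collect_list("value") as "value")
>       // cast (not alias) the window struct to a string
>       .withColumn("windowstring", $"window".cast("string"))
>       .writeStream
>       .format("console")
>       .outputMode(OutputMode.Append)
>       .start()
>
>     query.awaitTermination()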
>
> Before I start it, I create a little server with nc:
>
> $ nc -lk 9999
>
> After it starts, I simply type a single character into nc every 20 seconds
> or so and hit enter. My characters are 1, 2, 3, etc.
>
> The thing I don't understand is that it comes back with the correct
> responses, but delayed by entries (not by time). After the first 2
> characters it comes back with empty aggregations, and then for every next
> character it comes back with the response for 2 characters ago. So when I
> hit 3<enter> it comes back with the response for 1<enter>.
>
> Not very realtime :(
>
> Any idea why?
>
> I would like it to respond to my input 1<enter> with the relevant response
> for that input (after the window and watermark have expired, of course, so
> within 2 seconds).
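The "within 2 seconds" expectation can be checked with a little event-time arithmetic. This is a plain-Scala sketch, assuming 1-second tumbling windows aligned to the epoch, a 1-second watermark delay, and the append-mode rule that a window [start, end) is emitted once end <= watermark; `WindowMath`, `windowOf` and `earliestClosingEventTime` are hypothetical helpers, not Spark APIs. It computes the smallest max event time at which the window holding an event can be finalized, which is always more than 1 and at most 2 seconds after the event. The catch is that the engine only learns that such a time has been reached from a later input row carrying that timestamp.

```scala
// Event-time arithmetic for a single event; hypothetical helpers, not a Spark API.
object WindowMath {
  val windowMs = 1000L // like window($"time", "1 second")
  val delayMs  = 1000L // like withWatermark("time", "1 second")

  // [start, end) of the 1-second tumbling window containing t (epoch ms).
  def windowOf(t: Long): (Long, Long) = {
    val start = t - Math.floorMod(t, windowMs)
    (start, start + windowMs)
  }

  // Append mode emits a window once end <= watermark = maxEventTime - delay,
  // i.e. once some row with event time >= end + delay has been seen.
  def earliestClosingEventTime(t: Long): Long = windowOf(t)._2 + delayMs
}

val t = 1527544800250L // an arbitrary event time in epoch milliseconds
val (winStart, winEnd) = WindowMath.windowOf(t)
val closing = WindowMath.earliestClosingEventTime(t)
println(s"window [$winStart, $winEnd), closable once max event time >= $closing")
println(s"that is ${closing - t} ms after the event")
```

For this timestamp the window can close once the max event time reaches 1750 ms after the event; nothing in the calculation depends on wall-clock time.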
>
> I tried adding a trigger of 1 second, but that didn't help either.
>
> Below is the output with my inputs inserted using '<= <input>', so '<= 1'
> means I hit 1 and then enter.
>
> <= 1
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +------+-----+------------+
> |window|value|windowstring|
> +------+-----+------------+
> +------+-----+------------+
>
> <= 2
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +------+-----+------------+
> |window|value|windowstring|
> +------+-----+------------+
> +------+-----+------------+
>
> <= 3
> Batch: 2
> -------------------------------------------
> +--------------------+-----+--------------------+
> |              window|value|        windowstring|
> +--------------------+-----+--------------------+
> |[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
> +--------------------+-----+--------------------+
>
> <= 4
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +--------------------+-----+--------------------+
> |              window|value|        windowstring|
> +--------------------+-----+--------------------+
> |[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
> +--------------------+-----+--------------------+
>
> <= 5
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +--------------------+-----+--------------------+
> |              window|value|        windowstring|
> +--------------------+-----+--------------------+
> |[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
> +--------------------+-----+--------------------+