let me ask this another way: if i run this program and then feed it a
single value (on nc), it returns a single result, which is an empty batch.
it will not return anything else after that, no matter how long i wait.

this only happens with watermarking and append output mode.

what do i do to correct this behavior?


On Mon, May 28, 2018 at 6:16 PM, Koert Kuipers <ko...@tresata.com> wrote:

> hello all,
> just playing with structured streaming aggregations for the first time.
> this is my little program i run inside sbt:
>
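>     import org.apache.spark.sql.functions._
>     import org.apache.spark.sql.streaming.OutputMode
>     import spark.implicits._  // needed for the $"..." column syntax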
>
>     val lines = spark.readStream
>       .format("socket")
>       .option("host", "localhost")
>       .option("port", 9999)
>       .load()
>
>     val query = lines
>       .withColumn("time", current_timestamp)
>       .withWatermark("time", "1 second")
>       .groupBy(window($"time", "1 second"))
>       .agg(collect_list("value") as "value")
>       .withColumn("windowstring", $"window".cast("string"))
>       .writeStream
>       .format("console")
>       .outputMode(OutputMode.Append)
>       .start()
>
>     query.awaitTermination()
>
> before i start it i create a little server with nc:
> $ nc -lk 9999
>
> after it starts i simply type in a single character every 20 seconds or so
> inside nc and hit enter. my characters are 1, 2, 3, etc.
>
> the thing i don't understand is that it comes back with the correct
> responses, but delayed by entries rather than by time. for the first 2
> characters it comes back with empty aggregations, and then for every next
> character it comes back with the response for 2 characters ago. so when i
> hit 3<enter> it comes back with the response for 1<enter>.
>
> not very realtime :(
>
> any idea why?
>
> i would like it to respond to my input 1<enter> with the relevant response
> for that input (after the window and watermark have expired, of course, so
> within 2 seconds).
>
> i tried adding a trigger of 1 second but that didn't help either.
>
> below is the output with my inputs inserted using '<= <input>', so '<= 1'
> means i hit 1 and then enter.
>
>
> <= 1
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +------+-----+------------+
> |window|value|windowstring|
> +------+-----+------------+
> +------+-----+------------+
>
> <= 2
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +------+-----+------------+
> |window|value|windowstring|
> +------+-----+------------+
> +------+-----+------------+
>
> <= 3
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +--------------------+-----+--------------------+
> |              window|value|        windowstring|
> +--------------------+-----+--------------------+
> |[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
> +--------------------+-----+--------------------+
>
> <= 4
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +--------------------+-----+--------------------+
> |              window|value|        windowstring|
> +--------------------+-----+--------------------+
> |[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
> +--------------------+-----+--------------------+
>
> <= 5
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +--------------------+-----+--------------------+
> |              window|value|        windowstring|
> +--------------------+-----+--------------------+
> |[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
> +--------------------+-----+--------------------+
>
>
>
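
The two-entry lag in the quoted output can be reproduced with a toy model. This is a minimal sketch in plain Scala, not Spark code, and it rests on three assumptions that are not confirmed anywhere in this thread: a micro-batch only runs when a new line arrives, the watermark is only advanced after a batch finishes, and append mode only emits a window once its end is at or below the watermark seen by the current batch. The names `WatermarkDelaySketch`, `Event` and `run` are hypothetical.

```scala
// Toy model of the behaviour described above; NOT Spark code.
object WatermarkDelaySketch {
  // One line typed into nc, with its arrival time in whole seconds.
  final case class Event(value: String, timeSec: Long)

  val windowSec = 1L          // mirrors window($"time", "1 second")
  val watermarkDelaySec = 1L  // mirrors withWatermark("time", "1 second")

  /** Returns, for each batch, the values emitted in append mode. */
  def run(events: Seq[Event]): Seq[Seq[String]] = {
    var watermark = Long.MinValue               // watermark visible to the current batch
    var open = Map.empty[Long, Vector[String]]  // window end -> buffered values
    events.map { e =>
      // Assumption: exactly one batch per input line, none while idle.
      val windowEnd = (e.timeSec / windowSec) * windowSec + windowSec
      open = open.updated(windowEnd, open.getOrElse(windowEnd, Vector.empty) :+ e.value)
      // Assumption: only windows ending at or below the watermark
      // carried over from the previous batch are emitted.
      val (closed, stillOpen) = open.partition { case (end, _) => end <= watermark }
      open = stillOpen
      // Assumption: the watermark moves forward only after the batch is done.
      watermark = math.max(watermark, e.timeSec - watermarkDelaySec)
      closed.toSeq.sortBy(_._1).flatMap(_._2)
    }
  }

  def main(args: Array[String]): Unit = {
    // Inputs 1..5, typed 20 seconds apart as in the quoted message.
    val inputs = (1 to 5).map(i => Event(i.toString, i * 20L))
    run(inputs).zipWithIndex.foreach { case (out, batch) =>
      println(s"Batch $batch: ${out.mkString("[", ", ", "]")}")
    }
    // Batch 0: []
    // Batch 1: []
    // Batch 2: [1]
    // Batch 3: [2]
    // Batch 4: [3]
  }
}
```

Under these assumptions the model matches the pattern in the quoted output: two empty batches, then each input releases the result for the input two entries earlier. It also yields a single empty batch and nothing more when only one value is ever sent, since no later batch runs to apply the advanced watermark.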
