hello all,
just playing with structured streaming aggregations for the first time.
this is my little program i run inside sbt:

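    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.OutputMode
    import spark.implicits._  // needed for the $"..." column syntax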

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val query = lines
      .withColumn("time", current_timestamp)
      .withWatermark("time", "1 second")
      .groupBy(window($"time", "1 second"))
      .agg(collect_list("value") as "value")
      // note: `as` only aliases the column here, it does not cast the window struct
      .withColumn("windowstring", $"window" as "string")
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append)
      .start()

    query.awaitTermination()

before i start it i create a little server with nc:
    $ nc -lk 9999

after it starts i simply type in a single character every 20 seconds or so
inside nc and hit enter. my characters are 1, 2, 3, etc.

the thing i don't understand is that it comes back with the correct
responses, but delayed by a number of inputs rather than by an amount of
time. for the first 2 characters it comes back with empty aggregations, and
then for every following character it comes back with the response for 2
characters earlier. so when i hit 3<enter> it comes back with the response
for 1<enter>.
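
to pin the pattern down, here is a small plain-scala model (no spark involved) that reproduces the lag. it rests on two guesses of mine that i have not confirmed: the watermark is only recomputed at the end of a batch, and a batch only runs when new input arrives. all names in it are made up for illustration.

```scala
// toy model of the lag, not spark code; every name here is hypothetical
object WatermarkLagModel {
  val windowMs = 1000L // window length used in the query
  val delayMs  = 1000L // watermark delay used in the query

  // end of the 1 second tumbling window that contains event time t
  def windowEnd(t: Long): Long = (t / windowMs + 1) * windowMs

  // feeds (eventTimeMs, value) pairs, one batch per input,
  // and returns the values each batch would emit
  def run(inputs: Seq[(Long, String)]): Seq[Seq[String]] = {
    var watermark = 0L                         // assumed: starts at 0
    var pending = Vector.empty[(Long, String)] // (windowEnd, value) not yet emitted
    inputs.map { case (t, v) =>
      pending = pending :+ (windowEnd(t) -> v)
      // assumed: a batch sees the watermark left behind by the previous batch
      val (ready, waiting) = pending.partition(_._1 <= watermark)
      pending = waiting
      // assumed: the watermark only advances after the batch has run
      watermark = math.max(watermark, t - delayMs)
      ready.map(_._2)
    }
  }

  def main(args: Array[String]): Unit = {
    // one input roughly every 20 seconds, like typing into nc
    val inputs = Seq(5000L -> "1", 25000L -> "2", 45000L -> "3",
                     65000L -> "4", 85000L -> "5")
    run(inputs).zipWithIndex.foreach { case (out, i) => println(s"Batch $i: $out") }
  }
}
```

if those guesses hold, the model emits nothing for the first two inputs and after that the value from two inputs earlier, which matches what i see.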

not very realtime :(

any idea why?

i would like it to respond to my input 1<enter> with the relevant response
for that input (after the window and watermark have expired, of course, so
within 2 seconds).
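
the "within 2 seconds" figure is just the window length plus the watermark delay. a minimal sketch of that arithmetic, assuming time keeps advancing after the input (the object and method names are made up):

```scala
// back-of-the-envelope arithmetic for the latency i expect; names are hypothetical
object ExpectedLatency {
  val windowMs = 1000L // "1 second" window
  val delayMs  = 1000L // "1 second" watermark

  // end of the tumbling window that contains event time t
  def windowEnd(t: Long): Long = (t / windowMs + 1) * windowMs

  // earliest time at which (current time - delay) reaches the window end
  def watermarkPassesAt(t: Long): Long = windowEnd(t) + delayMs

  // how long after the input i would expect to wait for its result
  def waitMs(t: Long): Long = watermarkPassesAt(t) - t

  def main(args: Array[String]): Unit = {
    println(waitMs(5000L)) // input right at the start of a window
    println(waitMs(5999L)) // input right at the end of a window
  }
}
```

so an input at the very start of a window should wait 2 seconds, and one at the very end just over 1 second.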

i tried adding a trigger of 1 second but that didn't help either.

below is the output with my inputs inserted using '<= <input>', so '<= 1'
means i hit 1 and then enter.


<= 1
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+------------+
|window|value|windowstring|
+------+-----+------------+
+------+-----+------------+

<= 2
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+------------+
|window|value|windowstring|
+------+-----+------------+
+------+-----+------------+

<= 3
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
+--------------------+-----+--------------------+

<= 4
-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
+--------------------+-----+--------------------+

<= 5
-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
+--------------------+-----+--------------------+
