Hello all, I'm just playing with Structured Streaming aggregations for the first time. This is my little program, which I run inside sbt:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import spark.implicits._ // for the $"col" syntax; assumes `spark` is an existing SparkSession

// Read lines of text from a local socket (fed by nc, see below)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val query = lines
  // Stamp each row with the current time and use it as the event time
  .withColumn("time", current_timestamp())
  .withWatermark("time", "1 second")
  // 1-second tumbling windows, collecting every value seen in each window
  .groupBy(window($"time", "1 second"))
  .agg(collect_list("value") as "value")
  // Note: `as` only aliases the column; an actual string would need .cast("string")
  .withColumn("windowstring", $"window" as "string")
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append)
  .start()

query.awaitTermination()
```

Before I start it, I create a little server with nc:

```
$ nc -lk 9999
```

After it starts, I simply type a single character into nc every 20 seconds or so and hit Enter. My characters are 1, 2, 3, etc.

What I don't understand is that it comes back with the correct responses, but delayed by entries (not by time). For the first 2 characters it comes back with empty aggregations, and after that each new character brings back the response for the character typed two entries earlier. So when I hit 3<enter>, it comes back with the response for 1<enter>. Not very realtime :(

Any idea why? I would like it to respond to my input 1<enter> with the relevant response for that input (after the window and watermark have expired, of course, so within 2 seconds). I tried adding a trigger of 1 second, but that didn't help either.

Below is the output with my inputs inserted as '<= <input>', so '<= 1' means I hit 1 and then Enter.
```
<= 1
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+------------+
|window|value|windowstring|
+------+-----+------------+
+------+-----+------------+

<= 2
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+------------+
|window|value|windowstring|
+------+-----+------------+
+------+-----+------------+

<= 3
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
+--------------------+-----+--------------------+

<= 4
-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
+--------------------+-----+--------------------+

<= 5
-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
+--------------------+-----+--------------------+
```
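One way to reason about the two-entry lag is a toy model of the engine. The sketch below is plain Scala, not Spark code, and it rests on assumptions about Spark 2.3-era micro-batch behavior: the watermark is the max event time seen minus the delay, it is recomputed at the end of each micro-batch and only applied in the next one, Append mode emits a window once its end is at or before that watermark, and a micro-batch only runs when new input arrives. The names `BatchLagModel` and `simulate` and the 20-second event times are hypothetical.

```scala
import scala.collection.mutable

// Toy model (plain Scala, not Spark) of Append-mode windowed aggregation
// with a watermark, under the assumptions stated above.
object BatchLagModel {
  val windowMs = 1000L // window($"time", "1 second")
  val delayMs  = 1000L // withWatermark("time", "1 second")

  // End of the tumbling window [start, start + windowMs) that contains t.
  def windowEnd(t: Long): Long = t - (t % windowMs) + windowMs

  // events: (value, eventTimeMillis), one event per micro-batch.
  // Returns the values emitted by each micro-batch, in order.
  def simulate(events: Seq[(String, Long)]): Seq[Seq[String]] = {
    var watermark = 0L               // watermark applied by the current batch
    var maxEventTime = Long.MinValue
    val state = mutable.LinkedHashMap.empty[Long, Vector[String]] // windowEnd -> values

    events.toVector.map { case (value, t) =>
      // 1. Add this batch's input to its window's state.
      val end = windowEnd(t)
      state(end) = state.getOrElse(end, Vector.empty[String]) :+ value
      // 2. Emit and drop windows whose end is at or before the current watermark.
      val closed = state.keysIterator.filter(_ <= watermark).toVector
      val emitted = closed.flatMap(k => state(k))
      closed.foreach(k => state.remove(k))
      // 3. Only now advance the watermark; the next batch will use it.
      maxEventTime = math.max(maxEventTime, t)
      watermark = maxEventTime - delayMs
      emitted
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical event times: inputs "1".."5" typed 20 seconds apart.
    val inputs = (1 to 5).map(i => (i.toString, 5300L + (i - 1) * 20000L))
    simulate(inputs).zipWithIndex.foreach { case (out, batch) =>
      println(s"Batch $batch: ${out.mkString("[", ",", "]")}")
    }
  }
}
```

Running `main` prints `Batch 0: []`, `Batch 1: []`, `Batch 2: [1]`, `Batch 3: [2]`, `Batch 4: [3]`, the same pattern as the output above. The window for "1" ends at 6000, but its own batch still uses watermark 0 and the next batch uses 4300 (5300 minus the delay); only the third batch, with watermark 24300, closes it. Under this model a 1-second trigger would not help, because no batch runs until new data arrives. Later Spark releases (2.4+) can run extra no-data micro-batches once the watermark moves, which this model ignores.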