Let me ask this another way: if I run this program and then feed it a single value (on nc), it returns a single result, which is an empty batch. It will not return anything else after that, no matter how long I wait.
This only happens with watermarking and append output mode. What do I do to correct this behavior?

On Mon, May 28, 2018 at 6:16 PM, Koert Kuipers <ko...@tresata.com> wrote:
> Hello all,
>
> I'm just playing with structured streaming aggregations for the first time.
> This is my little program, which I run inside sbt:
>
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.OutputMode
> import spark.implicits._ // needed for the $"..." column syntax
>
> val lines = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", 9999)
>   .load()
>
> val query = lines
>   .withColumn("time", current_timestamp)
>   .withWatermark("time", "1 second")
>   .groupBy(window($"time", "1 second")).agg(collect_list("value") as "value")
>   // .cast converts the window struct to a string; `as "string"` would only alias it
>   .withColumn("windowstring", $"window".cast("string"))
>   .writeStream
>   .format("console")
>   .outputMode(OutputMode.Append)
>   .start()
>
> query.awaitTermination()
>
> Before I start it, I create a little server with nc:
> $ nc -lk 9999
>
> After it starts, I type a single character every 20 seconds or so inside nc
> and hit enter. My characters are 1, 2, 3, etc.
>
> What I don't understand is that it comes back with the correct responses,
> but delayed in terms of entries (not time). For the first 2 characters it
> comes back with empty aggregations, and then for every subsequent character
> it comes back with the response for the character 2 entries earlier. So when
> I hit 3<enter>, it comes back with the response for 1<enter>.
>
> Not very real-time :(
>
> Any idea why?
>
> I would like it to respond to my input 1<enter> with the relevant response
> for that input (after the window and watermark have expired, of course, so
> within 2 seconds).
>
> I tried adding a trigger of 1 second, but that didn't help either.
>
> Below is the output with my inputs inserted as '<= <input>', so '<= 1'
> means I hit 1 and then enter.
>
> <= 1
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +------+-----+------------+
> |window|value|windowstring|
> +------+-----+------------+
> +------+-----+------------+
>
> <= 2
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +------+-----+------------+
> |window|value|windowstring|
> +------+-----+------------+
> +------+-----+------------+
>
> <= 3
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +--------------------+-----+--------------------+
> |              window|value|        windowstring|
> +--------------------+-----+--------------------+
> |[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
> +--------------------+-----+--------------------+
>
> <= 4
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +--------------------+-----+--------------------+
> |              window|value|        windowstring|
> +--------------------+-----+--------------------+
> |[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
> +--------------------+-----+--------------------+
>
> <= 5
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +--------------------+-----+--------------------+
> |              window|value|        windowstring|
> +--------------------+-----+--------------------+
> |[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
> +--------------------+-----+--------------------+
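One way to reason about the two-entry lag is a toy model in plain Scala (no Spark dependency). This is a sketch under stated assumptions, not Spark's implementation: it assumes (a) a micro-batch runs only when a new row arrives, (b) each batch uses the watermark computed at the end of the previous batch as max event time minus the delay, and (c) append mode emits a window only once its end is at or below that watermark. The names `WatermarkModel`, `Batch`, and `simulate` are hypothetical.

```scala
// Toy model of the behavior in this thread (hypothetical names, NOT Spark's API or internals).
// Assumptions: a batch runs only when a new row arrives; each batch uses the watermark
// computed at the end of the previous batch; append mode emits a window once end <= watermark.
final case class Batch(id: Int, emitted: List[(Long, List[String])])

object WatermarkModel {
  /** inputs: (eventTimeSec, value) in arrival order, one row per batch. */
  def simulate(inputs: List[(Long, String)], windowSec: Long, delaySec: Long): List[Batch] = {
    var watermark = Long.MinValue              // watermark seen by the current batch
    var state = Map.empty[Long, List[String]]  // open windows: start -> buffered values
    inputs.zipWithIndex.map { case ((t, v), i) =>
      // 1. Add the new row to its tumbling window [start, start + windowSec).
      val start = t - (t % windowSec)
      state = state.updated(start, state.getOrElse(start, Nil) :+ v)
      // 2. Append mode: only windows that ended at or before the old watermark are final.
      val (done, open) = state.partition { case (s, _) => s + windowSec <= watermark }
      state = open
      // 3. Advance the watermark; under assumption (b) it only takes effect in the NEXT batch.
      watermark = math.max(watermark, t - delaySec)
      Batch(i, done.toList.sortBy(_._1))
    }
  }
}
```

Feeding the model inputs at t = 20, 40, 60, 80, 100 seconds with a 1-second window and 1-second delay yields batches that emit nothing, nothing, [1], [2], [3], the same lag as the console output quoted above. With a single input it yields one empty batch and then nothing, because without further input no further batch runs. If these assumptions match the Spark version in use, the delay comes from when the watermark gets applied, not from the window size.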