Thanks, that's helpful.
On Wed, May 30, 2018 at 5:05 PM, Lalwani, Jayesh <jayesh.lalw...@capitalone.com> wrote:

> A few things:
>
> 1. Append mode is going to output data that falls out of the watermark.
> 2. Structured Streaming isn’t time based. It reacts only when it sees
>    input data. If no data appears in the input, it will not move the
>    aggregation window.
> 3. Clock time is irrelevant to Structured Streaming. As far as
>    Structured Streaming is concerned, “current time” is the max value of
>    the timestamp column used by the window.
>
> So, what is happening in your case is that you posted 1 and 2 within 2
> seconds of each other. Since neither of them fell outside the watermark,
> it didn’t output anything. Until you posted 3, time was frozen for
> Structured Streaming: the max value of the timestamp column was the
> timestamp of message 2, so the current time was the timestamp of
> message 2. When you posted 3, the time advanced to the timestamp of 3,
> which caused 1 to fall out, so it output 1.
>
> Note that it will not output 1 exactly 1 second after 1 arrives. The
> clock time means nothing.
>
> From: Koert Kuipers <ko...@tresata.com>
> Date: Monday, May 28, 2018 at 6:17 PM
> To: user <user@spark.apache.org>
> Subject: trying to understand structured streaming aggregation with
> watermark and append outputmode
>
> Hello all,
>
> Just playing with Structured Streaming aggregations for the first time.
> This is my little program I run inside sbt:
>
>     import org.apache.spark.sql.functions._
>     import org.apache.spark.sql.streaming.OutputMode
>     import spark.implicits._ // for the $"..." column syntax
>
>     val lines = spark.readStream
>       .format("socket")
>       .option("host", "localhost")
>       .option("port", 9999)
>       .load()
>
>     val query = lines
>       .withColumn("time", current_timestamp)
>       .withWatermark("time", "1 second")
>       .groupBy(window($"time", "1 second")).agg(collect_list("value") as "value")
>       // cast to string (a plain "as" would only alias the column)
>       .withColumn("windowstring", $"window".cast("string"))
>       .writeStream
>       .format("console")
>       .outputMode(OutputMode.Append)
>       .start()
>
>     query.awaitTermination()
>
> Before I start it, I create a little server with nc:
>
>     $ nc -lk 9999
>
> After it starts, I simply type a single character every 20 seconds or so
> inside nc and hit enter. My characters are 1, 2, 3, etc.
>
> The thing I don't understand is that it comes back with the correct
> responses, but delayed in terms of entries (not time). After the first 2
> characters it comes back with empty aggregations, and then for every next
> character it comes back with the response for 2 characters ago. So when I
> hit 3<enter> it comes back with the response for 1<enter>.
>
> Not very realtime :(
>
> Any idea why?
>
> I would like it to respond to my input 1<enter> with the relevant response
> for that input (after the window and watermark have expired, of course, so
> within 2 seconds).
>
> I tried adding a trigger of 1 second but that didn't help either.
>
> Below is the output with my inputs inserted using '<= <input>', so '<= 1'
> means I hit 1 and then enter.
>
>     <= 1
>     -------------------------------------------
>     Batch: 0
>     -------------------------------------------
>     +------+-----+------------+
>     |window|value|windowstring|
>     +------+-----+------------+
>     +------+-----+------------+
>
>     <= 2
>     -------------------------------------------
>     Batch: 1
>     -------------------------------------------
>     +------+-----+------------+
>     |window|value|windowstring|
>     +------+-----+------------+
>     +------+-----+------------+
>
>     <= 3
>     -------------------------------------------
>     Batch: 2
>     -------------------------------------------
>     +--------------------+-----+--------------------+
>     |              window|value|        windowstring|
>     +--------------------+-----+--------------------+
>     |[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
>     +--------------------+-----+--------------------+
>
>     <= 4
>     -------------------------------------------
>     Batch: 3
>     -------------------------------------------
>     +--------------------+-----+--------------------+
>     |              window|value|        windowstring|
>     +--------------------+-----+--------------------+
>     |[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
>     +--------------------+-----+--------------------+
>
>     <= 5
>     -------------------------------------------
>     Batch: 4
>     -------------------------------------------
>     +--------------------+-----+--------------------+
>     |              window|value|        windowstring|
>     +--------------------+-----+--------------------+
>     |[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
>     +--------------------+-----+--------------------+
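To see why the output trails the input by exactly two entries, here is a toy model in plain Scala (no Spark needed) of the rules Jayesh describes: event time is the arrival time (as with current_timestamp), "current time" only moves when a row arrives, and append mode emits a window only once the watermark has passed its end. It additionally assumes, as I understand the micro-batch engine, that the watermark computed at the end of one batch is only applied in the next batch; that one-batch delay accounts for the second entry of lag. The names WatermarkSim, simulate, windowMs and delayMs, the one-batch-per-typed-line assumption, and the millisecond timestamps are all hypothetical and for illustration only. This is a sketch of the semantics, not Spark's implementation.

```scala
// Toy model of append-mode windowed aggregation with a watermark; NOT Spark code.
// Assumptions (illustrative only): one micro-batch per typed line, event time equals
// arrival time (as with current_timestamp), times in epoch milliseconds, and a window
// is final once its end is at or before the watermark.
object WatermarkSim {
  val windowMs = 1000L // mirrors window($"time", "1 second")
  val delayMs  = 1000L // mirrors withWatermark("time", "1 second")

  /** For each micro-batch, the windows emitted in append mode (oldest first). */
  def simulate(inputs: Seq[(Long, String)]): Seq[Seq[List[String]]] = {
    var watermark = 0L                        // nothing seen yet
    var state = Map.empty[Long, List[String]] // window start -> collected values
    inputs.toVector.map { case (eventTime, value) =>
      // 1. Add the row to its tumbling window [start, start + windowMs).
      val start = eventTime - eventTime % windowMs
      state = state.updated(start, state.getOrElse(start, Nil) :+ value)
      // 2. Append mode: emit (and forget) windows whose end is at or before
      //    the watermark that was in effect when this batch started.
      val (done, open) = state.partition { case (s, _) => s + windowMs <= watermark }
      state = open
      // 3. Only now advance the watermark (max event time - delay);
      //    the new value is first used by the next batch.
      watermark = math.max(watermark, eventTime - delayMs)
      done.toSeq.sortBy(_._1).map(_._2)
    }
  }

  def main(args: Array[String]): Unit = {
    // Five inputs ("1" to "5") typed about 20 seconds apart, as in the question.
    val inputs = (1 to 5).map(i => (1000300L + (i - 1) * 20000L, i.toString))
    simulate(inputs).zipWithIndex.foreach { case (rows, batch) =>
      val shown =
        if (rows.isEmpty) "(no rows)"
        else rows.map(_.mkString("[", ", ", "]")).mkString(" ")
      println(s"Batch $batch: $shown")
    }
  }
}
```

Running main prints two batches with no rows followed by [1], [2] and [3], which lines up with the console output above: each typed line is answered with the window from two lines earlier. Because nothing in the model looks at a clock, the only thing that releases a window is the arrival of later rows, which is Jayesh's point 2.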