And I apply .withColumn("window", window(current_timestamp(), "15 minutes")) after the count().
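For context, here is a minimal sketch of an alternative placement (unverified against this exact pipeline; it assumes the events Dataset and the implicits from the code quoted below): deriving the window column before the aggregation and including it in the groupBy keeps the nondeterministic current_timestamp() inside an Aggregate, one of the operators the error message allows.

import org.apache.spark.sql.functions._

// Sketch: compute the 15-minute processing-time bucket first, then
// group on it together with the event date, instead of projecting
// current_timestamp() on top of the streaming aggregation.
val results = events
  .withWatermark("date", "1 day")
  .dropDuplicates("uuid", "date")
  .withColumn("window", window(current_timestamp(), "15 minutes"))
  .groupBy($"window", $"date")
  .count()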
On Wed, Sep 13, 2017 at 11:32 AM, 张万新 <kevinzwx1...@gmail.com> wrote:

> *Yes, my code is shown below*
>
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.Trigger
> import play.api.libs.json.Json
>
> /**
>  * input
>  */
> val logs = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", BROKER_SERVER)
>   .option("subscribe", TOPIC)
>   .option("startingOffsets", "latest")
>   .load()
>
> /**
>  * process
>  */
> val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]
>
> val events = logValues
>   .map(parseFunction)
>   .select(
>     $"_1".alias("date").cast("timestamp"),
>     $"_2".alias("uuid").cast("string")
>   )
>
> val results = events
>   .withWatermark("date", "1 day")
>   .dropDuplicates("uuid", "date")
>   .groupBy($"date")
>   .count()
>   .withColumn("window", window(current_timestamp(), "15 minutes"))
>
> /**
>  * output
>  */
> val query = results
>   .writeStream
>   .outputMode("update")
>   .format("console")
>   .option("truncate", "false")
>   .trigger(Trigger.ProcessingTime("1 seconds"))
>   .start()
>
> query.awaitTermination()
>
> *and I use Play JSON to parse the input logs from Kafka; the parse
> function looks like:*
>
> def parseFunction(str: String): (Long, String) = {
>   val json = Json.parse(str)
>   val timestamp = (json \ "time").as[Long]
>   // truncate the epoch-second timestamp to its UTC day,
>   // then shift back 8 hours to land on UTC+8 midnight
>   val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
>   val uuid = (json \ "uuid").as[String]
>   (date, uuid)
> }
>
> On Wed, Sep 13, 2017 at 2:36 AM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> Can you show all the code? This works for me.
>>
>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> The Spark version is 2.2.0.
>>>
>>> On Tue, Sep 12, 2017 at 12:32 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>
>>>> Which version of Spark?
>>>>
>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>
>>>>> Thanks for the reply, but using this method I got an exception:
>>>>>
>>>>> "Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException:
>>>>> nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window"
>>>>>
>>>>> Can you give more advice?
>>>>>
>>>>> On Tue, Sep 12, 2017 at 4:48 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>
>>>>>> import org.apache.spark.sql.functions._
>>>>>>
>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>
>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> In Structured Streaming, how can I add a column to a dataset with the
>>>>>>> current system time aligned to 15 minutes?
>>>>>>>
>>>>>>> Thanks.
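For readers following the thread: Michael's one-liner runs as-is in a plain batch job, which is presumably what "this works for me" refers to. Below is a self-contained sketch of it; the local SparkSession setup and the toy uuid DataFrame are illustrative assumptions, not from the thread.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WindowColumnExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("window-column-example")
      .getOrCreate()
    import spark.implicits._

    // Toy input: three rows with a uuid column.
    val df = Seq("a", "b", "c").toDF("uuid")

    // window(current_timestamp(), "15 minutes") produces a
    // struct<start: timestamp, end: timestamp> marking the 15-minute
    // interval that contains the current system time.
    df.withColumn("window", window(current_timestamp(), "15 minutes"))
      .show(truncate = false)

    spark.stop()
  }
}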