It seems current_timestamp() cannot be used directly inside a window
function. After some attempts I found that using

*df.count.withColumn("pTime", current_timestamp).select(window($"pTime", "15 minutes"), $"count")*

instead of

*df.count.withColumn("window", window(current_timestamp(), "15 minutes"))*

throws no exception and works fine. I don't know if this is a problem that
needs improvement.

张万新 <kevinzwx1...@gmail.com> wrote on Wed, Sep 13, 2017 at 11:43 AM:

> and I use .withColumn("window", window(current_timestamp(), "15 minutes"))
> after count
>
> 张万新 <kevinzwx1...@gmail.com> wrote on Wed, Sep 13, 2017 at 11:32 AM:
>
>> *Yes, my code is shown below*
>>
>> /**
>>  * input
>>  */
>> val logs = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", BROKER_SERVER)
>>   .option("subscribe", TOPIC)
>>   .option("startingOffset", "latest")
>>   .load()
>>
>> /**
>>  * process
>>  */
>> val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>>
>> val events = logValues
>>   .map(parseFunction)
>>   .select(
>>     $"_1".alias("date").cast("timestamp"),
>>     $"_2".alias("uuid").cast("string")
>>   )
>>
>> val results = events
>>   .withWatermark("date", "1 day")
>>   .dropDuplicates("uuid", "date")
>>   .groupBy($"date")
>>   .count()
>>   .withColumn("window", window(current_timestamp(), "15 minutes"))
>>
>> /**
>>  * output
>>  */
>> val query = results
>>   .writeStream
>>   .outputMode("update")
>>   .format("console")
>>   .option("truncate", "false")
>>   .trigger(Trigger.ProcessingTime("1 seconds"))
>>   .start()
>>
>> query.awaitTermination()
>>
>> *and I use play-json to parse the input logs from Kafka; the parse
>> function is like*
>>
>> def parseFunction(str: String): (Long, String) = {
>>   val json = Json.parse(str)
>>   val timestamp = (json \ "time").get.toString().toLong
>>   val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
>>   val uuid = (json \ "uuid").get.toString()
>>   (date, uuid)
>> }
>>
>> Michael Armbrust <mich...@databricks.com> wrote on Wed, Sep 13, 2017 at 2:36 AM:
>>
>>> Can you show all the code? This works for me.
>>>
>>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>
>>>> The spark version is 2.2.0
>>>>
>>>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 12:32 PM:
>>>>
>>>>> Which version of spark?
>>>>>
>>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the reply, but using this method I got an exception:
>>>>>>
>>>>>> "Exception in thread "main"
>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>>>>>> expressions are only allowed in
>>>>>> Project, Filter, Aggregate or Window"
>>>>>>
>>>>>> Can you give more advice?
>>>>>>
>>>>>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 4:48 AM:
>>>>>>
>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>
>>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>>
>>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> In structured streaming, how can I add a column to a dataset with
>>>>>>>> the current system time aligned to 15 minutes?
>>>>>>>>
>>>>>>>> Thanks.
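For reference, the workaround described at the top of the thread can be sketched as below. This is a minimal, untested sketch against an in-memory stream rather than Kafka; the column names (`date`, `uuid`) follow the thread, while `rate`-source input and the session setup are my assumptions. The point it illustrates: `window(current_timestamp(), ...)` placed in a `withColumn` directly after an aggregation triggers the "nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window" error, whereas materializing the processing time as its own column first and then windowing over that column in a plain `select` (a Project, where nondeterministic expressions are permitted) works.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_timestamp, window}

object WindowWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("window-workaround-sketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the deduplicated `events` stream from the thread;
    // the rate source is an assumption used only to make this runnable.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
      .select($"timestamp".alias("date"), $"value".cast("string").alias("uuid"))

    val counted = events
      .groupBy($"date")
      .count()

    // Failing form (would throw StreamingQueryException at start()):
    //   counted.withColumn("window", window(current_timestamp(), "15 minutes"))

    // Working form: add the processing time as a real column first,
    // then apply window() over that column inside a select (a Project).
    val results = counted
      .withColumn("pTime", current_timestamp())
      .select(window($"pTime", "15 minutes"), $"date", $"count")

    results.writeStream
      .outputMode("update")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```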