Can you show the explain() for the version that doesn't work? You might just be hitting a bug.
On Tue, Sep 12, 2017 at 9:03 PM, 张万新 <kevinzwx1...@gmail.com> wrote:

> It seems current_timestamp() cannot be used directly inside the window function,
> because after some attempts I found that using
>
>     df.count.withColumn("pTime", current_timestamp).select(window($"pTime", "15 minutes"), $"count")
>
> instead of
>
>     df.count.withColumn("window", window(current_timestamp(), "15 minutes"))
>
> throws no exception and works fine. I don't know if this is a problem that
> needs improvement.
>
> 张万新 <kevinzwx1...@gmail.com> wrote on Wed, Sep 13, 2017 at 11:43 AM:
>
>> And I use .withColumn("window", window(current_timestamp(), "15 minutes"))
>> after count.
>>
>> 张万新 <kevinzwx1...@gmail.com> wrote on Wed, Sep 13, 2017 at 11:32 AM:
>>
>>> Yes, my code is shown below:
>>>
>>> /**
>>>  * input
>>>  */
>>> val logs = spark
>>>   .readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", BROKER_SERVER)
>>>   .option("subscribe", TOPIC)
>>>   .option("startingOffsets", "latest")
>>>   .load()
>>>
>>> /**
>>>  * process
>>>  */
>>> val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]
>>>
>>> val events = logValues
>>>   .map(parseFunction)
>>>   .select(
>>>     $"_1".alias("date").cast("timestamp"),
>>>     $"_2".alias("uuid").cast("string")
>>>   )
>>>
>>> val results = events
>>>   .withWatermark("date", "1 day")
>>>   .dropDuplicates("uuid", "date")
>>>   .groupBy($"date")
>>>   .count()
>>>   .withColumn("window", window(current_timestamp(), "15 minutes"))
>>>
>>> /**
>>>  * output
>>>  */
>>> val query = results
>>>   .writeStream
>>>   .outputMode("update")
>>>   .format("console")
>>>   .option("truncate", "false")
>>>   .trigger(Trigger.ProcessingTime("1 second"))
>>>   .start()
>>>
>>> query.awaitTermination()
>>>
>>> And I use Play JSON to parse the input logs from Kafka; the parse
>>> function is like:
>>>
>>> def parseFunction(str: String): (Long, String) = {
>>>   val json = Json.parse(str)
>>>   val timestamp = (json \ "time").get.toString().toLong
>>>   val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
>>>   val uuid = (json \ "uuid").get.toString()
>>>   (date, uuid)
>>> }
>>>
>>> Michael Armbrust <mich...@databricks.com> wrote on Wed, Sep 13, 2017 at 2:36 AM:
>>>
>>>> Can you show all the code? This works for me.
>>>>
>>>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>
>>>>> The Spark version is 2.2.0.
>>>>>
>>>>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 12:32 PM:
>>>>>
>>>>>> Which version of Spark?
>>>>>>
>>>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the reply, but using this method I got an exception:
>>>>>>>
>>>>>>> "Exception in thread "main"
>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException:
>>>>>>> nondeterministic expressions are only allowed in
>>>>>>> Project, Filter, Aggregate or Window"
>>>>>>>
>>>>>>> Can you give more advice?
>>>>>>>
>>>>>>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 4:48 AM:
>>>>>>>
>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>
>>>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>>>
>>>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> In Structured Streaming, how can I add a column to a dataset with
>>>>>>>>> the current system time aligned to 15 minutes?
>>>>>>>>>
>>>>>>>>> Thanks.
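For readers following the thread: the "aligned to 15 minutes" bucketing that Spark's window() function computes is just floor division on the epoch time. A minimal plain-Scala sketch of that arithmetic (the function name alignTo15Min is illustrative, not a Spark API):

```scala
// Floor an epoch-seconds timestamp to the start of its 15-minute window,
// mirroring what window(col, "15 minutes") computes for the window start.
def alignTo15Min(epochSeconds: Long): Long = {
  val windowSize = 15 * 60 // 900 seconds
  (epochSeconds / windowSize) * windowSize
}

// Example: 1505260980 (2017-09-12 23:23:00 UTC) falls in the window
// starting at 1505260800 (2017-09-12 23:20:00 UTC).
println(alignTo15Min(1505260980L)) // prints 1505260800
```

The workaround 张万新 landed on (materializing current_timestamp() into a column first, then windowing on that column) sidesteps the nondeterministic-expression check, because the window expression then references an ordinary column rather than current_timestamp() directly.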