It seems current_timestamp() cannot be used directly in window function?
because after attempts I found that using

*df.count.withColumn("pTime", current_timestamp).select(window($"pTime",
"15 minutes"), $"count")*

instead of

*df.count.withColumn("window", window(current_timestamp(), "15 minutes"))*

throws no exception and works fine. I don't know if this is a problem that
needs improvement.


张万新 <kevinzwx1...@gmail.com>于2017年9月13日周三 上午11:43写道:

> and I use .withColumn("window", window(current_timestamp(), "15
> minutes")) after count
>
> 张万新 <kevinzwx1...@gmail.com>于2017年9月13日周三 上午11:32写道:
>
>> *Yes, my code is shown below*
>> /**
>>     * input
>>     */
>>   val logs = spark
>>     .readStream
>>     .format("kafka")
>>     .option("kafka.bootstrap.servers", BROKER_SERVER)
>>     .option("subscribe", TOPIC)
>>     .option("startingOffset", "latest")
>>     .load()
>>
>>   /**
>>     * process
>>     */
>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>>
>>   val events = logValues
>>     .map(parseFunction)
>>     .select(
>>       $"_1".alias("date").cast("timestamp"),
>>       $"_2".alias("uuid").cast("string")
>>     )
>>
>>   val results = events
>>     .withWatermark("date", "1 day")
>>     .dropDuplicates("uuid", "date")
>>     .groupBy($"date")
>>     .count()
>>     .withColumn("window", window(current_timestamp(), "15 minutes"))
>>
>>   /**
>>     * output
>>     */
>>   val query = results
>>     .writeStream
>>     .outputMode("update")
>>     .format("console")
>>     .option("truncate", "false")
>>     .trigger(Trigger.ProcessingTime("1 seconds"))
>>     .start()
>>
>>   query.awaitTermination()
>>
>> *and I use play json to parse input logs from kafka ,the parse function
>> is like*
>>
>>   def parseFunction(str: String): (Long, String) = {
>>     val json = Json.parse(str)
>>     val timestamp = (json \ "time").get.toString().toLong
>>     val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>>     val uuid = (json \ "uuid").get.toString()
>>     (date, uuid)
>>   }
>>
>> Michael Armbrust <mich...@databricks.com>于2017年9月13日周三 上午2:36写道:
>>
>>> Can you show all the code?  This works for me.
>>>
>>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>
>>>> The spark version is 2.2.0
>>>>
>>>> Michael Armbrust <mich...@databricks.com>于2017年9月12日周二 下午12:32写道:
>>>>
>>>>> Which version of spark?
>>>>>
>>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for reply, but using this method I got an exception:
>>>>>>
>>>>>> "Exception in thread "main"
>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>>>>>> expressions are only allowed in
>>>>>>
>>>>>> Project, Filter, Aggregate or Window"
>>>>>>
>>>>>> Can you give more advice?
>>>>>>
>>>>>> Michael Armbrust <mich...@databricks.com>于2017年9月12日周二 上午4:48写道:
>>>>>>
>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>
>>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>>
>>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> In structured streaming how can I add a column to a dataset with
>>>>>>>> current system time aligned with 15 minutes?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>

Reply via email to