Can you show the explain() for the version that doesn't work?  You might
just be hitting a bug.
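
For reference, a minimal sketch of one way to print the plan. It assumes a
local SparkSession and uses the built-in rate source (which, as far as I
know, needs Spark 2.3 or later) as a stand-in for the Kafka input;
ExplainSketch and streamingCounts are hypothetical names, not part of the
code in this thread.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ExplainSketch {
  // Hypothetical stand-in for the real pipeline: a streaming aggregation
  // over the built-in "rate" test source instead of Kafka.
  def streamingCounts(spark: SparkSession): DataFrame = {
    import spark.implicits._
    spark.readStream
      .format("rate")
      .load()
      .groupBy($"value")
      .count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("explain-sketch")
      .getOrCreate()

    // extended = true prints the logical plans as well as the physical plan.
    streamingCounts(spark).explain(true)

    spark.stop()
  }
}
```

On a query that has already been started, calling explain() on the
StreamingQuery handle should print the plan of the most recent execution
instead.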

On Tue, Sep 12, 2017 at 9:03 PM, 张万新 <kevinzwx1...@gmail.com> wrote:

> It seems current_timestamp() cannot be used directly inside the window
> function? After a few attempts I found that using
>
> *df.count.withColumn("pTime", current_timestamp).select(window($"pTime",
> "15 minutes"), $"count")*
>
> instead of
>
> *df.count.withColumn("window", window(current_timestamp(), "15 minutes"))*
>
> throws no exception and works fine. I don't know whether this is a problem
> that needs to be fixed.
>
>
> 张万新 <kevinzwx1...@gmail.com> wrote on Wed, Sep 13, 2017 at 11:43 AM:
>
>> And I call .withColumn("window", window(current_timestamp(), "15 minutes"))
>> after count().
>>
>> 张万新 <kevinzwx1...@gmail.com> wrote on Wed, Sep 13, 2017 at 11:32 AM:
>>
>>> *Yes, my code is shown below:*
>>> /**
>>>     * input
>>>     */
>>>   val logs = spark
>>>     .readStream
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers", BROKER_SERVER)
>>>     .option("subscribe", TOPIC)
>>>     .option("startingOffsets", "latest")
>>>     .load()
>>>
>>>   /**
>>>     * process
>>>     */
>>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]
>>>
>>>   val events = logValues
>>>     .map(parseFunction)
>>>     .select(
>>>       $"_1".alias("date").cast("timestamp"),
>>>       $"_2".alias("uuid").cast("string")
>>>     )
>>>
>>>   val results = events
>>>     .withWatermark("date", "1 day")
>>>     .dropDuplicates("uuid", "date")
>>>     .groupBy($"date")
>>>     .count()
>>>     .withColumn("window", window(current_timestamp(), "15 minutes"))
>>>
>>>   /**
>>>     * output
>>>     */
>>>   val query = results
>>>     .writeStream
>>>     .outputMode("update")
>>>     .format("console")
>>>     .option("truncate", "false")
>>>     .trigger(Trigger.ProcessingTime("1 seconds"))
>>>     .start()
>>>
>>>   query.awaitTermination()
>>>
>>> *I use Play JSON to parse the input logs from Kafka; the parse function
>>> looks like this:*
>>>
>>>   def parseFunction(str: String): (Long, String) = {
>>>     val json = Json.parse(str)
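>>>     // "time" is assumed to hold epoch seconds
>>>     val timestamp = (json \ "time").get.toString().toLong
>>>     // Floor to the UTC day (in hours), shift back 8 hours, convert to seconds
>>>     val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
>>>     // Note: toString() on a JSON string value keeps its surrounding quotes
>>>     val uuid = (json \ "uuid").get.toString()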
>>>     (date, uuid)
>>>   }
>>>
>>> Michael Armbrust <mich...@databricks.com> wrote on Wed, Sep 13, 2017 at 2:36 AM:
>>>
>>>> Can you show all the code?  This works for me.
>>>>
>>>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>
>>>>> The Spark version is 2.2.0.
>>>>>
>>>>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 12:32 PM:
>>>>>
>>>>>> Which version of Spark?
>>>>>>
>>>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the reply, but with this method I get an exception:
>>>>>>>
>>>>>>> "Exception in thread "main"
>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException:
>>>>>>> nondeterministic expressions are only allowed in Project, Filter,
>>>>>>> Aggregate or Window"
>>>>>>>
>>>>>>> Can you give more advice?
>>>>>>>
>>>>>>> Michael Armbrust <mich...@databricks.com> wrote on Tue, Sep 12, 2017 at 4:48 AM:
>>>>>>>
>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>
>>>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>>>
>>>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> In Structured Streaming, how can I add a column to a Dataset that
>>>>>>>>> holds the current system time aligned to a 15-minute boundary?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
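
To illustrate what "aligned with 15 minutes" means in the original question,
here is a minimal plain-Scala sketch (no Spark needed) of the arithmetic
behind a 15-minute tumbling window: the timestamp is floored to a multiple
of the window size counted from the Unix epoch. It assumes the default
alignment with no start offset; the names WindowAlignment, windowStart and
windowEnd are hypothetical.

```scala
import java.time.Instant

object WindowAlignment {
  // 15 minutes expressed in milliseconds.
  val WindowMillis: Long = 15L * 60L * 1000L

  /** Inclusive start of the tumbling window that contains epochMillis. */
  def windowStart(epochMillis: Long, sizeMillis: Long = WindowMillis): Long =
    Math.floorDiv(epochMillis, sizeMillis) * sizeMillis

  /** Exclusive end of the tumbling window that contains epochMillis. */
  def windowEnd(epochMillis: Long, sizeMillis: Long = WindowMillis): Long =
    windowStart(epochMillis, sizeMillis) + sizeMillis

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    println(s"now   = ${Instant.ofEpochMilli(now)}")
    println(s"start = ${Instant.ofEpochMilli(windowStart(now))}")
    println(s"end   = ${Instant.ofEpochMilli(windowEnd(now))}")
  }
}
```

For example, 2017-09-13T03:43:00Z falls into the window that starts at
2017-09-13T03:30:00Z and ends at 2017-09-13T03:45:00Z.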
