It seems current_timestamp() cannot be used directly inside the window function,
because after some attempts I found that using

    df.count.withColumn("pTime", current_timestamp).select(window($"pTime", "15 minutes"), $"count")

instead of

    df.count.withColumn("window", window(current_timestamp(), "15 minutes"))

throws no exception and works fine. I don't know whether this is a problem that
should be fixed.
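The "aligned to 15 minutes" part can be illustrated without Spark. As I understand Spark's tumbling windows, window(col, "15 minutes") with the default start offset puts a timestamp into the interval that starts at the timestamp floored to a multiple of 15 minutes since the Unix epoch. The plain-Scala sketch below shows only that arithmetic on epoch milliseconds; QuarterHourAlign, windowStart and windowEnd are hypothetical names, and this is not Spark's own implementation.

```scala
import java.time.{Duration, Instant}

object QuarterHourAlign {
  // Hypothetical helper: length of one tumbling window in milliseconds.
  val WindowMillis: Long = Duration.ofMinutes(15).toMillis

  // Floor an instant to the start of its 15-minute bucket, counting buckets
  // from the Unix epoch (assumed to mirror a tumbling window with no start offset).
  def windowStart(ts: Instant): Instant = {
    val ms = ts.toEpochMilli
    Instant.ofEpochMilli(ms - java.lang.Math.floorMod(ms, WindowMillis))
  }

  // The bucket end is exclusive: start + 15 minutes.
  def windowEnd(ts: Instant): Instant = windowStart(ts).plusMillis(WindowMillis)

  def main(args: Array[String]): Unit = {
    val now = Instant.parse("2017-09-13T03:43:27Z")
    println(s"${windowStart(now)} -> ${windowEnd(now)}")
    // prints 2017-09-13T03:30:00Z -> 2017-09-13T03:45:00Z
  }
}
```

In a streaming query the same alignment would come from window() itself; the sketch only makes the bucket boundaries explicit.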
On Wed, Sep 13, 2017 at 11:43 AM, 张万新 <[email protected]> wrote:
> And I use .withColumn("window", window(current_timestamp(), "15
> minutes")) after count().
>
> On Wed, Sep 13, 2017 at 11:32 AM, 张万新 <[email protected]> wrote:
>
>> Yes, my code is shown below:
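>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.streaming.Trigger
>> import spark.implicits._
>>
>> /**
>>  * input
>>  */
>> val logs = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", BROKER_SERVER)
>>   .option("subscribe", TOPIC)
>>   .option("startingOffsets", "latest") // the option name is plural: startingOffsets
>>   .load()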
>>
>> /**
>>  * process
>>  */
>> val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]
>>
>> val events = logValues
>>   .map(parseFunction)
>>   .select(
>>     $"_1".alias("date").cast("timestamp"),
>>     $"_2".alias("uuid").cast("string")
>>   )
>>
>> val results = events
>>   .withWatermark("date", "1 day")
>>   .dropDuplicates("uuid", "date")
>>   .groupBy($"date")
>>   .count()
>>   .withColumn("window", window(current_timestamp(), "15 minutes"))
>>
>> /**
>>  * output
>>  */
>> val query = results
>>   .writeStream
>>   .outputMode("update")
>>   .format("console")
>>   .option("truncate", "false")
>>   .trigger(Trigger.ProcessingTime("1 seconds"))
>>   .start()
>>
>> query.awaitTermination()
>>
>> And I use Play JSON to parse the input logs from Kafka; the parse function
>> looks like this:
>>
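>> import play.api.libs.json.Json
>>
>> def parseFunction(str: String): (Long, String) = {
>>   val json = Json.parse(str)
>>   val timestamp = (json \ "time").as[Long] // assumed to be epoch seconds
>>   // Start of the UTC day containing `timestamp`, shifted back 8 hours
>>   val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
>>   val uuid = (json \ "uuid").as[String] // .get.toString() would keep the JSON quotes
>>   (date, uuid)
>> }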
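A worked check of the `date` expression in parseFunction, assuming `time` is in epoch seconds: integer division by 86400 truncates to the UTC day, and subtracting 8 hours gives 00:00 UTC+8 on that UTC calendar date. If the goal is to bucket events by calendar day in UTC+8 (an assumption; the thread does not say so), events between 16:00 and 24:00 UTC land in the previous local day. The plain-Scala sketch below compares the original expression with a hypothetical variant, localMidnight, that shifts into UTC+8 before truncating; DayBucket and its members are illustrative names only.

```scala
object DayBucket {
  val SecondsPerDay: Long = 24L * 60 * 60
  val OffsetSeconds: Long = 8L * 60 * 60 // assumed UTC+8 offset

  // The expression from parseFunction, unchanged.
  def original(timestamp: Long): Long = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60

  // Hypothetical variant: shift into UTC+8 first, truncate to the local day,
  // then shift back so the result is epoch seconds of local midnight.
  def localMidnight(timestamp: Long): Long =
    java.lang.Math.floorDiv(timestamp + OffsetSeconds, SecondsPerDay) * SecondsPerDay - OffsetSeconds

  def main(args: Array[String]): Unit = {
    val t = 1505235600L // 2017-09-12T17:00:00Z, i.e. 01:00 on 2017-09-13 in UTC+8
    println(original(t))      // 1505145600 = 2017-09-11T16:00:00Z
    println(localMidnight(t)) // 1505232000 = 2017-09-12T16:00:00Z
  }
}
```

For timestamps between 00:00 and 16:00 UTC the two functions agree; they differ only in the last 8 hours of each UTC day.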
>>
>> On Wed, Sep 13, 2017 at 2:36 AM, Michael Armbrust <[email protected]> wrote:
>>
>>> Can you show all the code? This works for me.
>>>
>>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <[email protected]> wrote:
>>>
>>>> The Spark version is 2.2.0.
>>>>
>>>> On Tue, Sep 12, 2017 at 12:32 PM, Michael Armbrust <[email protected]> wrote:
>>>>
>>>>> Which version of spark?
>>>>>
>>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <[email protected]> wrote:
>>>>>
>>>>>> Thanks for the reply, but using this method I got an exception:
>>>>>>
>>>>>> "Exception in thread "main"
>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>>>>>> expressions are only allowed in Project, Filter, Aggregate or Window"
>>>>>>
>>>>>> Can you give more advice?
>>>>>>
>>>>>> On Tue, Sep 12, 2017 at 4:48 AM, Michael Armbrust <[email protected]> wrote:
>>>>>>
>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>
>>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>>
>>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> In Structured Streaming, how can I add a column to a Dataset that holds
>>>>>>>> the current system time aligned to 15-minute boundaries?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>