*Yes, my code is shown below*
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

/**
 * input
 */
val logs = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", BROKER_SERVER)
  .option("subscribe", TOPIC)
  .option("startingOffsets", "latest")
  .load()

/**
 * process
 */
val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]
val events = logValues
  .map(parseFunction)
  .select(
    $"_1".alias("date").cast("timestamp"),
    $"_2".alias("uuid").cast("string")
  )
val results = events
  .withWatermark("date", "1 day")
  .dropDuplicates("uuid", "date")
  .groupBy($"date")
  .count()
  .withColumn("window", window(current_timestamp(), "15 minutes"))

/**
 * output
 */
val query = results
  .writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .start()
query.awaitTermination()
*I use Play JSON to parse the input logs from Kafka; the parse function
looks like this:*
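// Assumes play.api.libs.json.Json is imported and that "time" holds epoch seconds.
def parseFunction(str: String): (Long, String) = {
  val json = Json.parse(str)
  val timestamp = (json \ "time").get.toString().toLong
  // Truncate to the UTC day, then shift back 8 hours.
  val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
  // Note: toString() on a JSON string value keeps the surrounding quotes.
  val uuid = (json \ "uuid").get.toString()
  (date, uuid)
}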
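
To sanity-check the date arithmetic in parseFunction outside of Spark, a minimal standalone sketch may help. It is plain Scala with no Spark or Play dependency and assumes non-negative epoch seconds. `AlignSketch`, `alignToDay` and `alignTo15Minutes` are hypothetical names used only for illustration; `alignToDay` mirrors the expression in parseFunction, and `alignTo15Minutes` applies the same integer-division idea to 15-minute buckets.

```scala
object AlignSketch {
  private val SecondsPerHour = 60L * 60L
  private val SecondsPerDay = 24L * SecondsPerHour

  // Same arithmetic as parseFunction: truncate to the UTC day,
  // then shift back 8 hours.
  def alignToDay(epochSeconds: Long): Long =
    (epochSeconds / SecondsPerDay * 24 - 8) * SecondsPerHour

  // Hypothetical helper: floor epoch seconds to the start of the
  // 15-minute bucket that contains them.
  def alignTo15Minutes(epochSeconds: Long): Long = {
    val bucketSeconds = 15L * 60L
    epochSeconds / bucketSeconds * bucketSeconds
  }

  def main(args: Array[String]): Unit = {
    val t = 1505241300L // 2017-09-12 18:35:00 UTC
    println(alignToDay(t))
    println(alignTo15Minutes(t))
  }
}
```

For the sample value 1505241300 (2017-09-12 18:35:00 UTC), `alignToDay` gives 1505145600 (2017-09-11 16:00:00 UTC, which is midnight of 2017-09-12 in UTC+8) and `alignTo15Minutes` gives 1505241000 (2017-09-12 18:30:00 UTC). Note that the day truncation happens on UTC day boundaries before the 8-hour shift is applied.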
On Wed, Sep 13, 2017 at 2:36 AM, Michael Armbrust <[email protected]> wrote:
> Can you show all the code? This works for me.
>
> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <[email protected]> wrote:
>
>> The spark version is 2.2.0
>>
>> On Tue, Sep 12, 2017 at 12:32 PM, Michael Armbrust <[email protected]> wrote:
>>
>>> Which version of spark?
>>>
>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <[email protected]> wrote:
>>>
>>>> Thanks for the reply, but using this method I got an exception:
>>>>
>>>> "Exception in thread "main"
>>>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>>>> expressions are only allowed in Project, Filter, Aggregate or Window"
>>>>
>>>> Can you give more advice?
>>>>
>>>> On Tue, Sep 12, 2017 at 4:48 AM, Michael Armbrust <[email protected]> wrote:
>>>>
>>>>> import org.apache.spark.sql.functions._
>>>>>
>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>
>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> In Structured Streaming, how can I add a column to a Dataset holding
>>>>>> the current system time aligned to 15-minute boundaries?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>
>>>>>
>>>
>