Re: [SS]How to add a column with custom system time?

2017-09-14 Thread Michael Armbrust
Can you show the explain() for the version that doesn't work? You might just be hitting a bug. On Tue, Sep 12, 2017 at 9:03 PM, 张万新 wrote: > It seems current_timestamp() cannot be used directly in window function? > because after attempts I found that using > >

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
It seems current_timestamp() cannot be used directly in window function? because after attempts I found that using *df.count.withColumn("pTime", current_timestamp).select(window($"pTime", "15 minutes"), $"count")* instead of *df.count.withColumn("window", window(current_timestamp(), "15

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
and I use .withColumn("window", window(current_timestamp(), "15 minutes")) after count 张万新 于2017年9月13日周三 上午11:32写道: > *Yes, my code is shown below* > /** > * input > */ > val logs = spark > .readStream > .format("kafka") >

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
*Yes, my code is shown below* /** * input */ val logs = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", BROKER_SERVER) .option("subscribe", TOPIC) .option("startingOffset", "latest") .load() /** * process */ val logValues =

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread Michael Armbrust
Can you show all the code? This works for me. On Tue, Sep 12, 2017 at 12:05 AM, 张万新 wrote: > The spark version is 2.2.0 > > Michael Armbrust 于2017年9月12日周二 下午12:32写道: > >> Which version of spark? >> >> On Mon, Sep 11, 2017 at 8:27 PM, 张万新

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
The spark version is 2.2.0 Michael Armbrust 于2017年9月12日周二 下午12:32写道: > Which version of spark? > > On Mon, Sep 11, 2017 at 8:27 PM, 张万新 wrote: > >> Thanks for reply, but using this method I got an exception: >> >> "Exception in thread "main" >>

Re: [SS]How to add a column with custom system time?

2017-09-11 Thread Michael Armbrust
Which version of spark? On Mon, Sep 11, 2017 at 8:27 PM, 张万新 wrote: > Thanks for reply, but using this method I got an exception: > > "Exception in thread "main" > org.apache.spark.sql.streaming.StreamingQueryException: > nondeterministic expressions are only allowed in

Re: [SS]How to add a column with custom system time?

2017-09-11 Thread 张万新
Thanks for reply, but using this method I got an exception: "Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window" Can you give more advice? Michael Armbrust

Re: [SS]How to add a column with custom system time?

2017-09-11 Thread Michael Armbrust
import org.apache.spark.sql.functions._ df.withColumn("window", window(current_timestamp(), "15 minutes")) On Mon, Sep 11, 2017 at 3:03 AM, 张万新 wrote: > Hi, > > In structured streaming how can I add a column to a dataset with current > system time aligned with 15

[SS]How to add a column with custom system time?

2017-09-11 Thread 张万新
Hi, In structured streaming how can I add a column to a dataset with current system time aligned with 15 minutes? Thanks.