Re: how to use cluster sparkSession like localSession

2018-11-02 Thread
I think you should investigate Apache Zeppelin and Livy. 崔苗 (Data and Artificial Intelligence Product Development Department) <0049003...@znv.com> wrote on Friday, November 2, 2018 at 11:01: > Hi, > we want to execute spark code without submitting an application.jar, like this code: > public static void main(String args[]) throws Exception { > SparkSession spark = …
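A minimal sketch of the Livy route suggested above, using only the JDK. The host, port, and session id are placeholders, and it assumes an interactive Livy session has already been created; the /sessions/{id}/statements endpoint is Livy's REST API for running code without packaging a jar.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

object LivySubmitSketch {
  // POST a JSON body to the given URL and return the raw response.
  def post(url: String, json: String): String = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(json.getBytes(StandardCharsets.UTF_8))
    scala.io.Source.fromInputStream(conn.getInputStream).mkString
  }

  def main(args: Array[String]): Unit = {
    // Run a snippet of Spark code on the cluster without submitting a jar.
    val code = "spark.range(0, 10).count()"
    val resp = post(
      "http://livy-host:8998/sessions/0/statements",   // placeholder host and session id
      s"""{"code": "$code"}"""
    )
    println(resp)
  }
}
```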

Why does SparkSQL change the table owner when performing alter table operations?

2018-03-12 Thread
Hi, when using spark.sql() to perform alter table operations I found that Spark changes the table owner property to the execution user. Then I dug into the source code and found that in HiveClientImpl, the alterTable function sets the owner of the table to the current execution user. Besides, …
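A minimal sketch to observe the behavior described above; the table name and property are placeholders. After the ALTER statement runs, the Owner field in the detailed table information reflects the user who executed it.

```scala
spark.sql("ALTER TABLE db.events SET TBLPROPERTIES ('touched' = 'true')")
spark.sql("DESCRIBE FORMATTED db.events").show(100, truncate = false)
```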

spark.sql.adaptive.enabled has no effect

2018-01-30 Thread
Hi there, As far as I know, when *spark.sql.adaptive.enabled* is set to true, the number of post-shuffle partitions should change with the map output size. But in my application there is a stage reading 900 GB of shuffled files with only 200 partitions (which is the default number of…
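A minimal sketch of the Spark 2.x adaptive-execution settings involved; the target size value is illustrative only.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("adaptive-shuffle-sketch")
  .config("spark.sql.adaptive.enabled", "true")
  // Post-shuffle partitions are coalesced toward this per-partition input size (bytes).
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "134217728") // 128 MB
  .getOrCreate()
```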

Re: JDBC to hive batch use case in spark

2017-12-09 Thread
If you don't mind, I think it will help if you post your code. Hokam Singh Chauhan wrote on Saturday, December 9, 2017 at 8:02 PM: > Hi, > I have a use case in which I want to read data from a JDBC source (Oracle) table and write it to a Hive table on a periodic basis. I tried this using the SQL…
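A minimal sketch of the periodic JDBC-to-Hive batch described above; the Oracle URL, credentials, and table names are placeholders, not from the original thread.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("jdbc-to-hive-sketch")
  .enableHiveSupport()
  .getOrCreate()

// Read the source table over JDBC.
val source = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//oracle-host:1521/ORCLPDB")
  .option("dbtable", "SCHEMA.SOURCE_TABLE")
  .option("user", "user")
  .option("password", "password")
  .option("driver", "oracle.jdbc.OracleDriver")
  .load()

// Append each periodic batch into the Hive table.
source.write.mode("append").saveAsTable("db.target_table")
```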

How to kill a query job when using spark thrift-server?

2017-11-27 Thread
Hi, I intend to use the Spark thrift-server as a service to support concurrent SQL queries. But in our situation we need a way to kill an arbitrary query job; is there an API to use here?
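A minimal sketch of one workaround, not an official thrift-server kill API: cancel the query from the client side via JDBC's Statement.cancel() (queries can also be killed from the thrift server's Spark UI). The connection URL, credentials, and query are placeholders.

```scala
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://thrift-host:10000/default", "user", "")
val stmt = conn.createStatement()

// Run the long query on a worker thread so the main thread stays free to cancel it.
val worker = new Thread(new Runnable {
  override def run(): Unit = stmt.executeQuery("SELECT COUNT(*) FROM big_table")
})
worker.start()

Thread.sleep(5000)  // decide the query should be killed
stmt.cancel()       // asks the server to abort the running statement
```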

[Structured Streaming] How to compute the difference between two rows of a streaming dataframe?

2017-09-29 Thread
Hi, I want to compute the difference between two rows in a streaming dataframe; is there a feasible API? Maybe some function like the window function *lag* in a normal dataframe, but it seems that this function is unavailable in a streaming dataframe. Thanks.
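A minimal sketch of one workaround, since lag() is not available on streaming DataFrames: keep the previous value per key in state and emit the difference with flatMapGroupsWithState. The Event/Diff types and field names are illustrative assumptions, and rows within a micro-batch are not guaranteed to arrive in event order.

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, value: Double)
case class Diff(key: String, diff: Double)

// For each key, compare every incoming value with the previously seen one.
def diffWithPrevious(
    key: String,
    rows: Iterator[Event],
    state: GroupState[Double]): Iterator[Diff] = {
  var prev = state.getOption
  val out = rows.toSeq.flatMap { e =>
    val d = prev.map(p => Diff(key, e.value - p))
    prev = Some(e.value)
    d
  }
  prev.foreach(state.update)   // remember the last value for the next batch
  out.iterator
}

import spark.implicits._       // encoders for the case classes and the Double state

val diffs = events             // events: Dataset[Event] built from readStream
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.NoTimeout())(diffWithPrevious)
```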

Re: [SS] Any way to optimize memory consumption of SS?

2017-09-14 Thread
On Sep 12, 2017 at 8:42 PM, 张万新 <kevinzwx1...@gmail.com> wrote: > *Yes, my code is shown below (I also posted my code in another mail)* > /** > * input > */ > val logs = spark > .readStream > .format("kafka") > …
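A minimal sketch completing the truncated Kafka input snippet above; the broker addresses, topic name, and offset option are placeholders, not from the original mail.

```scala
val logs = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe", "weblogs")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")   // Kafka values arrive as binary
```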

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
dow", window(current_timestamp(), "15 minutes"))* throws no exception and works fine. I don't know if this is a problem that needs improvement. 张万新 <kevinzwx1...@gmail.com>于2017年9月13日周三 上午11:43写道: > and I use .withColumn("window", window(current_timestamp(), &qu

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
and I use .withColumn("window", window(current_timestamp(), "15 minutes")) after count 张万新 <kevinzwx1...@gmail.com> wrote on Wednesday, September 13, 2017 at 11:32 AM: > *Yes, my code is shown below* > /** > * input > */ > val logs = spark > .rea…

Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread
…) } and the Java heap space looks like this (I've increased the executor memory to 15 GB): [image: image.png] Michael Armbrust <mich...@databricks.com> wrote on Wednesday, September 13, 2017 at 2:23 AM: > Can you show the full query you are running? > On Tue, Sep 12, 2017 at 10:11 AM, 张万新 <kevinzwx1...@gmail.c…

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
…json to parse input logs from kafka; the parse function is like: def parseFunction(str: String): (Long, String) = { val json = Json.parse(str) val timestamp = (json \ "time").get.toString().toLong val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60 v…
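A minimal sketch completing the truncated parse function above. The Json.parse and (json \ "time") calls match the play-json API, which is an assumption about the library being used; the second field of the returned tuple is also a guess for illustration.

```scala
import play.api.libs.json.Json

def parseFunction(str: String): (Long, String) = {
  val json = Json.parse(str)
  val timestamp = (json \ "time").get.toString().toLong
  // Day-aligned timestamp arithmetic reproduced from the thread (appears to shift to UTC+8).
  val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
  val user = (json \ "userId").get.toString()   // placeholder field name
  (date, user)
}
```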

[SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread
Hi, I'm using structured streaming to count unique visits of our website. I use Spark in yarn mode with 4 executor instances and have tried from 2 cores * 5 GB memory to 4 cores * 10 GB memory per executor, but there are frequent full GCs, and once the count rises to more than about 4.5 million the…
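A minimal sketch of one way to bound the state of a unique-visit count (not the original poster's code): approx_count_distinct keeps a fixed-size HyperLogLog sketch per window instead of every distinct id. The column names, watermark, and window size are assumptions.

```scala
import org.apache.spark.sql.functions.{approx_count_distinct, col, window}

val uv = logs
  .withWatermark("eventTime", "1 hour")                 // lets old window state be dropped
  .groupBy(window(col("eventTime"), "1 day"))
  .agg(approx_count_distinct(col("userId")).as("uv"))   // approximate distinct count
```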

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
The spark version is 2.2.0. Michael Armbrust <mich...@databricks.com> wrote on Tuesday, September 12, 2017 at 12:32 PM: > Which version of spark? > On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote: > >> Thanks for the reply, but using this method I got an exception: …

Re: [SS]How to add a column with custom system time?

2017-09-11 Thread
Michael Armbrust <mich...@databricks.com> wrote on Tuesday, September 12, 2017 at 4:48 AM: > import org.apache.spark.sql.functions._ > > df.withColumn("window", window(current_timestamp(), "15 minutes")) > > On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote: > …
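A minimal sketch expanding the answer above: tag each row with the 15-minute window that contains the current processing time. The "rate" test source is used only so the sketch is self-contained; the original question used a Kafka source.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_timestamp, window}

val spark = SparkSession.builder().appName("window-column-sketch").getOrCreate()

val df = spark.readStream
  .format("rate")    // built-in source that emits one row per second
  .load()
  .withColumn("window", window(current_timestamp(), "15 minutes"))
```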

[SS]How to add a column with custom system time?

2017-09-11 Thread
Hi, In structured streaming how can I add a column to a dataset with the current system time aligned to 15-minute boundaries? Thanks.

Will an input event older than watermark be dropped?

2017-09-06 Thread
Hi, I've been investigating the watermark for some time. According to the guide, if we specify a watermark on an event-time column, the watermark will be used to drop old state data. Then, taking a window-based count as an example, if an event whose time is older than the watermark arrives, it will simply be…
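A minimal sketch of the window-based count being asked about; the column names and thresholds are assumptions. With this watermark, state for windows older than (max event time seen - 10 minutes) can be dropped, and a late event that falls only into such an evicted window will no longer update any result.

```scala
import org.apache.spark.sql.functions.{col, window}

val counts = events
  .withWatermark("eventTime", "10 minutes")        // late threshold
  .groupBy(window(col("eventTime"), "5 minutes"))  // tumbling event-time windows
  .count()
```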

Re: Different watermark for different kafka partitions in Structured Streaming

2017-09-04 Thread
…a better way. > On Fri, Sep 1, 2017 at 4:59 PM, 张万新 <kevinzwx1...@gmail.com> wrote: > >> Thanks, it's true that a looser watermark can guarantee more data will not be dropped, but at the same time more state needs to be kept. I just wonder if there is s…

Re: Different watermark for different kafka partitions in Structured Streaming

2017-09-01 Thread
Thanks, it's true that a looser watermark can guarantee more data will not be dropped, but at the same time more state needs to be kept. I'm just wondering whether something like Flink's Kafka-partition-aware watermark might be a better solution in SS. Tathagata Das wrote on Thursday, August 31, 2017…

Re: [Structured Streaming]Data processing and output trigger should be decoupled

2017-08-31 Thread
I think something like the state store can be used to keep the intermediate data. For aggregations the engine keeps processing batches of data and updating the results in the state store (or something like this), and when a trigger fires the engine just fetches the current result from the state store and outputs it to…

Re: Why do checkpoints work the way they do?

2017-08-31 Thread
So are there any documents demonstrating under what conditions my application can recover from the same checkpoint, and under what conditions it cannot? Tathagata Das wrote on Wednesday, August 30, 2017 at 1:20 PM: > Hello, > This is an unfortunate design on my part when I was building DStreams :) > …

Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread
Hi, I'm working with Structured Streaming to process logs from kafka and use a watermark to handle late events. Currently the watermark is computed as (max event time seen by the engine - late threshold), and the same watermark is used for all partitions. But in a production environment it happens…
