Re: how to use cluster sparkSession like localSession

2018-11-02 Thread
I think you should investigate Apache Zeppelin and Livy.
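
For reference, a rough sketch of what driving a cluster-side SparkSession
through Livy's REST API could look like (the Livy host/port, the hard-coded
session id, and the input path are placeholder assumptions; this is an
untested illustration, not a complete Livy workflow):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object LivySketch {
  // Hypothetical Livy endpoint; replace with your cluster's Livy server.
  val livyUrl = "http://livy-host:8998"
  val client: HttpClient = HttpClient.newHttpClient()

  // Helper that POSTs a JSON body and returns the raw response body.
  def post(path: String, json: String): String = {
    val request = HttpRequest.newBuilder()
      .uri(URI.create(livyUrl + path))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(json))
      .build()
    client.send(request, HttpResponse.BodyHandlers.ofString()).body()
  }

  def main(args: Array[String]): Unit = {
    // 1. Open an interactive Scala session on the cluster.
    println(post("/sessions", """{"kind": "spark"}"""))

    // 2. Submit a code snippet; Livy executes it in the cluster-side
    //    SparkSession. Session id 0 is assumed for brevity; real code should
    //    parse the id from the response and poll until the session is idle.
    println(post("/sessions/0/statements",
      """{"code": "spark.read.csv(\"/data/no_schema_iris.csv\").count()"}"""))
  }
}
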
On Fri, Nov 2, 2018 at 11:01, 崔苗 (Data and AI Product Development Department) <0049003...@znv.com> wrote:

>
> Hi,
> we want to execute Spark code without submitting an application jar, like
> this code:
>
> public static void main(String args[]) throws Exception{
> SparkSession spark = SparkSession
> .builder()
> .master("local[*]")
> .appName("spark test")
> .getOrCreate();
>
> Dataset testData =
> spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.scv");
> testData.printSchema();
> testData.show();
> }
>
> The above code works well in IDEA and does not need to generate and submit
> a jar file, but if we replace master("local[*]") with master("yarn") it
> doesn't work. So is there a way to use a cluster SparkSession like a local
> SparkSession? We need to dynamically execute Spark code in a web server
> according to different requests; for example, a filter request will call
> dataset.filter(), so there is no application.jar to submit.
>
> 0049003208
> 0049003...@znv.com
>


Why does SparkSQL change the table owner when performing alter table operations?

2018-03-12 Thread
Hi,

When using spark.sql() to perform alter table operations, I found that Spark
changes the table owner property to the execution user. I then dug into the
source code and found that in HiveClientImpl, the alterTable function sets
the owner of the table to the current execution user. Some other operations,
like alter partition, getPartitionOption and so on, do the same. Can some
experts explain why we do this? Why not just behave like Hive, which does
not change the owner when performing these kinds of operations?


spark.sql.adaptive.enabled has no effect

2018-01-30 Thread
Hi there,

  As far as I know, when *spark.sql.adaptive.enabled* is set to true, the
number of post-shuffle partitions should change with the map output size.
But in my application there is a stage reading 900GB of shuffled files with
only 200 partitions (which is the default value of
*spark.sql.shuffle.partitions*), and I verified that the number of
post-shuffle partitions is always the same as the value of
spark.sql.shuffle.partitions. Additionally, I left
*spark.sql.adaptive.shuffle.targetPostShuffleInputSize* at its default. Is
there any mistake I've made, and what's the correct behavior?
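
For reference, the knobs involved look roughly like this (a sketch with
illustrative values, not my actual configuration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("adaptive-shuffle-test")
  // Enable the Spark 2.x adaptive handling of post-shuffle partitions.
  .config("spark.sql.adaptive.enabled", "true")
  // Rough target size per post-shuffle partition in bytes (128 MB here);
  // as I understand it, coalescing merges map output up to about this size.
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "134217728")
  // The pre-shuffle partition count is still spark.sql.shuffle.partitions,
  // which also caps how many post-shuffle partitions can exist.
  .config("spark.sql.shuffle.partitions", "2000")
  .getOrCreate()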

Thanks


Re: JDBC to hive batch use case in spark

2017-12-09 Thread
If you don't mind, I think it will help if you post your code

On Sat, Dec 9, 2017 at 8:02 PM, Hokam Singh Chauhan wrote:

> Hi,
> I have a use case in which I want to read data from a JDBC source (Oracle)
> table and write it to a Hive table on a periodic basis. I tried this using
> the SQL context to read from Oracle and the Hive context to write the data
> to Hive. The data read part works fine, but when I ran the save call on the
> Hive context to write the data, it threw an exception saying the table or
> view does not exist, even though the table is pre-created in Hive.
>
> Please help if anyone tried such scenario.
>
> Thanks
>
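
In the meantime, a rough sketch of the pattern described above using the
SparkSession API (the JDBC URL, credentials, and table names are
placeholders). One thing worth checking is whether the session writing to
Hive was created with Hive support enabled, since otherwise the write
resolves against the in-memory catalog and a pre-created Hive table appears
to be missing:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("oracle-to-hive")
  .enableHiveSupport() // needed so Hive metastore tables are visible
  .getOrCreate()

// Read the source table from Oracle over JDBC.
val oracleDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL")
  .option("dbtable", "SOURCE_SCHEMA.SOURCE_TABLE")
  .option("user", "app_user")
  .option("password", "app_password")
  .load()

// Append this batch into the pre-created Hive table.
oracleDf.write
  .mode(SaveMode.Append)
  .insertInto("target_db.target_table")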


How to kill a query job when using spark thrift-server?

2017-11-27 Thread
Hi,

I intend to use the Spark Thrift Server as a service to support concurrent
SQL queries. But in our situation we need a way to kill an arbitrary query
job; is there an API for this?
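
For context, for queries launched from a SparkSession we control ourselves,
the generic job-group mechanism on SparkContext gives a kill handle; a rough
sketch (the group id and table name are placeholders):

import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.sql.SparkSession

implicit val ec: ExecutionContext = ExecutionContext.global

val spark = SparkSession.builder().appName("cancellable-queries").getOrCreate()
val sc = spark.sparkContext

// Run the query asynchronously so another thread can cancel it.
val result = Future {
  // setJobGroup is per-thread, so set it on the thread submitting the jobs.
  sc.setJobGroup("query-42", "ad-hoc SQL for request 42", interruptOnCancel = true)
  spark.sql("SELECT count(*) FROM some_big_table").collect()
}

// Later, from any other thread: cancel all jobs tagged with that group.
sc.cancelJobGroup("query-42")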


[Structured Streaming] How to compute the difference between two rows of a streaming dataframe?

2017-09-29 Thread
Hi,

I want to compute the difference between two rows in a streaming dataframe;
is there a feasible API? Maybe some function like the window function *lag*
on a normal dataframe, but it seems that this function is unavailable on
streaming dataframes.
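
For comparison, this is the kind of batch-side lag I mean (a sketch on a
static dataframe with made-up example data; as noted, this style of window
function does not appear to be supported on streaming dataframes):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val spark = SparkSession.builder().master("local[*]").appName("lag-example").getOrCreate()
import spark.implicits._

// Static example data: (sensor id, event time as a long, value).
val df = Seq(("a", 1L, 10.0), ("a", 2L, 12.5), ("a", 3L, 11.0))
  .toDF("id", "ts", "value")

// Difference between each row and the previous row within a sensor.
val w = Window.partitionBy("id").orderBy("ts")
df.withColumn("diff", $"value" - lag($"value", 1).over(w)).show()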

Thanks.


Re: [SS] Any way to optimize memory consumption of SS?

2017-09-14 Thread
There are expected to be about 5 million UUIDs in a day. I need to use this
field to drop duplicate records and then count. If I simply count records
without using dropDuplicates, it occupies less than 1g of memory. I believe
most of the memory is occupied by the state store for keeping the state of
dropDuplicates, but I cannot find a way to alleviate the problem.

On Fri, Sep 15, 2017 at 3:35 AM, Michael Armbrust <mich...@databricks.com> wrote:

> How many UUIDs do you expect to have in a day?  That is likely where all
> the memory is being used.  Does it work without that?
>
> On Tue, Sep 12, 2017 at 8:42 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
>> *Yes, my code is shown below(I also post my code in another mail)*
>> /**
>> * input
>> */
>>   val logs = spark
>> .readStream
>> .format("kafka")
>> .option("kafka.bootstrap.servers", BROKER_SERVER)
>> .option("subscribe", TOPIC)
>> .option("startingOffset", "latest")
>> .load()
>>
>>   /**
>> * process
>> */
>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>>
>>   val events = logValues
>> .map(parseFunction)
>> .select(
>>   $"_1".alias("date").cast("timestamp"),
>>   $"_2".alias("uuid").cast("string")
>> )
>>
>>   val results = events
>> .withWatermark("date", "1 day")
>> .dropDuplicates("uuid", "date")
>> .groupBy($"date")
>> .count()
>>
>>   /**
>> * output
>> */
>>   val query = results
>> .writeStream
>> .outputMode("update")
>> .format("console")
>> .option("truncate", "false")
>> .trigger(Trigger.ProcessingTime("1 seconds"))
>> .start()
>>
>>   query.awaitTermination()
>>
>> *and I use play json to parse input logs from kafka ,the parse function
>> is like*
>>
>>   def parseFunction(str: String): (Long, String) = {
>> val json = Json.parse(str)
>> val timestamp = (json \ "time").get.toString().toLong
>> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>> val uuid = (json \ "uuid").get.toString()
>> (date, uuid)
>>   }
>>
>> and the Java heap space looks like this (I've increased the executor
>> memory to 15g):
>>
>> [image: heap-usage screenshot not included in the archive]
>> On Wed, Sep 13, 2017 at 2:23 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> Can you show the full query you are running?
>>>
>>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm using structured streaming to count unique visits of our website. I
>>>> use spark on yarn mode with 4 executor instances and from 2 cores * 5g
>>>> memory to 4 cores * 10g memory for each executor, but there are frequent
>>>> full gc, and once the count raises to about more than 4.5 millions the
>>>> application will be blocked and finally crash in OOM. It's kind of
>>>> unreasonable. So is there any suggestion to optimize the memory consumption
>>>> of SS? Thanks.
>>>>
>>>
>>>
>


Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
It seems current_timestamp() cannot be used directly inside the window
function, because after some attempts I found that using

*df.count.withColumn("pTime", current_timestamp).select(window($"pTime",
"15 minutes"), $"count")*

instead of

*df.count.withColumn("window", window(current_timestamp(), "15 minutes"))*

throws no exception and works fine. I don't know if this is a problem that
needs improvement.


On Wed, Sep 13, 2017 at 11:43 AM, 张万新 <kevinzwx1...@gmail.com> wrote:

> and I use .withColumn("window", window(current_timestamp(), "15
> minutes")) after count
>
> On Wed, Sep 13, 2017 at 11:32 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
>> *Yes, my code is shown below*
>> /**
>> * input
>> */
>>   val logs = spark
>> .readStream
>> .format("kafka")
>> .option("kafka.bootstrap.servers", BROKER_SERVER)
>> .option("subscribe", TOPIC)
>> .option("startingOffset", "latest")
>> .load()
>>
>>   /**
>> * process
>> */
>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>>
>>   val events = logValues
>> .map(parseFunction)
>> .select(
>>   $"_1".alias("date").cast("timestamp"),
>>   $"_2".alias("uuid").cast("string")
>> )
>>
>>   val results = events
>> .withWatermark("date", "1 day")
>> .dropDuplicates("uuid", "date")
>> .groupBy($"date")
>> .count()
>> .withColumn("window", window(current_timestamp(), "15 minutes"))
>>
>>   /**
>> * output
>> */
>>   val query = results
>> .writeStream
>> .outputMode("update")
>> .format("console")
>> .option("truncate", "false")
>> .trigger(Trigger.ProcessingTime("1 seconds"))
>> .start()
>>
>>   query.awaitTermination()
>>
>> *and I use play json to parse input logs from kafka ,the parse function
>> is like*
>>
>>   def parseFunction(str: String): (Long, String) = {
>> val json = Json.parse(str)
>> val timestamp = (json \ "time").get.toString().toLong
>> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>> val uuid = (json \ "uuid").get.toString()
>> (date, uuid)
>>   }
>>
>> On Wed, Sep 13, 2017 at 2:36 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> Can you show all the code?  This works for me.
>>>
>>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>
>>>> The spark version is 2.2.0
>>>>
>>>> On Tue, Sep 12, 2017 at 12:32 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>
>>>>> Which version of spark?
>>>>>
>>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for reply, but using this method I got an exception:
>>>>>>
>>>>>> "Exception in thread "main"
>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>>>>>> expressions are only allowed in
>>>>>>
>>>>>> Project, Filter, Aggregate or Window"
>>>>>>
>>>>>> Can you give more advice?
>>>>>>
>>>>>> On Tue, Sep 12, 2017 at 4:48 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>>
>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>
>>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>>
>>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> In structured streaming how can I add a column to a dataset with
>>>>>>>> current system time aligned with 15 minutes?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>


Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
and I use .withColumn("window", window(current_timestamp(), "15
minutes")) after
count

On Wed, Sep 13, 2017 at 11:32 AM, 张万新 <kevinzwx1...@gmail.com> wrote:

> *Yes, my code is shown below*
> /**
> * input
> */
>   val logs = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", BROKER_SERVER)
> .option("subscribe", TOPIC)
> .option("startingOffset", "latest")
> .load()
>
>   /**
> * process
> */
>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>
>   val events = logValues
> .map(parseFunction)
> .select(
>   $"_1".alias("date").cast("timestamp"),
>   $"_2".alias("uuid").cast("string")
> )
>
>   val results = events
> .withWatermark("date", "1 day")
> .dropDuplicates("uuid", "date")
> .groupBy($"date")
> .count()
> .withColumn("window", window(current_timestamp(), "15 minutes"))
>
>   /**
> * output
> */
>   val query = results
> .writeStream
> .outputMode("update")
> .format("console")
> .option("truncate", "false")
> .trigger(Trigger.ProcessingTime("1 seconds"))
> .start()
>
>   query.awaitTermination()
>
> *and I use play json to parse input logs from kafka ,the parse function is
> like*
>
>   def parseFunction(str: String): (Long, String) = {
> val json = Json.parse(str)
> val timestamp = (json \ "time").get.toString().toLong
> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
> val uuid = (json \ "uuid").get.toString()
> (date, uuid)
>   }
>
> On Wed, Sep 13, 2017 at 2:36 AM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> Can you show all the code?  This works for me.
>>
>> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> The spark version is 2.2.0
>>>
>>> On Tue, Sep 12, 2017 at 12:32 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>
>>>> Which version of spark?
>>>>
>>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>
>>>>> Thanks for reply, but using this method I got an exception:
>>>>>
>>>>> "Exception in thread "main"
>>>>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>>>>> expressions are only allowed in
>>>>>
>>>>> Project, Filter, Aggregate or Window"
>>>>>
>>>>> Can you give more advice?
>>>>>
>>>>> On Tue, Sep 12, 2017 at 4:48 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>
>>>>>> import org.apache.spark.sql.functions._
>>>>>>
>>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>>
>>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> In structured streaming how can I add a column to a dataset with
>>>>>>> current system time aligned with 15 minutes?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>


Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread
*Yes, my code is shown below(I also post my code in another mail)*
/**
* input
*/
  val logs = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", BROKER_SERVER)
.option("subscribe", TOPIC)
.option("startingOffset", "latest")
.load()

  /**
* process
*/
  val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]

  val events = logValues
.map(parseFunction)
.select(
  $"_1".alias("date").cast("timestamp"),
  $"_2".alias("uuid").cast("string")
)

  val results = events
.withWatermark("date", "1 day")
.dropDuplicates("uuid", "date")
.groupBy($"date")
.count()

  /**
* output
*/
  val query = results
.writeStream
.outputMode("update")
.format("console")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("1 seconds"))
.start()

  query.awaitTermination()

*and I use Play JSON to parse the input logs from Kafka; the parse function
is like this:*

  def parseFunction(str: String): (Long, String) = {
val json = Json.parse(str)
val timestamp = (json \ "time").get.toString().toLong
val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
val uuid = (json \ "uuid").get.toString()
(date, uuid)
  }

and the Java heap space looks like this (I've increased the executor memory to 15g):

[image: heap-usage screenshot not included in the archive]
On Wed, Sep 13, 2017 at 2:23 AM, Michael Armbrust <mich...@databricks.com> wrote:

> Can you show the full query you are running?
>
> On Tue, Sep 12, 2017 at 10:11 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm using structured streaming to count unique visits of our website. I
>> use spark on yarn mode with 4 executor instances and from 2 cores * 5g
>> memory to 4 cores * 10g memory for each executor, but there are frequent
>> full gc, and once the count raises to about more than 4.5 millions the
>> application will be blocked and finally crash in OOM. It's kind of
>> unreasonable. So is there any suggestion to optimize the memory consumption
>> of SS? Thanks.
>>
>
>


Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
*Yes, my code is shown below*
/**
* input
*/
  val logs = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", BROKER_SERVER)
.option("subscribe", TOPIC)
.option("startingOffset", "latest")
.load()

  /**
* process
*/
  val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]

  val events = logValues
.map(parseFunction)
.select(
  $"_1".alias("date").cast("timestamp"),
  $"_2".alias("uuid").cast("string")
)

  val results = events
.withWatermark("date", "1 day")
.dropDuplicates("uuid", "date")
.groupBy($"date")
.count()
.withColumn("window", window(current_timestamp(), "15 minutes"))

  /**
* output
*/
  val query = results
.writeStream
.outputMode("update")
.format("console")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("1 seconds"))
.start()

  query.awaitTermination()

*and I use Play JSON to parse the input logs from Kafka; the parse function
is like this:*

  def parseFunction(str: String): (Long, String) = {
val json = Json.parse(str)
val timestamp = (json \ "time").get.toString().toLong
val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
val uuid = (json \ "uuid").get.toString()
(date, uuid)
  }

On Wed, Sep 13, 2017 at 2:36 AM, Michael Armbrust <mich...@databricks.com> wrote:

> Can you show all the code?  This works for me.
>
> On Tue, Sep 12, 2017 at 12:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
>> The spark version is 2.2.0
>>
>> On Tue, Sep 12, 2017 at 12:32 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> Which version of spark?
>>>
>>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>
>>>> Thanks for reply, but using this method I got an exception:
>>>>
>>>> "Exception in thread "main"
>>>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>>>> expressions are only allowed in
>>>>
>>>> Project, Filter, Aggregate or Window"
>>>>
>>>> Can you give more advice?
>>>>
>>>> On Tue, Sep 12, 2017 at 4:48 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>
>>>>> import org.apache.spark.sql.functions._
>>>>>
>>>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>>>
>>>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> In structured streaming how can I add a column to a dataset with
>>>>>> current system time aligned with 15 minutes?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>
>>>>>
>>>
>


[SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread
Hi,

I'm using Structured Streaming to count unique visits to our website. I run
Spark in yarn mode with 4 executor instances, and I have tried from 2 cores *
5g memory up to 4 cores * 10g memory per executor, but there are frequent
full GCs, and once the count rises to more than about 4.5 million the
application gets blocked and finally crashes with OOM. That seems
unreasonable. So is there any suggestion to optimize the memory consumption
of SS? Thanks.


Re: [SS]How to add a column with custom system time?

2017-09-12 Thread
The spark version is 2.2.0

On Tue, Sep 12, 2017 at 12:32 PM, Michael Armbrust <mich...@databricks.com> wrote:

> Which version of spark?
>
> On Mon, Sep 11, 2017 at 8:27 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
>> Thanks for reply, but using this method I got an exception:
>>
>> "Exception in thread "main"
>> org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
>> expressions are only allowed in
>>
>> Project, Filter, Aggregate or Window"
>>
>> Can you give more advice?
>>
>> On Tue, Sep 12, 2017 at 4:48 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> import org.apache.spark.sql.functions._
>>>
>>> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>>>
>>> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> In structured streaming how can I add a column to a dataset with
>>>> current system time aligned with 15 minutes?
>>>>
>>>> Thanks.
>>>>
>>>
>>>
>


Re: [SS]How to add a column with custom system time?

2017-09-11 Thread
Thanks for the reply, but using this method I got an exception:

"Exception in thread "main"
org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic
expressions are only allowed in

Project, Filter, Aggregate or Window"

Can you give more advice?

On Tue, Sep 12, 2017 at 4:48 AM, Michael Armbrust <mich...@databricks.com> wrote:

> import org.apache.spark.sql.functions._
>
> df.withColumn("window", window(current_timestamp(), "15 minutes"))
>
> On Mon, Sep 11, 2017 at 3:03 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
>> Hi,
>>
>> In structured streaming how can I add a column to a dataset with current
>> system time aligned with 15 minutes?
>>
>> Thanks.
>>
>
>


[SS]How to add a column with custom system time?

2017-09-11 Thread
Hi,

In structured streaming how can I add a column to a dataset with current
system time aligned with 15 minutes?

Thanks.


Will an input event older than watermark be dropped?

2017-09-06 Thread
Hi,

I've been investigating the watermark for some time. According to the guide,
if we specify a watermark on an event-time column, the watermark will be
used to drop old state data. Then, taking a window-based count as an
example, if an event arrives whose time is older than the watermark, will it
simply be dropped, or will a new count result be calculated for the
corresponding window?
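
For concreteness, the kind of query I mean (a sketch; the broker and topic
are placeholders, and the Kafka ingestion timestamp stands in for a real
event-time field):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("watermark-question").getOrCreate()
import spark.implicits._

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "events")                    // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS word", "timestamp AS eventTime")

// Per the programming guide, events within the 10-minute lateness threshold
// still update their window's count; events later than that are expected to
// be dropped rather than trigger a recomputation.
val windowedCounts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "5 minutes"), $"word")
  .count()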


Re: Different watermark for different kafka partitions in Structured Streaming

2017-09-04 Thread
@Ryan It's frequently observed in our production environment that different
partitions' consumption rates vary for various reasons, including
performance differences between the machines holding the partitions, uneven
distribution of messages, and so on. So I hope there can be some advice on
how to design a per-Kafka-partition watermark in SS.

On Sat, Sep 2, 2017 at 10:36 AM, Ryan <ryan.hd@gmail.com> wrote:

> I don't think SS supports a "partitioned" watermark now. And why do
> different partitions' consumption rates vary? If the handling logic is
> quite different, using different topics is a better way.
>
> On Fri, Sep 1, 2017 at 4:59 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
>> Thanks, it's true that looser watermark can guarantee more data not be
>> dropped, but at the same time more state need to be kept.   I just consider
>> if there is sth like kafka-partition-aware watermark in flink in SS may be
>> a better solution.
>>
>> On Thu, Aug 31, 2017 at 9:13 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> Why not set the watermark to be looser, one that works across all
>>> partitions? The main usage of watermark is to drop state. If you loosen the
>>> watermark threshold (e.g. from 1 hour to 10 hours), then you will keep more
>>> state with older data, but you are guaranteed that you will not drop
>>> important data.
>>>
>>> On Wed, Aug 30, 2017 at 7:41 AM, KevinZwx <kevinzwx1...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm working with Structured Streaming to process logs from kafka and use
>>>> watermark to handle late events. Currently the watermark is computed by
>>>> (max
>>>> event time seen by the engine - late threshold), and the same watermark
>>>> is
>>>> used for all partitions.
>>>>
>>>> But in production environment it happens frequently that different
>>>> partition
>>>> is consumed at different speed, the consumption of some partitions may
>>>> be
>>>> left behind, so the newest event time in these partitions may be much
>>>> smaller than than the others'. In this case using the same watermark
>>>> for all
>>>> partitions may cause heavy data loss.
>>>>
>>>> So is there any way to achieve different watermark for different kafka
>>>> partition or any plan to work on this?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>


Re: Different watermark for different kafka partitions in Structured Streaming

2017-09-01 Thread
Thanks, it's true that a looser watermark can guarantee more data is not
dropped, but at the same time more state needs to be kept. I just think that
something like Flink's Kafka-partition-aware watermark might be a better
solution in SS.

On Thu, Aug 31, 2017 at 9:13 AM, Tathagata Das wrote:

> Why not set the watermark to be looser, one that works across all
> partitions? The main usage of watermark is to drop state. If you loosen the
> watermark threshold (e.g. from 1 hour to 10 hours), then you will keep more
> state with older data, but you are guaranteed that you will not drop
> important data.
>
> On Wed, Aug 30, 2017 at 7:41 AM, KevinZwx  wrote:
>
>> Hi,
>>
>> I'm working with Structured Streaming to process logs from kafka and use
>> watermark to handle late events. Currently the watermark is computed by
>> (max
>> event time seen by the engine - late threshold), and the same watermark is
>> used for all partitions.
>>
>> But in production environment it happens frequently that different
>> partition
>> is consumed at different speed, the consumption of some partitions may be
>> left behind, so the newest event time in these partitions may be much
>> smaller than than the others'. In this case using the same watermark for
>> all
>> partitions may cause heavy data loss.
>>
>> So is there any way to achieve different watermark for different kafka
>> partition or any plan to work on this?
>>
>>
>>
>>
>>
>


Re: [Structured Streaming]Data processing and output trigger should be decoupled

2017-08-31 Thread
I think something like state store can be used to keep the intermediate
data. For aggregations the engines keeps processing batches of data and
update the results in state store(or sth like this), and when a trigger
begins the engines just fetch the current result from state store and
output it to the sink specified by users.

Or at least another way, if the processing time is shorter than the trigger
interval, can there be a way to for the engine to first complete most of
jobs or stages, and when the trigger starts, the final job or stages are
done to get the final result and output it to the sink?

On Thu, Aug 31, 2017 at 1:59 AM, Shixiong (Ryan) Zhu wrote:

> I don't think that's a good idea. If the engine keeps on processing data
> but doesn't output anything, where to keep the intermediate data?
>
> On Wed, Aug 30, 2017 at 9:26 AM, KevinZwx  wrote:
>
>> Hi,
>>
>> I'm working with structured streaming, and I'm wondering whether there
>> should be some improvements about trigger.
>>
>> Currently, when I specify a trigger, i.e.
>> trigger(Trigger.ProcessingTime("10
>> minutes")), the engine will begin processing data at the time the trigger
>> begins, like 10:00:00, 10:10:00, 10:20:00,..., etc, if the engine takes
>> 10s
>> to process this batch of data, then we will get the output result at
>> 10:00:10...,  then the engine just waits without processing any data. When
>> the next trigger begins, the engine begins to process the data during the
>> interval, and if this time the engine takes 15s to process the batch, we
>> will get result at 10:10:15. This is the problem.
>>
>> In my understanding, the trigger and data processing should be decoupled,
>> the engine should keep on processing data as fast as possible, but only
>> generate output results at each trigger, therefore we can get the result
>> at
>> 10:00:00, 10:10:00, 10:20:00, ... So I'm wondering if there is any
>> solution
>> or plan to work on this?
>>
>>
>>
>>
>>
>


Re: Why do checkpoints work the way they do?

2017-08-31 Thread
So are there any documents explaining under what conditions my application
can recover from the same checkpoint and under what conditions it cannot?

On Wed, Aug 30, 2017 at 1:20 PM, Tathagata Das wrote:

> Hello,
>
> This is an unfortunate design on my part when I was building DStreams :)
>
> Fortunately, we learnt from our mistakes and built Structured Streaming
> the correct way. Checkpointing in Structured Streaming stores only the
> progress information (offsets, etc.), and the user can change their
> application code (within certain constraints, of course) and still restart
> from checkpoints (unlike DStreams). If you are just building out your
> streaming applications, then I highly recommend you to try out Structured
> Streaming instead of DStreams (which is effectively in maintenance mode).
>
>
> On Fri, Aug 25, 2017 at 7:41 PM, Hugo Reinwald 
> wrote:
>
>> Hello,
>>
>> I am implementing a spark streaming solution with Kafka and read that
>> checkpoints cannot be used across application code changes - here
>> 
>>
>> I tested changes in application code and got the error message as b below
>> -
>>
>> 17/08/25 15:10:47 WARN CheckpointReader: Error reading checkpoint from
>> file file:/tmp/checkpoint/checkpoint-150364116.bk
>> java.io.InvalidClassException: scala.collection.mutable.ArrayBuffer;
>> local class incompatible: stream classdesc serialVersionUID =
>> -2927962711774871866, local class serialVersionUID = 1529165946227428979
>>
>> While I understand that this is as per design, can I know why does
>> checkpointing work the way that it does verifying the class signatures?
>> Would it not be easier to let the developer decide if he/she wants to use
>> the old checkpoints depending on what is the change in application logic
>> e.g. changes in code unrelated to spark/kafka - Logging / conf changes etc
>>
>> This is first post in the group. Apologies if I am asking the question
>> again, I did a nabble search and it didnt throw up the answer.
>>
>> Thanks for the help.
>> Hugo
>>
>
>
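
For reference, in the Structured Streaming case the recovery information
mentioned above lives under the query's checkpoint location; a minimal
sketch, with placeholder broker, topic, and path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ss-checkpoint-sketch").getOrCreate()

val counts = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "events")                    // placeholder
  .load()
  .groupBy("topic")
  .count()

counts.writeStream
  .outputMode("complete")
  .format("console")
  // Recovery information (offsets, progress, and any state) lives here
  // rather than serialized application code, which is why restarts can
  // tolerate code changes within the documented constraints.
  .option("checkpointLocation", "hdfs:///checkpoints/events-count")
  .start()
  .awaitTermination()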


Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread
Hi,

I'm working with Structured Streaming to process logs from kafka and use
watermark to handle late events. Currently the watermark is computed by (max
event time seen by the engine - late threshold), and the same watermark is
used for all partitions.

But in production environments it happens frequently that different
partitions are consumed at different speeds, and the consumption of some
partitions may fall behind, so the newest event time in these partitions may
be much smaller than the others'. In this case, using the same watermark for
all partitions may cause heavy data loss.

So is there any way to achieve different watermark for different kafka
partition or any plan to work on this?

