Re: Why do checkpoints work the way they do?

2017-08-29 Thread Tathagata Das
Hello,

This is an unfortunate design on my part when I was building DStreams :)

Fortunately, we learnt from our mistakes and built Structured Streaming the
correct way. Checkpointing in Structured Streaming stores only the progress
information (offsets, etc.), and the user can change their application code
(within certain constraints, of course) and still restart from checkpoints
(unlike DStreams). If you are just building out your streaming
applications, then I highly recommend trying out Structured Streaming
instead of DStreams (which is effectively in maintenance mode).
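
For reference, a minimal sketch of what this looks like in practice (broker, topic, and
paths below are illustrative assumptions, and the spark-sql-kafka connector must be on
the classpath): the checkpoint directory only stores offsets and progress, so the
application code around it can be modified, within the documented constraints, between
restarts.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("structured-kafka-counts").getOrCreate()

// Read from Kafka; only the source offsets end up in the checkpoint.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // assumption: your broker list
  .option("subscribe", "events")                      // assumption: your topic
  .load()

// The aggregation (application logic) can be modified across restarts.
val counts = events.groupBy("key").count()

counts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/structured-app")  // progress info only
  .start()
  .awaitTermination()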


On Fri, Aug 25, 2017 at 7:41 PM, Hugo Reinwald 
wrote:

> Hello,
>
> I am implementing a spark streaming solution with Kafka and read that
> checkpoints cannot be used across application code changes - here
> 
>
> I tested changes in application code and got the error message as b below
> -
>
> 17/08/25 15:10:47 WARN CheckpointReader: Error reading checkpoint from
> file file:/tmp/checkpoint/checkpoint-150364116.bk
> java.io.InvalidClassException: scala.collection.mutable.ArrayBuffer;
> local class incompatible: stream classdesc serialVersionUID =
> -2927962711774871866, local class serialVersionUID = 1529165946227428979
>
> While I understand that this is as per design, can I ask why checkpointing
> works the way it does, verifying the class signatures? Would it not be
> easier to let the developer decide whether he/she wants to use the old
> checkpoints, depending on what changed in the application logic, e.g.
> changes in code unrelated to Spark/Kafka such as logging or conf changes?
>
> This is my first post in the group. Apologies if I am asking the question
> again; I did a Nabble search and it didn't throw up the answer.
>
> Thanks for the help.
> Hugo
>


Re: Time window on Processing Time

2017-08-29 Thread Tathagata Das
Yes, it can be! There is a SQL function called current_timestamp() which is
self-explanatory. So I believe you should be able to do something like

import org.apache.spark.sql.functions._

ds.withColumn("processingTime", current_timestamp())  // stamp each row with wall-clock time
  .groupBy(window(col("processingTime"), "1 minute")) // window() takes a Column, not a String
  .count()
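
A more complete sketch along these lines, using the built-in rate source (Spark 2.2+) so
it runs standalone; the source, rate, and checkpoint path are assumptions, not from the
thread:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[2]").appName("processing-time-window").getOrCreate()

val query = spark.readStream
  .format("rate")                                      // emits (timestamp, value) rows continuously
  .option("rowsPerSecond", "5")
  .load()
  .withColumn("processingTime", current_timestamp())   // wall-clock time of the micro-batch
  .groupBy(window(col("processingTime"), "1 minute"))  // processing-time window
  .count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/processing-time-window")
  .start()

query.awaitTermination()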


On Mon, Aug 28, 2017 at 5:46 AM, madhu phatak  wrote:

> Hi,
> As I am playing with Structured Streaming, I observed that the window function
> always requires a time column in the input data, so that means it's event time.
>
> Is it possible to do old Spark Streaming-style window functions based on
> processing time? I don't see any documentation on this.
>
> --
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Aah, I might have misinterpreted. The groupBy + window solution would give
the max time for each train over 24 hours (non-overlapping window) of event
data (timestamped by activity_timestamp). So the output would be like.

Train  Dest  Window(activity_timestamp)   max(Time)
1      HK    Aug28-00:00 to Aug29-00:00   10:00      <- updating currently through Aug 29
1      HK    Aug27-00:00 to Aug28-00:00   09:00      <- not updating, as no new events arrive
                                                        with activity_timestamp in this range

The drawback of this approach is that as soon as Aug 28 starts, you have to
wait for a new event about a train before you get a new max(time). You may
rather want a rolling 24-hour period, that is, the max time known over events
in the last 24 hours. In that case, maintaining your own custom state using
mapGroupsWithState/flatMapGroupsWithState is the best and most flexible
option. It is available in Spark 2.2 in Scala and Java.

Here is an example that tracks sessions based on events.
Scala -
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

You will have to create a custom per-train state which keeps track of last
24 hours of trains history, and use that state to calculate the max time
for each train.


def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
    state: GroupState[TrainHistory]): Long = {
  // for every event, update the history (i.e. the last 24 hours of events)
  // and return the max time from the history
}

trainTimesDataset // Dataset[TrainEvents]
  .groupByKey(_.train)
  .mapGroupsWithState(updateHistoryAndGetMax)
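
One possible way to flesh out that skeleton (the TrainEvents/TrainHistory shapes below
are illustrative assumptions, and spark.implicits._ must be in scope for the encoders):

import org.apache.spark.sql.streaming.GroupState

// Illustrative types: an event carries the train, destination and event time;
// the state keeps the event times seen in roughly the last 24 hours.
case class TrainEvents(train: String, dest: String, time: java.sql.Timestamp)
case class TrainHistory(times: Seq[Long])

def updateHistoryAndGetMax(
    train: String,
    events: Iterator[TrainEvents],
    state: GroupState[TrainHistory]): Long = {
  val dayMillis = 24L * 60 * 60 * 1000
  val previous  = state.getOption.map(_.times).getOrElse(Seq.empty)
  val all       = previous ++ events.map(_.time.getTime)  // non-empty: invoked only with new events
  // Trim the history to the 24 hours preceding the newest event seen so far.
  val cutoff    = all.max - dayMillis
  val recent    = all.filter(_ >= cutoff)
  state.update(TrainHistory(recent))
  recent.max
}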

Hope this helps.


On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:

> Hey TD,
>
> If I understood the question correctly, your solution wouldn't return the
> exact solution, since it also groups by on destination. I would say the
> easiest solution would be to use flatMapGroupsWithState, where you:
> .groupByKey(_.train)
>
> and keep in state the row with the maximum time.
>
> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Yes. And in that case, if you just care about only the last few days of
>> max, then you should set watermark on the timestamp column.
>>
>>  *trainTimesDataset*
>> *  .withWatermark("**activity_timestamp", "5 days")*
>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train",
>> "dest")*
>> *  .max("time")*
>>
>> Any counts which are more than 5 days old will be dropped from the
>> streaming state.
>>
>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali  wrote:
>>
>>> Hi,
>>>
>>> Thanks for the response. Since this is a streaming based query and in my
>>> case I need to hold state for 24 hours which I forgot to mention in my
>>> previous email. can I do ?
>>>
>>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24
>>> hours"), "train", "dest").max("time")*
>>>
>>>
>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
 Int, dest: String, time: Timestamp] *


 *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*


 *SQL*: *"select train, dest, max(time) from trainTimesView group by
 train, dest"*// after calling
 *trainTimesData.createOrReplaceTempView(trainTimesView)*


 On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
 wrote:

> Hi All,
>
> I am wondering what is the easiest and concise way to express the
> computation below in Spark Structured streaming given that it supports 
> both
> imperative and declarative styles?
> I am just trying to select rows that has max timestamp for each train?
> Instead of doing some sort of nested queries like we normally do in any
> relational database I am trying to see if I can leverage both imperative
> and declarative at the same time. If nested queries or join are not
> required then I would like to see how this can be possible? I am using
> spark 2.1.1.
>
> Dataset
>
> Train  Dest  Time
> 1      HK    10:00
> 1      SH    12:00
> 1      SZ    14:00
> 2      HK    13:00
> 2      SH    09:00
> 2      SZ    07:00
>
> The desired result should be:
>
> Train  Dest  Time
> 1      SZ    14:00
> 2      HK    13:00
>
>

>>>
>>
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Can I do subqueries using the Dataset or DataFrame APIs rather than raw SQL,
unless that is a last resort?

On Tue, Aug 29, 2017 at 8:47 PM, ayan guha  wrote:

> This is not correct. In SQL Land, your query should be like below:
>
> select * from (
> select Train,DEST,TIME, row_number() over (partition by train order by
> time desc) r
> from TrainTable
> ) x where r=1
>
> All the constructs supported in dataframe functions.
>
> On Wed, Aug 30, 2017 at 1:08 PM, kant kodali  wrote:
>
>> yes in a relational db one could just do this
>>
>> SELECT t.Train, t.Dest, r.MaxTimeFROM (
>>   SELECT Train, MAX(Time) as MaxTime
>>   FROM TrainTable
>>   GROUP BY Train) rINNER JOIN TrainTable tON t.Train = r.Train AND 
>> t.Time = r.MaxTime
>>
>> (copied answer from a Stack overflow question someone asked)
>>
>> but still thinking how to do this in a streaming setting?
>> mapGroupWithState looks interesting but its just not available in 2.1.1 If
>> I am not wrong.
>>
>> Thanks!
>>
>> On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz  wrote:
>>
>>> That just gives you the max time for each train. If I understood the
>>> question correctly, OP wants the whole row with the max time. That's
>>> generally solved through joins or subqueries, which would be hard to do in
>>> a streaming setting
>>>
>>> On Aug 29, 2017 7:29 PM, "ayan guha"  wrote:
>>>
 Why removing the destination from the window wont work? Like this:

  *trainTimesDataset*
 *  .withWatermark("**activity_timestamp", "5 days")*
 *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
 "train")*
 *  .max("time")*

 On Wed, Aug 30, 2017 at 10:38 AM, kant kodali 
 wrote:

> @Burak so how would the transformation or query would look like for
> the above example? I don't see flatMapGroupsWithState in the DataSet
> API Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life
> easier.
>
>
>
> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:
>
>> Hey TD,
>>
>> If I understood the question correctly, your solution wouldn't return
>> the exact solution, since it also groups by on destination. I would say 
>> the
>> easiest solution would be to use flatMapGroupsWithState, where you:
>> .groupByKey(_.train)
>>
>> and keep in state the row with the maximum time.
>>
>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Yes. And in that case, if you just care about only the last few days
>>> of max, then you should set watermark on the timestamp column.
>>>
>>>  *trainTimesDataset*
>>> *  .withWatermark("**activity_timestamp", "5 days")*
>>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
>>> "train", "dest")*
>>> *  .max("time")*
>>>
>>> Any counts which are more than 5 days old will be dropped from the
>>> streaming state.
>>>
>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali 
>>> wrote:
>>>
 Hi,

 Thanks for the response. Since this is a streaming based query and
 in my case I need to hold state for 24 hours which I forgot to mention 
 in
 my previous email. can I do ?

  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
 "24 hours"), "train", "dest").max("time")*


 On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
> Int, dest: String, time: Timestamp] *
>
>
> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>
>
> *SQL*: *"select train, dest, max(time) from trainTimesView group
> by train, dest"*// after calling
> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>
>
> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
> wrote:
>
>> Hi All,
>>
>> I am wondering what is the easiest and concise way to express the
>> computation below in Spark Structured streaming given that it 
>> supports both
>> imperative and declarative styles?
>> I am just trying to select rows that has max timestamp for each
>> train? Instead of doing some sort of nested queries like we normally 
>> do in
>> any relational database I am trying to see if I can leverage both
>> imperative and declarative at the same time. If nested queries or 
>> join are
>> not required then I would like to see how this can be possible? I am 
>> using
>> spark 2.1.1.
>>
>> Dataset
>>

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread ayan guha
This is not correct. In SQL Land, your query should be like below:

select * from (
select Train,DEST,TIME, row_number() over (partition by train order by time
desc) r
from TrainTable
) x where r=1

All these constructs are supported in DataFrame functions as well.

On Wed, Aug 30, 2017 at 1:08 PM, kant kodali  wrote:

> yes in a relational db one could just do this
>
> SELECT t.Train, t.Dest, r.MaxTimeFROM (
>   SELECT Train, MAX(Time) as MaxTime
>   FROM TrainTable
>   GROUP BY Train) rINNER JOIN TrainTable tON t.Train = r.Train AND t.Time 
> = r.MaxTime
>
> (copied answer from a Stack overflow question someone asked)
>
> but still thinking how to do this in a streaming setting?
> mapGroupWithState looks interesting but its just not available in 2.1.1 If
> I am not wrong.
>
> Thanks!
>
> On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz  wrote:
>
>> That just gives you the max time for each train. If I understood the
>> question correctly, OP wants the whole row with the max time. That's
>> generally solved through joins or subqueries, which would be hard to do in
>> a streaming setting
>>
>> On Aug 29, 2017 7:29 PM, "ayan guha"  wrote:
>>
>>> Why removing the destination from the window wont work? Like this:
>>>
>>>  *trainTimesDataset*
>>> *  .withWatermark("**activity_timestamp", "5 days")*
>>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train")*
>>> *  .max("time")*
>>>
>>> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali 
>>> wrote:
>>>
 @Burak so how would the transformation or query would look like for the
 above example? I don't see flatMapGroupsWithState in the DataSet API
 Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.



 On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:

> Hey TD,
>
> If I understood the question correctly, your solution wouldn't return
> the exact solution, since it also groups by on destination. I would say 
> the
> easiest solution would be to use flatMapGroupsWithState, where you:
> .groupByKey(_.train)
>
> and keep in state the row with the maximum time.
>
> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Yes. And in that case, if you just care about only the last few days
>> of max, then you should set watermark on the timestamp column.
>>
>>  *trainTimesDataset*
>> *  .withWatermark("**activity_timestamp", "5 days")*
>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
>> "train", "dest")*
>> *  .max("time")*
>>
>> Any counts which are more than 5 days old will be dropped from the
>> streaming state.
>>
>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali 
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the response. Since this is a streaming based query and
>>> in my case I need to hold state for 24 hours which I forgot to mention 
>>> in
>>> my previous email. can I do ?
>>>
>>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
>>> "24 hours"), "train", "dest").max("time")*
>>>
>>>
>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
 Int, dest: String, time: Timestamp] *


 *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*


 *SQL*: *"select train, dest, max(time) from trainTimesView group
 by train, dest"*// after calling
 *trainTimesData.createOrReplaceTempView(trainTimesView)*


 On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
 wrote:

> Hi All,
>
> I am wondering what is the easiest and concise way to express the
> computation below in Spark Structured streaming given that it 
> supports both
> imperative and declarative styles?
> I am just trying to select rows that has max timestamp for each
> train? Instead of doing some sort of nested queries like we normally 
> do in
> any relational database I am trying to see if I can leverage both
> imperative and declarative at the same time. If nested queries or 
> join are
> not required then I would like to see how this can be possible? I am 
> using
> spark 2.1.1.
>
> Dataset
>
> Train  Dest  Time
> 1      HK    10:00
> 1      SH    12:00
> 1      SZ    14:00
> 2      HK    13:00
> 2      SH    09:00
> 2      SZ    07:00
>
> The desired result should be:
>
> Train  Dest  Time
> 1      SZ    14:00
> 2      HK    13:00

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
yes in a relational db one could just do this

SELECT t.Train, t.Dest, r.MaxTimeFROM (
  SELECT Train, MAX(Time) as MaxTime
  FROM TrainTable
  GROUP BY Train) rINNER JOIN TrainTable tON t.Train = r.Train AND
t.Time = r.MaxTime

(copied answer from a Stack overflow question someone asked)

but still thinking how to do this in a streaming setting? mapGroupWithState
looks interesting but its just not available in 2.1.1 If I am not wrong.

Thanks!

On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz  wrote:

> That just gives you the max time for each train. If I understood the
> question correctly, OP wants the whole row with the max time. That's
> generally solved through joins or subqueries, which would be hard to do in
> a streaming setting
>
> On Aug 29, 2017 7:29 PM, "ayan guha"  wrote:
>
>> Why removing the destination from the window wont work? Like this:
>>
>>  *trainTimesDataset*
>> *  .withWatermark("**activity_timestamp", "5 days")*
>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train")*
>> *  .max("time")*
>>
>> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali  wrote:
>>
>>> @Burak so how would the transformation or query would look like for the
>>> above example? I don't see flatMapGroupsWithState in the DataSet API
>>> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.
>>>
>>>
>>>
>>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:
>>>
 Hey TD,

 If I understood the question correctly, your solution wouldn't return
 the exact solution, since it also groups by on destination. I would say the
 easiest solution would be to use flatMapGroupsWithState, where you:
 .groupByKey(_.train)

 and keep in state the row with the maximum time.

 On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> Yes. And in that case, if you just care about only the last few days
> of max, then you should set watermark on the timestamp column.
>
>  *trainTimesDataset*
> *  .withWatermark("**activity_timestamp", "5 days")*
> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
> "train", "dest")*
> *  .max("time")*
>
> Any counts which are more than 5 days old will be dropped from the
> streaming state.
>
> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali 
> wrote:
>
>> Hi,
>>
>> Thanks for the response. Since this is a streaming based query and in
>> my case I need to hold state for 24 hours which I forgot to mention in my
>> previous email. can I do ?
>>
>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
>> "24 hours"), "train", "dest").max("time")*
>>
>>
>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>>> Int, dest: String, time: Timestamp] *
>>>
>>>
>>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>>
>>>
>>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>>> train, dest"*// after calling
>>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>>
>>>
>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
>>> wrote:
>>>
 Hi All,

 I am wondering what is the easiest and concise way to express the
 computation below in Spark Structured streaming given that it supports 
 both
 imperative and declarative styles?
 I am just trying to select rows that has max timestamp for each
 train? Instead of doing some sort of nested queries like we normally 
 do in
 any relational database I am trying to see if I can leverage both
 imperative and declarative at the same time. If nested queries or join 
 are
 not required then I would like to see how this can be possible? I am 
 using
 spark 2.1.1.

 Dataset

 Train  Dest  Time
 1      HK    10:00
 1      SH    12:00
 1      SZ    14:00
 2      HK    13:00
 2      SH    09:00
 2      SZ    07:00

 The desired result should be:

 Train  Dest  Time
 1      SZ    14:00
 2      HK    13:00


>>>
>>
>

>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Burak Yavuz
That just gives you the max time for each train. If I understood the
question correctly, OP wants the whole row with the max time. That's
generally solved through joins or subqueries, which would be hard to do in
a streaming setting

On Aug 29, 2017 7:29 PM, "ayan guha"  wrote:

> Why removing the destination from the window wont work? Like this:
>
>  *trainTimesDataset*
> *  .withWatermark("**activity_timestamp", "5 days")*
> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train")*
> *  .max("time")*
>
> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali  wrote:
>
>> @Burak so how would the transformation or query would look like for the
>> above example? I don't see flatMapGroupsWithState in the DataSet API
>> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.
>>
>>
>>
>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:
>>
>>> Hey TD,
>>>
>>> If I understood the question correctly, your solution wouldn't return
>>> the exact solution, since it also groups by on destination. I would say the
>>> easiest solution would be to use flatMapGroupsWithState, where you:
>>> .groupByKey(_.train)
>>>
>>> and keep in state the row with the maximum time.
>>>
>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Yes. And in that case, if you just care about only the last few days of
 max, then you should set watermark on the timestamp column.

  *trainTimesDataset*
 *  .withWatermark("**activity_timestamp", "5 days")*
 *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
 "train", "dest")*
 *  .max("time")*

 Any counts which are more than 5 days old will be dropped from the
 streaming state.

 On Tue, Aug 29, 2017 at 2:06 PM, kant kodali 
 wrote:

> Hi,
>
> Thanks for the response. Since this is a streaming based query and in
> my case I need to hold state for 24 hours which I forgot to mention in my
> previous email. can I do ?
>
>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
> "24 hours"), "train", "dest").max("time")*
>
>
> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>> Int, dest: String, time: Timestamp] *
>>
>>
>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>
>>
>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>> train, dest"*// after calling
>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>
>>
>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am wondering what is the easiest and concise way to express the
>>> computation below in Spark Structured streaming given that it supports 
>>> both
>>> imperative and declarative styles?
>>> I am just trying to select rows that has max timestamp for each
>>> train? Instead of doing some sort of nested queries like we normally do 
>>> in
>>> any relational database I am trying to see if I can leverage both
>>> imperative and declarative at the same time. If nested queries or join 
>>> are
>>> not required then I would like to see how this can be possible? I am 
>>> using
>>> spark 2.1.1.
>>>
>>> Dataset
>>>
>>> Train  Dest  Time
>>> 1      HK    10:00
>>> 1      SH    12:00
>>> 1      SZ    14:00
>>> 2      HK    13:00
>>> 2      SH    09:00
>>> 2      SZ    07:00
>>>
>>> The desired result should be:
>>>
>>> Train  Dest  Time
>>> 1      SZ    14:00
>>> 2      HK    13:00
>>>
>>>
>>
>

>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
@ayan,

Thanks for your response

I would like to have a function in this case, calculateIncome, and the reason
I need a function is to reuse it in other parts of the application. That's why
I'm planning to use mapGroups with the function as an argument, which takes a
row iterator. But I'm not sure if this is the best way to implement it, as my
initial DataFrame is very large.
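
For reference, a rough sketch of that mapGroups shape in Scala (the Employee fields
follow this thread, calculateIncome is the reusable function mentioned above with a
placeholder body, and spark.implicits._ is assumed to be in scope):

// Illustrative row type matching the columns discussed in this thread.
case class Employee(employeeId: Int, income: Int, incomeAgeTs: java.sql.Date,
                    joinDate: java.sql.Date, dept: String)

// Reusable business logic; the real implementation lives elsewhere in the application.
def calculateIncome(rows: List[Employee]): Int =
  rows.maxBy(_.incomeAgeTs.getTime).income

// Per-group function: takes the row iterator for one employee and returns a single row.
def latestIncome(rows: Iterator[Employee]): Employee = {
  val list   = rows.toList
  val income = calculateIncome(list)
  list.maxBy(_.incomeAgeTs.getTime).copy(income = income)
}

val result = employees            // Dataset[Employee], read as in the snippet quoted below
  .groupByKey(_.employeeId)
  .mapGroups((_, rows) => latestIncome(rows))

As Ayan notes in the reply quoted below, the built-in window functions will usually
optimize better; this is only the imperative alternative.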

On Tue, Aug 29, 2017 at 10:24 PM ayan guha  wrote:

> Hi
>
> the tool you are looking for is window function.  Example:
>
> >>> df.show()
> +++---+--+-+
> |JoinDate|dept| id|income|income_age_ts|
> +++---+--+-+
> | 4/20/13|  ES|101| 19000|  4/20/17|
> | 4/20/13|  OS|101| 1|  10/3/15|
> | 4/20/12|  DS|102| 13000|   5/9/17|
> | 4/20/12|  CS|102| 12000|   5/8/17|
> | 4/20/10|  EQ|103| 1|   5/9/17|
> | 4/20/10|  MD|103|  9000|   5/8/17|
> +++---+--+-+
>
> >>> res = spark.sql("select *, row_number() over (partition by id order by
> income_age_ts desc) r from t")
> >>> res.show()
> +++---+--+-+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +++---+--+-+---+
> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
> | 4/20/10|  MD|103|  9000|   5/8/17|  2|
> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
> | 4/20/13|  OS|101| 1|  10/3/15|  2|
> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
> | 4/20/12|  CS|102| 12000|   5/8/17|  2|
> +++---+--+-+---+
>
> >>> res = spark.sql("select * from (select *, row_number() over (partition
> by id order by income_age_ts desc) r from t) x where r=1")
> >>> res.show()
> +++---+--+-+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +++---+--+-+---+
> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
> +++---+--+-+---+
>
> This should be better because it uses all in-built optimizations in Spark.
>
> Best
> Ayan
>
> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep 
> wrote:
>
>> Please click on unnamed text/html  link for better view
>>
>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep 
>> wrote:
>>
>>>
>>> -- Forwarded message -
>>> From: Mamillapalli, Purna Pradeep <
>>> purnapradeep.mamillapa...@capitalone.com>
>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>> Subject: Spark question
>>> To: purna pradeep 
>>>
>>> Below is the input Dataframe(In real this is a very large Dataframe)
>>>
>>>
>>>
>>> EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
>>> 101         19000   4/20/17        4/20/13   ES
>>> 101         1       10/3/15        4/20/13   OS
>>> 102         13000   5/9/17         4/20/12   DS
>>> 102         12000   5/8/17         4/20/12   CS
>>> 103         1       5/9/17         4/20/10   EQ
>>> 103         9000    5/8/15         4/20/10   MD
>>>
>>> Get the latest income of an employee which has  Income_age ts <10 months
>>>
>>> Expected output Dataframe
>>>
>>> EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
>>> 101         19000   4/20/17        4/20/13   ES
>>> 102         13000   5/9/17         4/20/12   DS
>>> 103         1       5/9/17         4/20/10   EQ
>>>
>>>
>>>
>>
>>
>>
>>
>>
>> Below is what im planning to implement
>>>
>>>
>>>
>>> case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int,
>>> *JOINDATE*: Int,DEPT:String)
>>>
>>>
>>>
>>> *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add(
>>> *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*,
>>> *"Date"*). add(*"DEPT"*,*"String"*)
>>>
>>>
>>>
>>> *//Reading from the File **import *sparkSession.implicits._
>>>
>>> *val *readEmpFile = sparkSession.read
>>>   .option(*"sep"*, *","*)
>>>   .schema(empSchema)
>>>   .csv(INPUT_DIRECTORY)
>>>
>>>
>>> *//Create employee DataFrame **val *custDf = readEmpFile.as[employee]
>>>
>>>
>>> *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.*
>>> EmployeeID*)
>>>
>>>
>>> *val *k = groupByDf.mapGroups((key,value) => performETL(value))
>>>
>>>
>>>
>>>
>>>
>>> *def *performETL(empData: Iterator[employee]) : new employee  = {
>>>
>>>   *val *empList = empData.toList
>>>
>>>
>>> *//calculate income has Logic to figureout latest income for an account
>>> and returns latest income   val *income = calculateIncome(empList)
>>>
>>>
>>>   *for *(i <- empList) {
>>>
>>>   *val *row = i
>>>
>>> *return new *employee(row.EmployeeID, row.INCOMEAGE , income)
>>>   }
>>>   *return  "Done"*
>>>
>>>
>>>
>>> }
>>>
>>>
>>>
>>> Is this a better approach or even the right approach to 

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread ayan guha
Why wouldn't removing the destination from the grouping work? Like this:

 trainTimesDataset
   .withWatermark("activity_timestamp", "5 days")
   .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"), col("train"))
   .max("time")

On Wed, Aug 30, 2017 at 10:38 AM, kant kodali  wrote:

> @Burak so how would the transformation or query would look like for the
> above example? I don't see flatMapGroupsWithState in the DataSet API
> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.
>
>
>
> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:
>
>> Hey TD,
>>
>> If I understood the question correctly, your solution wouldn't return the
>> exact solution, since it also groups by on destination. I would say the
>> easiest solution would be to use flatMapGroupsWithState, where you:
>> .groupByKey(_.train)
>>
>> and keep in state the row with the maximum time.
>>
>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Yes. And in that case, if you just care about only the last few days of
>>> max, then you should set watermark on the timestamp column.
>>>
>>>  *trainTimesDataset*
>>> *  .withWatermark("**activity_timestamp", "5 days")*
>>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train",
>>> "dest")*
>>> *  .max("time")*
>>>
>>> Any counts which are more than 5 days old will be dropped from the
>>> streaming state.
>>>
>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali  wrote:
>>>
 Hi,

 Thanks for the response. Since this is a streaming based query and in
 my case I need to hold state for 24 hours which I forgot to mention in my
 previous email. can I do ?

  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24
 hours"), "train", "dest").max("time")*


 On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
> Int, dest: String, time: Timestamp] *
>
>
> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>
>
> *SQL*: *"select train, dest, max(time) from trainTimesView group by
> train, dest"*// after calling
> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>
>
> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
> wrote:
>
>> Hi All,
>>
>> I am wondering what is the easiest and concise way to express the
>> computation below in Spark Structured streaming given that it supports 
>> both
>> imperative and declarative styles?
>> I am just trying to select rows that has max timestamp for each
>> train? Instead of doing some sort of nested queries like we normally do 
>> in
>> any relational database I am trying to see if I can leverage both
>> imperative and declarative at the same time. If nested queries or join 
>> are
>> not required then I would like to see how this can be possible? I am 
>> using
>> spark 2.1.1.
>>
>> Dataset
>>
>> Train  Dest  Time
>> 1      HK    10:00
>> 1      SH    12:00
>> 1      SZ    14:00
>> 2      HK    13:00
>> 2      SH    09:00
>> 2      SZ    07:00
>>
>> The desired result should be:
>>
>> Train  Dest  Time
>> 1      SZ    14:00
>> 2      HK    13:00
>>
>>
>

>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread ayan guha
Hi

The tool you are looking for is a window function. Example:

>>> df.show()
+--------+----+---+------+-------------+
|JoinDate|dept| id|income|income_age_ts|
+--------+----+---+------+-------------+
| 4/20/13|  ES|101| 19000|      4/20/17|
| 4/20/13|  OS|101|     1|      10/3/15|
| 4/20/12|  DS|102| 13000|       5/9/17|
| 4/20/12|  CS|102| 12000|       5/8/17|
| 4/20/10|  EQ|103|     1|       5/9/17|
| 4/20/10|  MD|103|  9000|       5/8/17|
+--------+----+---+------+-------------+

>>> res = spark.sql("select *, row_number() over (partition by id order by income_age_ts desc) r from t")
>>> res.show()
+--------+----+---+------+-------------+---+
|JoinDate|dept| id|income|income_age_ts|  r|
+--------+----+---+------+-------------+---+
| 4/20/10|  EQ|103|     1|       5/9/17|  1|
| 4/20/10|  MD|103|  9000|       5/8/17|  2|
| 4/20/13|  ES|101| 19000|      4/20/17|  1|
| 4/20/13|  OS|101|     1|      10/3/15|  2|
| 4/20/12|  DS|102| 13000|       5/9/17|  1|
| 4/20/12|  CS|102| 12000|       5/8/17|  2|
+--------+----+---+------+-------------+---+

>>> res = spark.sql("select * from (select *, row_number() over (partition by id order by income_age_ts desc) r from t) x where r=1")
>>> res.show()
+--------+----+---+------+-------------+---+
|JoinDate|dept| id|income|income_age_ts|  r|
+--------+----+---+------+-------------+---+
| 4/20/10|  EQ|103|     1|       5/9/17|  1|
| 4/20/13|  ES|101| 19000|      4/20/17|  1|
| 4/20/12|  DS|102| 13000|       5/9/17|  1|
+--------+----+---+------+-------------+---+

This should be better because it uses Spark's built-in optimizations.
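
For completeness, the same thing with the Scala DataFrame API instead of SQL (df is the
input DataFrame above; as in the SQL version, income_age_ts is assumed to sort in the
desired order):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window.partitionBy("id").orderBy(col("income_age_ts").desc)

val latest = df
  .withColumn("r", row_number().over(w))   // rank rows within each id, newest first
  .filter(col("r") === 1)                  // keep the newest row per id
  .drop("r")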

Best
Ayan

On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep 
wrote:

> Please click on unnamed text/html  link for better view
>
> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep 
> wrote:
>
>>
>> -- Forwarded message -
>> From: Mamillapalli, Purna Pradeep > capitalone.com>
>> Date: Tue, Aug 29, 2017 at 8:08 PM
>> Subject: Spark question
>> To: purna pradeep 
>>
>> Below is the input Dataframe(In real this is a very large Dataframe)
>>
>>
>>
>> EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
>> 101         19000   4/20/17        4/20/13   ES
>> 101         1       10/3/15        4/20/13   OS
>> 102         13000   5/9/17         4/20/12   DS
>> 102         12000   5/8/17         4/20/12   CS
>> 103         1       5/9/17         4/20/10   EQ
>> 103         9000    5/8/15         4/20/10   MD
>>
>> Get the latest income of an employee which has  Income_age ts <10 months
>>
>> Expected output Dataframe
>>
>> EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
>> 101         19000   4/20/17        4/20/13   ES
>> 102         13000   5/9/17         4/20/12   DS
>> 103         1       5/9/17         4/20/10   EQ
>>
>>
>>
>
>
>
>
>
> Below is what im planning to implement
>>
>>
>>
>> case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int,
>> *JOINDATE*: Int,DEPT:String)
>>
>>
>>
>> *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add(
>> *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*,
>> *"Date"*). add(*"DEPT"*,*"String"*)
>>
>>
>>
>> *//Reading from the File **import *sparkSession.implicits._
>>
>> *val *readEmpFile = sparkSession.read
>>   .option(*"sep"*, *","*)
>>   .schema(empSchema)
>>   .csv(INPUT_DIRECTORY)
>>
>>
>> *//Create employee DataFrame **val *custDf = readEmpFile.as[employee]
>>
>>
>> *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.*
>> EmployeeID*)
>>
>>
>> *val *k = groupByDf.mapGroups((key,value) => performETL(value))
>>
>>
>>
>>
>>
>> *def *performETL(empData: Iterator[employee]) : new employee  = {
>>
>>   *val *empList = empData.toList
>>
>>
>> *//calculate income has Logic to figureout latest income for an account
>> and returns latest income   val *income = calculateIncome(empList)
>>
>>
>>   *for *(i <- empList) {
>>
>>   *val *row = i
>>
>> *return new *employee(row.EmployeeID, row.INCOMEAGE , income)
>>   }
>>   *return  "Done"*
>>
>>
>>
>> }
>>
>>
>>
>> Is this a better approach or even the right approach to implement the
>> same.If not please suggest a better way to implement the same?
>>
>>
>>

[Upvote] for Apache Spark for 2017 Innovation Award

2017-08-29 Thread Jules Damji

Fellow Spark users,

If you think, and believe, deep in your hearts that Apache Spark deserves an 
innovation award, cast your vote here: https://jaxlondon.com/jax-awards

Cheers, 
Jules 

Sent from my iPhone
Pardon the dumb thumb typos :)

Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
Please click on unnamed text/html  link for better view

On Tue, Aug 29, 2017 at 8:11 PM purna pradeep 
wrote:

>
> -- Forwarded message -
> From: Mamillapalli, Purna Pradeep <
> purnapradeep.mamillapa...@capitalone.com>
> Date: Tue, Aug 29, 2017 at 8:08 PM
> Subject: Spark question
> To: purna pradeep 
>
> Below is the input Dataframe(In real this is a very large Dataframe)
>
>
>
> EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
> 101         19000   4/20/17        4/20/13   ES
> 101         1       10/3/15        4/20/13   OS
> 102         13000   5/9/17         4/20/12   DS
> 102         12000   5/8/17         4/20/12   CS
> 103         1       5/9/17         4/20/10   EQ
> 103         9000    5/8/15         4/20/10   MD
>
> Get the latest income of an employee which has  Income_age ts <10 months
>
> Expected output Dataframe
>
> EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
> 101         19000   4/20/17        4/20/13   ES
> 102         13000   5/9/17         4/20/12   DS
> 103         1       5/9/17         4/20/10   EQ
>
>
>





Below is what im planning to implement
>
>
>
> case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int,
> *JOINDATE*: Int,DEPT:String)
>
>
>
> *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add(
> *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*,
> *"Date"*). add(*"DEPT"*,*"String"*)
>
>
>
> *//Reading from the File **import *sparkSession.implicits._
>
> *val *readEmpFile = sparkSession.read
>   .option(*"sep"*, *","*)
>   .schema(empSchema)
>   .csv(INPUT_DIRECTORY)
>
>
> *//Create employee DataFrame **val *custDf = readEmpFile.as[employee]
>
>
> *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.*
> EmployeeID*)
>
>
> *val *k = groupByDf.mapGroups((key,value) => performETL(value))
>
>
>
>
>
> *def *performETL(empData: Iterator[employee]) : new employee  = {
>
>   *val *empList = empData.toList
>
>
> *//calculate income has Logic to figureout latest income for an account
> and returns latest income   val *income = calculateIncome(empList)
>
>
>   *for *(i <- empList) {
>
>   *val *row = i
>
> *return new *employee(row.EmployeeID, row.INCOMEAGE , income)
>   }
>   *return  "Done"*
>
>
>
> }
>
>
>
> Is this a better approach or even the right approach to implement the
> same.If not please suggest a better way to implement the same?
>
>
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
@Burak so how would the transformation or query look for the above example?
I don't see flatMapGroupsWithState in the Dataset API in Spark 2.1.1. I may
be able to upgrade to 2.2.0 if that makes life easier.



On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:

> Hey TD,
>
> If I understood the question correctly, your solution wouldn't return the
> exact solution, since it also groups by on destination. I would say the
> easiest solution would be to use flatMapGroupsWithState, where you:
> .groupByKey(_.train)
>
> and keep in state the row with the maximum time.
>
> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Yes. And in that case, if you just care about only the last few days of
>> max, then you should set watermark on the timestamp column.
>>
>>  *trainTimesDataset*
>> *  .withWatermark("**activity_timestamp", "5 days")*
>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train",
>> "dest")*
>> *  .max("time")*
>>
>> Any counts which are more than 5 days old will be dropped from the
>> streaming state.
>>
>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali  wrote:
>>
>>> Hi,
>>>
>>> Thanks for the response. Since this is a streaming based query and in my
>>> case I need to hold state for 24 hours which I forgot to mention in my
>>> previous email. can I do ?
>>>
>>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24
>>> hours"), "train", "dest").max("time")*
>>>
>>>
>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
 Int, dest: String, time: Timestamp] *


 *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*


 *SQL*: *"select train, dest, max(time) from trainTimesView group by
 train, dest"*// after calling
 *trainTimesData.createOrReplaceTempView(trainTimesView)*


 On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
 wrote:

> Hi All,
>
> I am wondering what is the easiest and concise way to express the
> computation below in Spark Structured streaming given that it supports 
> both
> imperative and declarative styles?
> I am just trying to select rows that has max timestamp for each train?
> Instead of doing some sort of nested queries like we normally do in any
> relational database I am trying to see if I can leverage both imperative
> and declarative at the same time. If nested queries or join are not
> required then I would like to see how this can be possible? I am using
> spark 2.1.1.
>
> Dataset
>
> Train  Dest  Time
> 1      HK    10:00
> 1      SH    12:00
> 1      SZ    14:00
> 2      HK    13:00
> 2      SH    09:00
> 2      SZ    07:00
>
> The desired result should be:
>
> Train  Dest  Time
> 1      SZ    14:00
> 2      HK    13:00
>
>

>>>
>>
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Burak Yavuz
Hey TD,

If I understood the question correctly, your solution wouldn't return the
exact answer, since it also groups by destination. I would say the
easiest solution would be to use flatMapGroupsWithState, where you:
.groupByKey(_.train)

and keep in state the row with the maximum time.
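
A rough sketch of that suggestion (Spark 2.2+; the TrainEvent case class, the output
mode, and the lack of a timeout are illustrative assumptions):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Illustrative row type matching the schema discussed in this thread.
case class TrainEvent(train: Int, dest: String, time: java.sql.Timestamp)

def keepMaxRow(
    train: Int,
    events: Iterator[TrainEvent],
    state: GroupState[TrainEvent]): Iterator[TrainEvent] = {
  // Compare incoming rows with the row remembered in state and keep the latest one.
  val maxRow = (state.getOption.toSeq ++ events).maxBy(_.time.getTime)
  state.update(maxRow)
  Iterator(maxRow)
}

val latestPerTrain = trainTimesDataset      // Dataset[TrainEvent]
  .groupByKey(_.train)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(keepMaxRow)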

On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das 
wrote:

> Yes. And in that case, if you just care about only the last few days of
> max, then you should set watermark on the timestamp column.
>
>  *trainTimesDataset*
> *  .withWatermark("**activity_timestamp", "5 days")*
> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train",
> "dest")*
> *  .max("time")*
>
> Any counts which are more than 5 days old will be dropped from the
> streaming state.
>
> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali  wrote:
>
>> Hi,
>>
>> Thanks for the response. Since this is a streaming based query and in my
>> case I need to hold state for 24 hours which I forgot to mention in my
>> previous email. can I do ?
>>
>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24
>> hours"), "train", "dest").max("time")*
>>
>>
>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>>> Int, dest: String, time: Timestamp] *
>>>
>>>
>>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>>
>>>
>>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>>> train, dest"*// after calling
>>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>>
>>>
>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
>>> wrote:
>>>
 Hi All,

 I am wondering what is the easiest and concise way to express the
 computation below in Spark Structured streaming given that it supports both
 imperative and declarative styles?
 I am just trying to select rows that has max timestamp for each train?
 Instead of doing some sort of nested queries like we normally do in any
 relational database I am trying to see if I can leverage both imperative
 and declarative at the same time. If nested queries or join are not
 required then I would like to see how this can be possible? I am using
 spark 2.1.1.

 Dataset

 Train  Dest  Time
 1      HK    10:00
 1      SH    12:00
 1      SZ    14:00
 2      HK    13:00
 2      SH    09:00
 2      SZ    07:00

 The desired result should be:

 Train  Dest  Time
 1      SZ    14:00
 2      HK    13:00


>>>
>>
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Yes. And in that case, if you just care about only the last few days of
max, then you should set watermark on the timestamp column.

 trainTimesDataset
   .withWatermark("activity_timestamp", "5 days")
   .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"), col("train"), col("dest"))
   .max("time")

Any counts which are more than 5 days old will be dropped from the
streaming state.

On Tue, Aug 29, 2017 at 2:06 PM, kant kodali  wrote:

> Hi,
>
> Thanks for the response. Since this is a streaming based query and in my
> case I need to hold state for 24 hours which I forgot to mention in my
> previous email. can I do ?
>
>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24
> hours"), "train", "dest").max("time")*
>
>
> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>> Int, dest: String, time: Timestamp] *
>>
>>
>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>
>>
>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>> train, dest"*// after calling
>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>
>>
>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I am wondering what is the easiest and concise way to express the
>>> computation below in Spark Structured streaming given that it supports both
>>> imperative and declarative styles?
>>> I am just trying to select rows that has max timestamp for each train?
>>> Instead of doing some sort of nested queries like we normally do in any
>>> relational database I am trying to see if I can leverage both imperative
>>> and declarative at the same time. If nested queries or join are not
>>> required then I would like to see how this can be possible? I am using
>>> spark 2.1.1.
>>>
>>> Dataset
>>>
>>> Train  Dest  Time
>>> 1      HK    10:00
>>> 1      SH    12:00
>>> 1      SZ    14:00
>>> 2      HK    13:00
>>> 2      SH    09:00
>>> 2      SZ    07:00
>>>
>>> The desired result should be:
>>>
>>> Train  Dest  Time
>>> 1      SZ    14:00
>>> 2      HK    13:00
>>>
>>>
>>
>


Re: use WithColumn with external function in a java jar

2017-08-29 Thread purna pradeep
Thanks, I'll check it out.
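
For reference, a minimal sketch of the suggestion quoted below applied end to end
(df and the column names come from the original question, MyJava is the class from
the external jar, and null/error handling is left out):

import org.apache.spark.sql.functions.{col, udf}

// Wrap the external Java method in a Spark UDF.
val calculateExpense = udf((pexpense: String, cexpense: String) =>
  new MyJava().calculateExpense(pexpense.toDouble, cexpense.toDouble))

// Add the derived column to the original DataFrame.
val withExpense = df.withColumn("expense",
  calculateExpense(col("pexpense"), col("cexpense")))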

On Mon, Aug 28, 2017 at 10:22 PM Praneeth Gayam 
wrote:

> You can create a UDF which will invoke your java lib
>
> def calculateExpense: UserDefinedFunction = udf((pexpense: String, cexpense: 
> String) => new MyJava().calculateExpense(pexpense.toDouble, 
> cexpense.toDouble))
>
>
>
>
>
> On Tue, Aug 29, 2017 at 6:53 AM, purna pradeep 
> wrote:
>
>> I have data in a DataFrame with below columns
>>
>> 1)Fileformat is csv
>> 2)All below column datatypes are String
>>
>> employeeid,pexpense,cexpense
>>
>> Now I need to create a new DataFrame which has new column called
>> `expense`, which is calculated based on columns `pexpense`, `cexpense`.
>>
>> The tricky part is the calculation algorithm is not an **UDF** function
>> which I created, but it's an external function that needs to be imported
>> from a Java library which takes primitive types as arguments - in this case
>> `pexpense`, `cexpense` - to calculate the value required for new column.
>>
>> The external function signature
>>
>> public class MyJava
>>
>> {
>>
>> public Double calculateExpense(Double pexpense, Double cexpense) {
>>// calculation
>> }
>>
>> }
>>
>> So how can I invoke that external function to create a new calculated
>> column. Can I register that external function as UDF in my Spark
>> application?
>>
>> Stackoverflow reference
>>
>>
>> https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function
>>
>>
>>
>>
>>
>>
>


Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
-- Forwarded message -
From: Mamillapalli, Purna Pradeep 
Date: Tue, Aug 29, 2017 at 8:08 PM
Subject: Spark question
To: purna pradeep 

Below is the input Dataframe(In real this is a very large Dataframe)



EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
101         19000   4/20/17        4/20/13   ES
101         1       10/3/15        4/20/13   OS
102         13000   5/9/17         4/20/12   DS
102         12000   5/8/17         4/20/12   CS
103         1       5/9/17         4/20/10   EQ
103         9000    5/8/15         4/20/10   MD

Get the latest income of an employee which has  Income_age ts <10 months

Expected output Dataframe

EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
101         19000   4/20/17        4/20/13   ES
102         13000   5/9/17         4/20/12   DS
103         1       5/9/17         4/20/10   EQ


Below is what I'm planning to implement:

case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Int,
                    JOINDATE: Int, DEPT: String)

val empSchema = new StructType().add("EmployeeID", "Int").add("INCOME", "Int")
  .add("INCOMEAGE", "Date").add("JOINDATE", "Date").add("DEPT", "String")

// Reading from the file
import sparkSession.implicits._

val readEmpFile = sparkSession.read
  .option("sep", ",")
  .schema(empSchema)
  .csv(INPUT_DIRECTORY)

// Create employee DataFrame
val custDf = readEmpFile.as[employee]

// Adding salary column
val groupByDf = custDf.groupByKey(a => a.EmployeeID)

val k = groupByDf.mapGroups((key, value) => performETL(value))

def performETL(empData: Iterator[employee]): employee = {
  val empList = empData.toList

  // calculateIncome has the logic to figure out the latest income for an
  // account and returns that latest income
  val income = calculateIncome(empList)

  for (i <- empList) {
    val row = i
    return new employee(row.EmployeeID, row.INCOMEAGE, income)
  }
  return "Done"
}

Is this a better approach, or even the right approach, to implement this?
If not, please suggest a better way to implement the same.





Spark2 create hive external table

2017-08-29 Thread antoniosi
Hi,

I am trying to connect to the Spark Thrift Server to create an external table.
In my table DDL, I have a tbl property 'spark.sql.sources.provider' =
'parquet', but I am getting an error: "Cannot persist ... into Hive metastore
as table property keys may not start with 'spark.sql.':
[spark.sql.sources.provider]".

However, I try to create an external table in spark-shell using
spark.catalog.createExternalTable() api. When I look at the table definition
via beeline using "show create table", I saw these tblproperties:

| TBLPROPERTIES (
|   'COLUMN_STATS_ACCURATE'='false',
|   'numFiles'='0',
|   'numRows'='-1',
|   'rawDataSize'='-1',
|   'spark.sql.sources.provider'='parquet',
|   'spark.sql.sources.schema.numParts'='1',

Can someone explain why creating the external table via JDBC to the
Spark Thrift Server complains about the spark.sql table properties?

Thanks.

Antonio.







Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Hi,

Thanks for the response. Since this is a streaming-based query, in my case I
need to hold state for 24 hours, which I forgot to mention in my previous
email. Can I do this?

 trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24 hours"),
   "train", "dest").max("time")


On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das 
wrote:

> Say, *trainTimesDataset* is the streaming Dataset of schema *[train: Int,
> dest: String, time: Timestamp] *
>
>
> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>
>
> *SQL*: *"select train, dest, max(time) from trainTimesView group by
> train, dest"*// after calling
> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>
>
> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> I am wondering what is the easiest and concise way to express the
>> computation below in Spark Structured streaming given that it supports both
>> imperative and declarative styles?
>> I am just trying to select rows that has max timestamp for each train?
>> Instead of doing some sort of nested queries like we normally do in any
>> relational database I am trying to see if I can leverage both imperative
>> and declarative at the same time. If nested queries or join are not
>> required then I would like to see how this can be possible? I am using
>> spark 2.1.1.
>>
>> Dataset
>>
>> Train  Dest  Time
>> 1      HK    10:00
>> 1      SH    12:00
>> 1      SZ    14:00
>> 2      HK    13:00
>> 2      SH    09:00
>> 2      SZ    07:00
>>
>> The desired result should be:
>>
>> Train  Dest  Time
>> 1      SZ    14:00
>> 2      HK    13:00
>>
>>
>


Re: Spark 2.0.0 and Hive metastore

2017-08-29 Thread Andrés Ivaldi
Every comment is welcome.

If I'm not wrong, it's because we are using the percentile aggregation, which
comes with Hive support; apart from that, nothing else.
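
Not from this thread, but for reference, these are the settings that usually control
those locations when Hive support is only needed for functions like percentile (paths
are illustrative; the Derby JDBC URL is often placed in hive-site.xml instead of the
builder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-aggregations")
  .config("spark.sql.warehouse.dir", "/data/spark-warehouse")    // where the warehouse folder goes
  .config("javax.jdo.option.ConnectionURL",
    "jdbc:derby:;databaseName=/data/metastore_db;create=true")   // where the embedded Derby metastore_db goes
  .enableHiveSupport()
  .getOrCreate()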


On Tue, Aug 29, 2017 at 11:23 AM, Jean Georges Perrin  wrote:

> Sorry if my comment is not helping, but... why do you need Hive? Can't you
> save your aggregation using parquet for example?
>
> jg
>
>
> > On Aug 29, 2017, at 08:34, Andrés Ivaldi  wrote:
> >
> > Hello, I'm using Spark API and with Hive support, I dont have a Hive
> instance, just using Hive for some aggregation functions.
> >
> > The problem is that Hive creates the hive and metastore_db folders in the
> > temp folder, and I want to change that location.
> >
> > Regards.
> >
> > --
> > Ing. Ivaldi Andres
>
>


-- 
Ing. Ivaldi Andres


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Say, *trainTimesDataset* is the streaming Dataset of schema *[train: Int,
dest: String, time: Timestamp] *


*Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*


*SQL*: *"select train, dest, max(time) from trainTimesView group by train,
dest"*  // after calling
*trainTimesData.createOrReplaceTempView("trainTimesView")*


On Tue, Aug 29, 2017 at 12:59 PM, kant kodali  wrote:

> Hi All,
>
> I am wondering what is the easiest and concise way to express the
> computation below in Spark Structured streaming given that it supports both
> imperative and declarative styles?
> I am just trying to select rows that has max timestamp for each train?
> Instead of doing some sort of nested queries like we normally do in any
> relational database I am trying to see if I can leverage both imperative
> and declarative at the same time. If nested queries or join are not
> required then I would like to see how this can be possible? I am using
> spark 2.1.1.
>
> Dataset
>
> Train  Dest  Time
> 1      HK    10:00
> 1      SH    12:00
> 1      SZ    14:00
> 2      HK    13:00
> 2      SH    09:00
> 2      SZ    07:00
>
> The desired result should be:
>
> Train  Dest  Time
> 1      SZ    14:00
> 2      HK    13:00
>
>


How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Hi All,

I am wondering what is the easiest and most concise way to express the
computation below in Spark Structured Streaming, given that it supports both
imperative and declarative styles?
I am just trying to select the rows that have the max timestamp for each
train. Instead of doing some sort of nested query like we normally do in a
relational database, I am trying to see if I can leverage both imperative
and declarative styles at the same time. If nested queries or joins are not
required, then I would like to see how this can be possible. I am using
Spark 2.1.1.

Dataset

Train    Dest    Time
1        HK      10:00
1        SH      12:00
1        SZ      14:00
2        HK      13:00
2        SH      09:00
2        SZ      07:00

The desired result should be:

Train    Dest    Time
1        SZ      14:00
2        HK      13:00


Re: add arraylist to dataframe

2017-08-29 Thread yohann jardin
Hello Asmath,

Your list exists inside the driver, but you are trying to add elements to it 
from the executors. They are different processes, possibly on different nodes, 
and they do not share memory like that.
https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

There is an action called 'collect' that will build the list for you. 
Something like the following should do what you want:

     import scala.collection.JavaConversions._
     val points = df.rdd.map { row =>
         val latitude = 
com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Latitude))
         val longitude = 
com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Longitude))
         // the closure's last expression is its result - do not use 'return' inside the lambda
         new Coordinate(latitude, longitude)
     }.collect()

Note that you are retrieving ALL your coordinates into the driver. If you 
have too much data, this will lead to an OutOfMemoryError.

On 8/29/2017 at 8:21 PM, KhajaAsmath Mohammed wrote:
> Hi,
>
> I am initializing an ArrayList before iterating through the map method. I 
> always get a list size of zero after the map operation.
>
> How do I add values to a list inside the map method of a DataFrame? Any 
> suggestions?
>
>  val points = new 
> java.util.ArrayList[com.vividsolutions.jts.geom.Coordinate]()
>     import scala.collection.JavaConversions._
>     df.rdd.map { row =>
>         val latitude = 
> com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Latitude))
>         val longitude = 
> com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Longitude))
>         points.add(new Coordinate(latitude, longitude))
>     }
> points.size is always zero.
>
>
> Thanks,
> Asmath



add arraylist to dataframe

2017-08-29 Thread KhajaAsmath Mohammed
Hi,

I am initializing an ArrayList before iterating through the map method. I am
always getting a list size of zero after the map operation.

How do I add values to a list inside the map method of a DataFrame? Any
suggestions?

 val points = new
java.util.ArrayList[com.vividsolutions.jts.geom.Coordinate]()
import scala.collection.JavaConversions._
df.rdd.map { row =>
val latitude =
com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Latitude))
val longitude =
com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Longitude))
points.add(new Coordinate(latitude, longitude))

}
points.size is always zero.


Thanks,
Asmath


Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-29 Thread Mikhailau, Alex
Would I use something like this to get to those VM arguments?

import java.lang.management.ManagementFactory

val runtimeMxBean = ManagementFactory.getRuntimeMXBean
val args = runtimeMxBean.getInputArguments   // java.util.List[String] of the JVM arguments
val conf = Conf(args)                        // Conf: application-specific wrapper, not shown
etc.

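Alternatively, a sketch that skips parsing the JVM arguments and instead reads the values described in the quoted reply below (assumptions: log4j 1.x MDC is on the classpath, YARN sets the CONTAINER_ID environment variable, and SparkEnv is initialized by the time this runs on an executor; the MDC key names are arbitrary):

import org.apache.log4j.MDC
import org.apache.spark.SparkEnv

// call this inside a task (e.g. at the top of a mapPartitions) so it runs on each executor
def registerLogContext(): Unit = {
  sys.env.get("CONTAINER_ID").foreach(id => MDC.put("containerId", id))
  Option(SparkEnv.get).foreach(env => MDC.put("executorId", env.executorId))
}
// then reference %X{containerId} / %X{executorId} in the log4j pattern,
// or rely on the JSON layout emitting the MDC map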

From: Vadim Semenov 
Date: Tuesday, August 29, 2017 at 11:49 AM
To: "Mikhailau, Alex" 
Cc: "user@spark.apache.org" 
Subject: Re: Referencing YARN application id, YARN container hostname, Executor 
ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

Each Java process for each of the executors has some environment variables that 
you can use, for example:

> CONTAINER_ID=container_1503994094228_0054_01_13

The executor id gets passed as an argument to the process:

> /usr/lib/jvm/java-1.8.0/bin/java … --driver-url 
> spark://CoarseGrainedScheduler@:38151 --executor-id 3 --hostname ip-1…

And it gets printed out in the container log:

> 17/08/29 13:02:00 INFO Executor: Starting executor ID 3 on host …



On Mon, Aug 28, 2017 at 5:41 PM, Mikhailau, Alex 
> wrote:
Thanks, Vadim. The issue is not access to logs. I am able to view them.

I have the CloudWatch Logs agent push logs to Elasticsearch and then into Kibana 
using json-event-layout for the log4j output. I would also like to log 
applicationId, executorId, etc. in those log statements for clarity. Is there an 
MDC-based way with Spark, or some other approach, to achieve this?

Alex

From: Vadim Semenov 
>
Date: Monday, August 28, 2017 at 5:18 PM
To: "Mikhailau, Alex" >
Cc: "user@spark.apache.org" 
>
Subject: Re: Referencing YARN application id, YARN container hostname, Executor 
ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

When you create an EMR cluster you can specify an S3 path where logs will be 
saved for the cluster, something like this:

s3://bucket/j-18ASDKLJLAKSD/containers/application_1494074597524_0001/container_1494074597524_0001_01_01/stderr.gz

http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-manage-view-web-log-files.html

On Mon, Aug 28, 2017 at 4:43 PM, Mikhailau, Alex 
> wrote:
Does anyone have a working solution for logging YARN application id, YARN 
container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 
5.7.0 in log statements? Are there specific ENV variables available or other 
workflow for doing that?

Thank you

Alex




Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-29 Thread Vadim Semenov
Each Java process for each of the executors has some environment variables
that you can use, for example:

> CONTAINER_ID=container_1503994094228_0054_01_13

The executor id gets passed as an argument to the process:

> /usr/lib/jvm/java-1.8.0/bin/java … --driver-url
spark://CoarseGrainedScheduler@:38151 *--executor-id 3 *--hostname ip-1…

And it gets printed out in the container log:

> 17/08/29 13:02:00 INFO Executor: Starting executor ID 3 on host …



On Mon, Aug 28, 2017 at 5:41 PM, Mikhailau, Alex 
wrote:

> Thanks, Vadim. The issue is not access to logs. I am able to view them.
>
>
>
> I have the CloudWatch Logs agent push logs to Elasticsearch and then into
> Kibana using json-event-layout for the log4j output. I would also like to log
> applicationId, executorId, etc. in those log statements for clarity. Is
> there an MDC-based way with Spark, or some other approach, to achieve this?
>
>
>
> Alex
>
>
>
> *From: *Vadim Semenov 
> *Date: *Monday, August 28, 2017 at 5:18 PM
> *To: *"Mikhailau, Alex" 
> *Cc: *"user@spark.apache.org" 
> *Subject: *Re: Referencing YARN application id, YARN container hostname,
> Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log
> statements?
>
>
>
> When you create an EMR cluster you can specify an S3 path where logs will be
> saved for the cluster, something like this:
>
>
>
> s3://bucket/j-18ASDKLJLAKSD/containers/application_
> 1494074597524_0001/container_1494074597524_0001_01_01/stderr.gz
>
>
>
> http://docs.aws.amazon.com/emr/latest/ManagementGuide/
> emr-manage-view-web-log-files.html
>
>
>
> On Mon, Aug 28, 2017 at 4:43 PM, Mikhailau, Alex 
> wrote:
>
> Does anyone have a working solution for logging YARN application id, YARN
> container hostname, Executor ID and YARN attempt for jobs running on Spark
> EMR 5.7.0 in log statements? Are there specific ENV variables available or
> other workflow for doing that?
>
>
>
> Thank you
>
>
>
> Alex
>
>
>


Re: Spark submit OutOfMemory Error in local mode

2017-08-29 Thread muthu
Are you getting the OutOfMemory on the driver or on an executor? A typical cause
of OOM in Spark is having too few tasks (and hence overly large partitions) for a job.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-OutOfMemory-Error-in-local-mode-tp29081p29117.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kill Spark Application programmatically in Spark standalone cluster

2017-08-29 Thread muthu
I had a similar question in the past and worked around it by having my
spark-submit application register with my master application in order to
coordinate kills and/or the progress of execution. This is a bit clunky, I
suppose, in comparison to a REST-like API available in the Spark standalone
cluster. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kill-Spark-Application-programmatically-in-Spark-standalone-cluster-tp29113p29116.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark standalone API...

2017-08-29 Thread muthu
Hello there,

I use the Spark standalone cluster (the one whose UI is available at port 8080 when
the cluster is started). I am writing to see if there are any REST APIs
available to find the number of running applications and the number of free
cores and/or executors.
I do know of the quite extensive APIs available through the SparkListener interface
within every Spark application.

Please advise,
Muthu
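
One thing worth checking (an assumption to verify against your Spark version rather than a documented, stable REST API): the standalone Master web UI also serves a JSON view of the same status page, which includes the workers, core counts and running applications. A minimal sketch of polling it:

import scala.io.Source

// placeholder host; same host/port as the :8080 web UI
val state = Source.fromURL("http://spark-master:8080/json").mkString
println(state)   // JSON with fields such as cores, coresused, activeapps, workers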



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-API-tp29115.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



setup and cleanup function in spark

2017-08-29 Thread Mohammad Kargar
To implement a setup/cleanup function in Spark, we follow the pattern below, as
discussed here

.

rdd.mapPartitions { partition =>
   if (!partition.isEmpty) {
 // Some setup code here
 partition.map(item => {
   val output = yourfunction(item)
   if (!partition.hasNext) {
 // Some cleanup code here
   }
   output
 })
   } else {
 // return an empty Iterator of your return type
   }
}

In my case the rdd is a pair RDD loaded from Accumulo using an InputFormat, and
our map function only changes the values (no change to the keys). However,
the iterator returned from mapPartitions somehow ends up with
incorrect keys. I even tried "preservesPartitioning=true", but no luck.

Debugging the code shows that the keys get changed after calling
partition.hasNext. If I remove "partition.hasNext" from the code then
everything works fine!

Any ideas?

Thanks,
Mohammad
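
Not a definitive answer, but one assumption worth testing: Hadoop-style InputFormats often reuse the same key/value objects, so any look-ahead such as partition.hasNext can mutate the key you are still holding. A sketch of the usual workaround is to materialize copies of the data before doing anything that reads ahead:

// copy the contents out of the (possibly reused) key/value objects first,
// then apply the setup/cleanup pattern on the copied pairs
val materialized = rdd.map { case (k, v) => (k.toString, v.toString) }  // or a proper deep copy of the Key/Value types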


Re: Kafka Consumer Pre Fetch Messages + Async commits

2017-08-29 Thread Julia Wistance
Thanks Cody for the reply. My thought was that the time is required anyway
to write and commit the offsets to any of the external systems -
which are all sync.
So why not a sync commit to Kafka itself to store the offsets? Storing them
externally also adds another dependency on the application side - checking
whether, say, MySQL is up.

Regards,
JW
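
For reference, the async-commit pattern under discussion, as a sketch (this is the documented usage in the spark-streaming-kafka-0-10 integration; 'stream' is the DStream returned by createDirectStream):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // process the batch; because the commit is async, the output handling
  // must tolerate replays in any case
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}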

On Mon, Aug 28, 2017 at 10:38 PM, Cody Koeninger  wrote:

> 1. No, prefetched message offsets aren't exposed.
>
> 2. No, I'm not aware of any plans for sync commit, and I'm not sure
> that makes sense.  You have to be able to deal with repeat messages in
> the event of failure in any case, so the only difference sync commit
> would make would be (possibly) slower run time.
>
> On Sat, Aug 26, 2017 at 1:07 AM, Julia Wistance
>  wrote:
> > Hi Experts,
> >
> > A question on what could potentially happen with Spark Streaming 2.2.0 +
> > Kafka. LocationStrategies says that "new Kafka consumer API will
> pre-fetch
> > messages into buffers.".
> > If we store offsets in Kafka, currently we can only use async commits.
> >
> > So,
> > 1 - Could it happen that we commit offsets that we havent processed yet
> but
> > the kafka consumers has prefetched
> > 2 - Are there plans to support a sync commit? Although we can go for an
> > alternate store of commits like HBase / Zookeeper, MySQL etc the code
> would
> > wait till the offsets are stored in either of these systems. It would
> make
> > sense that Spark / Kafka also adds a sync commit option?
> >
> > Appreciate the reply.
> > JW
> >
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-29 Thread Cody Koeninger
I don't see anything obvious.  If the slowness is correlated with the
errors you're seeing, I'd start looking at what's going on with kafka or
your network.

On Mon, Aug 28, 2017 at 7:06 PM, swetha kasireddy  wrote:

> Hi Cody,
>
> Following is the way that I am consuming data for a 60 second batch. Do
> you see anything that is wrong with the way the data is getting consumed
> that can cause slowness in performance?
>
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> kafkaBrokers,
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "auto.offset.reset" -> "latest",
>   "heartbeat.interval.ms" -> Integer.valueOf(2),
>   "session.timeout.ms" -> Integer.valueOf(6),
>   "request.timeout.ms" -> Integer.valueOf(9),
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "group.id" -> "test1"
> )
>
>   val hubbleStream = KafkaUtils.createDirectStream[String, String](
> ssc,
> LocationStrategies.PreferConsistent,
> ConsumerStrategies.Subscribe[String, String](topicsSet,
> kafkaParams)
>   )
>
> val kafkaStreamRdd = hubbleStream.transform { rdd =>
> rdd.map(consumerRecord => (consumerRecord.key(), consumerRecord.value()))
> }
>
> On Mon, Aug 28, 2017 at 11:56 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> There is no difference in performance even with Cache being enabled.
>>
>> On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> There is no difference in performance even with Cache being disabled.
>>>
>>> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger 
>>> wrote:
>>>
 So if you can run with cache enabled for some time, does that
 significantly affect the performance issue you were seeing?

 Those settings seem reasonable enough.   If preferred locations is
 behaving correctly you shouldn't need cached consumers for all 96
 partitions on any one executor, so that maxCapacity setting is
 probably unnecessary.

 On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
  wrote:
 > Because I saw some posts that say that consumer cache  enabled will
 have
 > concurrentModification exception with reduceByKeyAndWIndow. I see
 those
 > errors as well after running for sometime with cache being enabled.
 So, I
 > had to disable it. Please see the tickets below.  We have 96
 partitions. So
 > if I enable cache, would teh following settings help to improve
 performance?
 >
 > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
 Integer.valueOf(96),
 > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
 Integer.valueOf(96),
 >
 > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
 >
 >
 > http://markmail.org/message/n4cdxwurlhf44q5x
 >
 > https://issues.apache.org/jira/browse/SPARK-19185
 >
 > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger 
 wrote:
 >>
 >> Why are you setting consumer.cache.enabled to false?
 >>
 >> On Fri, Aug 25, 2017 at 2:19 PM, SRK 
 wrote:
 >> > Hi,
 >> >
 >> > What would be the appropriate settings to run Spark with Kafka 10?
 My
 >> > job
 >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But
 its
 >> > very
 >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for
 Kafka 10
 >> > . I
 >> > see the following error sometimes . Please see the kafka
 parameters and
 >> > the
 >> > consumer strategy for creating the stream below. Any suggestions
 on how
 >> > to
 >> > run this with better performance would be of great help.
 >> >
 >> > java.lang.AssertionError: assertion failed: Failed to get records
 for
 >> > test
 >> > stream1 72 324027964 after polling for 12
 >> >
 >> > val kafkaParams = Map[String, Object](
 >> >   "bootstrap.servers" -> kafkaBrokers,
 >> >   "key.deserializer" -> classOf[StringDeserializer],
 >> >   "value.deserializer" -> classOf[StringDeserializer],
 >> >   "auto.offset.reset" -> "latest",
 >> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
 >> >   "session.timeout.ms" -> Integer.valueOf(6),
 >> >   "request.timeout.ms" -> Integer.valueOf(9),
 >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
 >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
 >> >   "group.id" -> "test1"
 >> > )
 >> >
 >> >   val hubbleStream = KafkaUtils.createDirectStream[String,
 String](
 >> > ssc,
 >> > 

Re: Spark 2.0.0 and Hive metastore

2017-08-29 Thread Jean Georges Perrin
Sorry if my comment is not helping, but... why do you need Hive? Can't you save 
your aggregation using parquet for example?

jg


> On Aug 29, 2017, at 08:34, Andrés Ivaldi  wrote:
> 
> Hello, I'm using the Spark API with Hive support. I don't have a Hive 
> instance; I'm just using Hive for some aggregation functions.
> 
> The problem is that Hive creates the hive and metastore_db folders in the temp 
> folder, and I want to change that location
> 
> Regards.
> 
> -- 
> Ing. Ivaldi Andres


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 2.0.0 and Hive metastore

2017-08-29 Thread Andrés Ivaldi
Hello, I'm using the Spark API with Hive support. I don't have a Hive
instance; I'm just using Hive for some aggregation functions.

The problem is that Hive creates the hive and metastore_db folders in the temp
folder, and I want to change that location.

Regards.

-- 
Ing. Ivaldi Andres
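
A sketch of one way to move those folders, under assumptions: Spark 2.x with the embedded Derby metastore (the default when no real Hive is configured), and placeholder paths. spark.sql.warehouse.dir controls the warehouse ("hive") folder; the Derby metastore_db location is governed by javax.jdo.option.ConnectionURL, which is normally set in a hive-site.xml on the classpath - whether it can also be passed through .config() as below is an assumption to verify for your deployment:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aggregations-with-hive-support")
  .config("spark.sql.warehouse.dir", "/data/spark-warehouse")        // placeholder path
  .config("javax.jdo.option.ConnectionURL",
    "jdbc:derby:;databaseName=/data/metastore_db;create=true")       // placeholder path
  .enableHiveSupport()
  .getOrCreate()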


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-29 Thread Georg Heiler
What about a custom UDAF?
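
Short of a full UDAF, a minimal sketch of running several aggregations on one column in a single job and collecting them into a Map (assumptions: Spark 2.x, a DataFrame df with a numeric column named "value"; the list of stats and the aliases are illustrative):

import org.apache.spark.sql.functions._

val stats = Seq("min", "max", "avg", "stddev")
val aggCols = stats.map(s => expr(s"$s(value)").alias(s))

val row = df.agg(aggCols.head, aggCols.tail: _*).head()        // one action / one job
val resultMap: Map[String, Any] = row.getValuesMap[Any](stats) // e.g. resultMap("avg")
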
Patrick  wrote on Mon, Aug 28, 2017 at 20:54:

> OK, I see there is a describe() function which does the stat calculation
> on a Dataset, similar to StatCounter. However, I don't want to restrict my
> aggregations to the standard mean, stddev, etc.; I may generate some custom stats,
> or run only a subset of the predefined stats on the
> particular column.
> I was thinking that if we write some custom code which does this in one
> action (job), that would work for me.
>
>
>
> On Tue, Aug 29, 2017 at 12:02 AM, Georg Heiler 
> wrote:
>
>> Rdd only
>> Patrick  wrote on Mon, Aug 28, 2017 at 20:13:
>>
>>> Ah, does it work with the Dataset API or do I need to convert it to an RDD first?
>>>
>>> On Mon, Aug 28, 2017 at 10:40 PM, Georg Heiler <
>>> georg.kf.hei...@gmail.com> wrote:
>>>
 What about the rdd stat counter?
 https://spark.apache.org/docs/0.6.2/api/core/spark/util/StatCounter.html

 Patrick  wrote on Mon, Aug 28, 2017 at 16:47:

> Hi
>
> I have two lists:
>
>
>- List one: contains names of columns on which I want to do
>aggregate operations.
>- List two: contains the aggregate operations on which I want to
>perform on each column eg ( min, max, mean)
>
> I am trying to use spark 2.0 dataset to achieve this. Spark provides
> an agg() where you can pass a Map  (of column name and
> respective aggregate operation ) as input, however I want to perform
> different aggregation operations on the same column of the data and want 
> to
> collect the result in a Map where key is the aggregate
> operation and Value is the result on the particular column.  If i add
> different agg() to same column, the key gets updated with latest value.
>
> Also I dont find any collectAsMap() operation that returns map of
> aggregated column name as key and result as value. I get collectAsList()
> but i dont know the order in which those agg() operations are run so how 
> do
> i match which list values corresponds to which agg operation.  I am able 
> to
> see the result using .show() but How can i collect the result in this 
> case ?
>
> Is it possible to do different aggregation on the same column in one
> Job(i.e only one collect operation) using agg() operation?
>
>
> Thanks in advance.
>
>
>>>
>


Re: Terminating Structured Streaming Applications on Source Failure

2017-08-29 Thread Yuval Itzchakov
I mean that the `StreamExecution` generated a proper error message:

2017-08-26 07:05:00,641 ERROR StreamExecution:? - Query [id =
8597ae0b-2183-407f-8300-239a24eb68ab, runId =
c1fe627d-bcf4-4462-bbd9-b178ffaca860]
terminated with error org.apache.spark.SparkException: Job aborted due to
stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure:
Lost task 2.3 in stage 1.0 (TID 33, XX.XX.XX.XX, executor 8):
java.io.EOFException: Stream ended prematurely

But the driver was still alive, thus the application was still running.

When you say "it should not affect the termination of the application
whether the queries are active or not", do you mean that if the streaming
query engine is no longer running, the application itself should not be
affected? That sounds counter-intuitive if all the application is doing is
consuming from the source and applying transformations to incoming data.

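A fail-fast workaround that is possible today (a sketch, not a built-in option; the listener API is public, but the exit policy and exit code are arbitrary choices) is to register a StreamingQueryListener and terminate the process when a query dies with an exception:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    // a query that stopped because of an error carries the exception message
    if (event.exception.isDefined) sys.exit(1)
  }
})
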
On Tue, Aug 29, 2017 at 12:21 PM, Tathagata Das  wrote:

> When you say "the application remained alive", do you mean the
> StreamingQuery stayed alive, or the whole process stayed alive? The
> StreamingQuery should be terminated immediately. And the stream execution
> threads are all daemon threads, so it should not affect the termination of
> the application whether the queries are active or not. Maybe something
> else is keeping the application alive?
>
>
>
> On Tue, Aug 29, 2017 at 2:09 AM, Yuval Itzchakov 
> wrote:
>
>> I wasn't sure if this would be a proper bug or not.
>>
>> Today, the behavior of Structured Streaming is such that if a source
>> fails with an exception, the `StreamExecution` class halts reading further
>> from the source, but the application remains alive. For applications
>> whose sole purpose is to transform data from a non-static source (such
>> as Kafka), this is rather useless and might be surprising.
>>
>> For example, if you have a simple monitor which checks whether the
>> application is alive or not, you'll still get reports that the application
>> is alive and running, but actually it isn't consuming anything from the
>> source and is logically dead.
>>
>> Should this be the behavior? I think that perhaps there should be a
>> configuration that asks whether to completely shutdown the application on
>> source failure.
>>
>> What do you guys think?
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
>


-- 
Best Regards,
Yuval Itzchakov.


Re: Terminating Structured Streaming Applications on Source Failure

2017-08-29 Thread Tathagata Das
When you say "the application remained alive", do you mean the
StreamingQuery stayed alive, or the whole process stayed alive? The
StreamingQuery should be terminated immediately. And the stream execution
threads are all daemon threads, so it should not affect the termination of
the application whether the queries are active or not. Maybe something
else is keeping the application alive?



On Tue, Aug 29, 2017 at 2:09 AM, Yuval Itzchakov  wrote:

> I wasn't sure if this would be a proper bug or not.
>
> Today, the behavior of Structured Streaming is such that if a source fails
> with an exception, the `StreamExecution` class halts reading further from
> the source, but the application remains alive. For applications whose
> sole purpose is to transform data from a non-static source (such as
> Kafka), this is rather useless and might be surprising.
>
> For example, if you have a simple monitor which checks whether the
> application is alive or not, you'll still get reports that the application
> is alive and running, but actually it isn't consuming anything from the
> source and is logically dead.
>
> Should this be the behavior? I think that perhaps there should be a
> configuration that asks whether to completely shutdown the application on
> source failure.
>
> What do you guys think?
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Terminating Structured Streaming Applications on Source Failure

2017-08-29 Thread Yuval Itzchakov
I wasn't sure if this would be a proper bug or not.

Today, the behavior of Structured Streaming is such that if a source fails
with an exception, the `StreamExecution` class halts reading further from
the source, but the application remains alive. For applications whose
sole purpose is to transform data from a non-static source (such as
Kafka), this is rather useless and might be surprising.

For example, if you have a simple monitor which checks whether the
application is alive or not, you'll still get reports that the application
is alive and running, but actually it isn't consuming anything from the
source and is logically dead.

Should this be the behavior? I think that perhaps there should be a
configuration option that controls whether to completely shut down the
application on source failure.

What do you guys think?

-- 
Best Regards,
Yuval Itzchakov.


unable to import graphframes

2017-08-29 Thread Imran Rajjad
Dear list,

I am following the GraphFrames documentation and have started the Scala
shell using the following command


D:\spark-2.1.0-bin-hadoop2.7\bin>spark-shell --master local[2] --packages
graphframes:graphframes:0.5.0-spark2.1-s_2.10

Ivy Default Cache set to: C:\Users\user\.ivy2\cache
The jars for the packages stored in: C:\Users\user\.ivy2\jars
:: loading settings :: url =
jar:file:/D:/spark-2.1.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found graphframes#graphframes;0.5.0-spark2.1-s_2.10 in
spark-packages
found com.typesafe.scala-logging#scala-logging-api_2.10;2.1.2 in
central
found com.typesafe.scala-logging#scala-logging-slf4j_2.10;2.1.2 in
central
found org.scala-lang#scala-reflect;2.10.4 in central
found org.slf4j#slf4j-api;1.7.7 in local-m2-cache
:: resolution report :: resolve 288ms :: artifacts dl 7ms
:: modules in use:
com.typesafe.scala-logging#scala-logging-api_2.10;2.1.2 from
central in [default]
com.typesafe.scala-logging#scala-logging-slf4j_2.10;2.1.2 from
central in [default]
graphframes#graphframes;0.5.0-spark2.1-s_2.10 from spark-packages
in [default]
org.scala-lang#scala-reflect;2.10.4 from central in [default]
org.slf4j#slf4j-api;1.7.7 from local-m2-cache in [default]

-
|  |modules||   artifacts
|
|   conf   | number| search|dwnlded|evicted||
number|dwnlded|

-
|  default |   5   |   0   |   0   |   0   ||   5   |   0
|

-
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 5 already retrieved (0kB/7ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
2017-08-29 12:10:23,089 [main] WARN  NativeCodeLoader  - Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable
2017-08-29 12:10:25,128 [main] WARN  General  - Plugin (Bundle)
"org.datanucleus.store.rdbms" is already registered. Ensure you dont have
multiple JAR versions of the same plugin in the classpath. The URL
"file:/D:/spark-2.1.0-bin-hadoop2.7/jars/datanucleus-rdbms-3.2.9.jar" is
already registered, and you are trying to register an identical plugin
located at URL
"file:/D:/spark-2.1.0-bin-hadoop2.7/bin/../jars/datanucleus-rdbms-3.2.9.jar."
2017-08-29 12:10:25,137 [main] WARN  General  - Plugin (Bundle)
"org.datanucleus" is already registered. Ensure you dont have multiple JAR
versions of the same plugin in the classpath. The URL
"file:/D:/spark-2.1.0-bin-hadoop2.7/jars/datanucleus-core-3.2.10.jar" is
already registered, and you are trying to register an identical plugin
located at URL
"file:/D:/spark-2.1.0-bin-hadoop2.7/bin/../jars/datanucleus-core-3.2.10.jar."
2017-08-29 12:10:25,141 [main] WARN  General  - Plugin (Bundle)
"org.datanucleus.api.jdo" is already registered. Ensure you dont have
multiple JAR versions of the same plugin in the classpath. The URL
"file:/D:/spark-2.1.0-bin-hadoop2.7/bin/../jars/datanucleus-api-jdo-3.2.6.jar"
is already registered, and you are trying to register an identical plugin
located at URL
"file:/D:/spark-2.1.0-bin-hadoop2.7/jars/datanucleus-api-jdo-3.2.6.jar."
2017-08-29 12:10:27,744 [main] WARN  ObjectStore  - Failed to get database
global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.10.60:4040
Spark context available as 'sc' (master = local[2], app id =
local-1503990623864).
Spark session available as 'spark'.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
  /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
scala> import org.graphframes._
:23: error: object graphframes is not a member of package org
   import org.graphframes._

is there something missing?

regards,
Imran

-- 
I.R
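
One mismatch worth checking (an assumption based on the "Using Scala version 2.11.8" banner above versus the s_2.10 suffix in the package coordinate): the graphframes artifact needs to match Spark's Scala version, i.e. something like

spark-shell --master local[2] --packages graphframes:graphframes:0.5.0-spark2.1-s_2.11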