Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams

2016-05-30 Thread obaidul karim
foreachRDD does not return any value. It can be used just to send results to
another place/context, like a db, file, etc.
I could use that, but it seems like the overhead of having another hop.
I wanted to keep it simple and light.
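
For reference, a rough sketch of the foreachRDD route being discussed might
look like this (not code from this thread; "paired", "modelRF" and
"save_to_db" are hypothetical names for a (text, features) DStream, the
trained model and some external sink):

def score_and_save(rdd):
    # runs on the driver once per batch; collect() is only reasonable for small batches
    texts = rdd.map(lambda (txt, features): txt).collect()
    scores = modelRF.predict(rdd.map(lambda (txt, features): features)).collect()
    for score, txt in zip(scores, texts):
        save_to_db(score, txt)   # side effect only; foreachRDD returns nothing

paired.foreachRDD(score_and_save)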

On Tuesday, 31 May 2016, nguyen duc tuan  wrote:

> How about using foreachRDD ? I think this is much better than your trick.
>
>
> 2016-05-31 12:32 GMT+07:00 obaidul karim  >:
>
>> Hi Guys,
>>
>> In the end, I am using below.
>> The trick is using "native python map" along with "spark streaming
>> transform".
>> May not be an elegant way, however it works :).
>>
>> def predictScore(texts, modelRF):
>>     predictions = texts.map(lambda txt: (txt, getFeatures(txt))) \
>>         .map(lambda (txt, features): (txt, features.split(','))) \
>>         .map(lambda (txt, features): (txt, [float(i) for i in features])) \
>>         .transform(lambda rdd: sc.parallelize(
>>             map(lambda score, txt: (score, txt),
>>                 modelRF.predict(rdd.map(lambda (txt, features): features)).collect(),
>>                 rdd.map(lambda (txt, features): txt).collect())))
>>     # inside transform, each RDD element is a (text, features) pair
>>     # Return value is a DStream of (score, 'original text') tuples
>>     return predictions
>>
>>
>> Hope, it will help somebody who is facing same problem.
>> If anybody has better idea, please post it here.
>>
>> -Obaid
>>
>> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan > > wrote:
>>
>>> DStream has a method foreachRDD, so you can walk through all RDDs
>>> inside DStream as you want.
>>>
>>>
>>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>>
>>> 2016-05-30 19:30 GMT+07:00 obaidul karim >> >:
>>>
 Hi nguyen,

 If I am not mistaken, we cannot call  "predict" on "dstream" as you
 have suggested.
 We have to use "transform" to be able to perform normal RDD operations
 on dstreams and here I am trapped.

 -Obaid



 On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan > wrote:

> How about this ?
>
> def extract_feature(rf_model, x):
> text = getFeatures(x).split(',')
> fea = [float(i) for i in text]
> prediction = rf_model.predict(fea)
> return (prediction, x)
> output = texts.map(lambda x: extract_feature(rf_model, x))
>
> 2016-05-30 14:17 GMT+07:00 obaidul karim  >:
>
>> Hi,
>>
>> Anybody has any idea on below?
>>
>> -Obaid
>>
>>
>> On Friday, 27 May 2016, obaidul karim > > wrote:
>>
>>> Hi Guys,
>>>
>>> This is my first mail to spark users mailing list.
>>>
>>> I need help on Dstream operation.
>>>
>>> In fact, I am using a MLlib randomforest model to predict using
>>> spark streaming. In the end, I want to combine the feature Dstream &
>>> prediction Dstream together for further downstream processing.
>>>
>>> I am predicting using below piece of code:
>>>
>>> predictions = texts.map( lambda x : getFeatures(x) ).map(lambda x :
>>> x.split(',')).map( lambda parts : [float(i) for i in parts]
>>> ).transform(lambda rdd: rf_model.predict(rdd))
>>>
>>> Here texts is dstream having single line of text as records
>>> getFeatures generates a comma separated features extracted from each
>>> record
>>>
>>>
>>> I want the output as below tuple:
>>> ("predicted value", "original text")
>>>
>>> How can I achieve that ?
>>> or
>>> at least can I perform .zip like normal RDD operation on two
>>> Dstreams, like below:
>>> output = texts.zip(predictions)
>>>
>>>
>>> Thanks in advance.
>>>
>>> -Obaid
>>>
>>
>

>>>
>>
>


Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams

2016-05-30 Thread nguyen duc tuan
How about using foreachRDD ? I think this is much better than your trick.


2016-05-31 12:32 GMT+07:00 obaidul karim :

> Hi Guys,
>
> In the end, I am using below.
> The trick is using "native python map" along with "spark streaming
> transform".
> May not be an elegant way, however it works :).
>
> def predictScore(texts, modelRF):
>     predictions = texts.map(lambda txt: (txt, getFeatures(txt))) \
>         .map(lambda (txt, features): (txt, features.split(','))) \
>         .map(lambda (txt, features): (txt, [float(i) for i in features])) \
>         .transform(lambda rdd: sc.parallelize(
>             map(lambda score, txt: (score, txt),
>                 modelRF.predict(rdd.map(lambda (txt, features): features)).collect(),
>                 rdd.map(lambda (txt, features): txt).collect())))
>     # inside transform, each RDD element is a (text, features) pair
>     # Return value is a DStream of (score, 'original text') tuples
>     return predictions
>
>
> Hope, it will help somebody who is facing same problem.
> If anybody has better idea, please post it here.
>
> -Obaid
>
> On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan 
> wrote:
>
>> DStream has a method foreachRDD, so you can walk through all RDDs inside
>> DStream as you want.
>>
>>
>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>>
>> 2016-05-30 19:30 GMT+07:00 obaidul karim :
>>
>>> Hi nguyen,
>>>
>>> If I am not mistaken, we cannot call  "predict" on "dstream" as you
>>> have suggested.
>>> We have to use "transform" to be able to perform normal RDD operations
>>> on dstreams and here I am trapped.
>>>
>>> -Obaid
>>>
>>>
>>>
>>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan 
>>> wrote:
>>>
 How about this ?

 def extract_feature(rf_model, x):
 text = getFeatures(x).split(',')
 fea = [float(i) for i in text]
 prediction = rf_model.predict(fea)
 return (prediction, x)
 output = texts.map(lambda x: extract_feature(rf_model, x))

 2016-05-30 14:17 GMT+07:00 obaidul karim :

> Hi,
>
> Anybody has any idea on below?
>
> -Obaid
>
>
> On Friday, 27 May 2016, obaidul karim  wrote:
>
>> Hi Guys,
>>
>> This is my first mail to spark users mailing list.
>>
>> I need help on Dstream operation.
>>
>> In fact, I am using a MLlib randomforest model to predict using spark
>> streaming. In the end, I want to combine the feature Dstream & prediction
>> Dstream together for further downstream processing.
>>
>> I am predicting using below piece of code:
>>
>> predictions = texts.map( lambda x : getFeatures(x) ).map(lambda x :
>> x.split(',')).map( lambda parts : [float(i) for i in parts]
>> ).transform(lambda rdd: rf_model.predict(rdd))
>>
>> Here texts is dstream having single line of text as records
>> getFeatures generates a comma separated features extracted from each
>> record
>>
>>
>> I want the output as below tuple:
>> ("predicted value", "original text")
>>
>> How can I achieve that ?
>> or
>> at least can I perform .zip like normal RDD operation on two
>> Dstreams, like below:
>> output = texts.zip(predictions)
>>
>>
>> Thanks in advance.
>>
>> -Obaid
>>
>

>>>
>>
>


Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams

2016-05-30 Thread obaidul karim
Hi Guys,

In the end, I am using below.
The trick is using "native python map" along with "spark streaming
transform".
May not be an elegant way, however it works :).

def predictScore(texts, modelRF):
    predictions = texts.map(lambda txt: (txt, getFeatures(txt))) \
        .map(lambda (txt, features): (txt, features.split(','))) \
        .map(lambda (txt, features): (txt, [float(i) for i in features])) \
        .transform(lambda rdd: sc.parallelize(
            map(lambda score, txt: (score, txt),
                modelRF.predict(rdd.map(lambda (txt, features): features)).collect(),
                rdd.map(lambda (txt, features): txt).collect())))
    # inside transform, each RDD element is a (text, features) pair
    # Return value is a DStream of (score, 'original text') tuples
    return predictions


Hope it will help somebody who is facing the same problem.
If anybody has a better idea, please post it here.

-Obaid
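
As a rough usage sketch (the streaming source, model path and batch interval
below are illustrative and not from this thread; getFeatures() is assumed to
be defined as above), the function could be wired up like:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.tree import RandomForestModel

sc = SparkContext(appName="PredictScoreExample")
ssc = StreamingContext(sc, 15)                             # 15-second batches

modelRF = RandomForestModel.load(sc, "hdfs:///models/rf")  # hypothetical path
texts = ssc.socketTextStream("localhost", 9999)            # one text record per line

scored = predictScore(texts, modelRF)                      # DStream of (score, text)
scored.pprint()                                            # or use foreachRDD(...) to persist

ssc.start()
ssc.awaitTermination()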

On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan 
wrote:

> DStream has a method foreachRDD, so you can walk through all RDDs inside
> DStream as you want.
>
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>
> 2016-05-30 19:30 GMT+07:00 obaidul karim :
>
>> Hi nguyen,
>>
>> If I am not mistaken, we cannot call  "predict" on "dstream" as you have
>> suggested.
>> We have to use "transform" to be able to perform normal RDD operations on
>> dstreams and here I am trapped.
>>
>> -Obaid
>>
>>
>>
>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan 
>> wrote:
>>
>>> How about this ?
>>>
>>> def extract_feature(rf_model, x):
>>> text = getFeatures(x).split(',')
>>> fea = [float(i) for i in text]
>>> prediction = rf_model.predict(fea)
>>> return (prediction, x)
>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>
>>> 2016-05-30 14:17 GMT+07:00 obaidul karim :
>>>
 Hi,

 Anybody has any idea on below?

 -Obaid


 On Friday, 27 May 2016, obaidul karim  wrote:

> Hi Guys,
>
> This is my first mail to spark users mailing list.
>
> I need help on Dstream operation.
>
> In fact, I am using a MLlib randomforest model to predict using spark
> streaming. In the end, I want to combine the feature Dstream & prediction
> Dstream together for further downstream processing.
>
> I am predicting using below piece of code:
>
> predictions = texts.map( lambda x : getFeatures(x) ).map(lambda x :
> x.split(',')).map( lambda parts : [float(i) for i in parts]
> ).transform(lambda rdd: rf_model.predict(rdd))
>
> Here texts is a DStream having a single line of text per record, and
> getFeatures generates comma-separated features extracted from each
> record.
>
>
> I want the output as below tuple:
> ("predicted value", "original text")
>
> How can I achieve that ?
> or
> at least can I perform .zip like normal RDD operation on two Dstreams,
> like below:
> output = texts.zip(predictions)
>
>
> Thanks in advance.
>
> -Obaid
>

>>>
>>
>


Re: Spark + Kafka processing trouble

2016-05-30 Thread Mich Talebzadeh
How are you getting your data from the database? Are you using JDBC?

Can you actually call the database first (assuming the same data), put it in a
temp table in Spark, cache it for the duration of the window length, and use
the data from the cached table?
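
A minimal PySpark sketch of that idea (the JDBC URL, table name and
credentials below are placeholders):

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Load the reference data once from the database ...
lookup = (sqlContext.read.format("jdbc")
          .option("url", "jdbc:postgresql://localhost:5432/mydb")
          .option("dbtable", "reference_data")
          .option("user", "spark")
          .option("password", "secret")
          .load())

# ... and keep it in memory for the duration of the window
lookup.registerTempTable("reference_data")
sqlContext.cacheTable("reference_data")

# Streaming batches can then join against the cached table instead of hitting
# the database on every micro-batch, e.g.
# sqlContext.sql("SELECT ... FROM kafka_batch b JOIN reference_data r ON ...")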

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 31 May 2016 at 04:19, Malcolm Lockyer  wrote:

> On Tue, May 31, 2016 at 3:14 PM, Darren Govoni 
> wrote:
> > Well that could be the problem. A SQL database is essential a big
> > synchronizer. If you have a lot of spark tasks all bottlenecking on a
> single
> > database socket (is the database clustered or colocated with spark
> workers?)
> > then you will have blocked threads on the database server.
>
> Totally agree this could be a big killer to scaling up, we are
> planning to migrate. But in the meantime we are seeing such big issues
> with test data of only a few records (1, 2, 1024 etc.) produced to
> Kafka. Currently the database is NOT busy (CPU, memory and IO usage
> from the DB is tiny).
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: equvalent beewn join sql and data frame

2016-05-30 Thread Mich Talebzadeh
One is SQL and the other one is its equivalent in functional programming:

val s =
HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")

//These are the same
val rs =
s.join(t,s("time_id")===t("time_id")).join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))

val rs =
s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 31 May 2016 at 04:55, Takeshi Yamamuro  wrote:

> Hi,
>
> They are the same.
> If you check the equality, you can use DataFrame#explain.
>
> // maropu
>
>
> On Tue, May 31, 2016 at 12:26 PM, pseudo oduesp 
> wrote:
>
>> hi guys ,
>> it s similare  thing to do :
>>
>> sqlcontext.join("select * from t1 join t2 on condition) and
>>
>> df1.join(df2,condition,'inner")??
>>
>> ps:df1.registertable('t1')
>> ps:df2.registertable('t2')
>> thanks
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Spark Streaming heap space out of memory

2016-05-30 Thread Shahbaz
Hi Christian, can you please try whether 30 seconds works for your case? I think
your batches are getting queued up. Regards, Shahbaz

On Tuesday 31 May 2016, Dancuart, Christian 
wrote:

> While it has heap space, batches run well below 15 seconds.
>
>
>
> Once it starts to run out of space, processing time takes about 1.5
> minutes. Scheduling delay is around 4 minutes and total delay around 5.5
> minutes. I usually shut it down at that point.
>
>
>
> The number of stages (and pending stages) does seem to be quite high and
> increases over time.
>
>
>
> 4584  foreachRDD at HDFSPersistence.java:52        2016/05/30 16:23:52   1.9 min   36/36 (4964 skipped)   285/285 (28026 skipped)
> 4586  transformToPair at SampleCalculator.java:88  2016/05/30 16:25:02   0.2 s     1/1                    4/4
> 4585  (Unknown Stage Name)                         2016/05/30 16:23:52   1.2 min   1/1                    1/1
> 4582  (Unknown Stage Name)                         2016/05/30 16:21:51   48 s      1/1 (4063 skipped)     12/12 (22716 skipped)
> 4583  (Unknown Stage Name)                         2016/05/30 16:21:51   48 s      1/1                    1/1
> 4580  (Unknown Stage Name)                         2016/05/30 16:16:38   4.0 min   36/36 (4879 skipped)   285/285 (27546 skipped)
> 4581  (Unknown Stage Name)                         2016/05/30 16:16:38   0.1 s     1/1                    4/4
> 4579  (Unknown Stage Name)                         2016/05/30 16:15:53   45 s      1/1                    1/1
> 4578  (Unknown Stage Name)                         2016/05/30 16:14:38   1.3 min   1/1 (3993 skipped)     12/12 (22326 skipped)
> 4577  (Unknown Stage Name)                         2016/05/30 16:14:37   0.8 s     1/1                    1/1
>
> Is this what you mean by pending stages?
>
>
>
> I have taken a few heap dumps but I’m not sure what I am looking at for
> the problematic classes.
>
>
>
> *From:* Shahbaz [mailto:shahzadh...@gmail.com
> ]
> *Sent:* 2016, May, 30 3:25 PM
> *To:* Dancuart, Christian
> *Cc:* user
> *Subject:* Re: Spark Streaming heap space out of memory
>
>
>
> Hi Christian,
>
>
>
>- What is the processing time of each of your Batch,is it exceeding 15
>seconds.
>- How many jobs are queued.
>- Can you take a heap dump and see which objects are occupying the
>heap.
>
>
>
> Regards,
>
> Shahbaz
>
>
>
>
>
> On Tue, May 31, 2016 at 12:21 AM, christian.dancu...@rbc.com
>  <
> christian.dancu...@rbc.com
> > wrote:
>
> Hi All,
>
> We have a spark streaming v1.4/java 8 application that slows down and
> eventually runs out of heap space. The less driver memory, the faster it
> happens.
>
> Appended is our spark configuration and a snapshot of the of heap taken
> using jmap on the driver process. The RDDInfo, $colon$colon and [C objects
> keep growing as we observe. We also tried to use G1GC, but it acts the
> same.
>
> Our dependency graph contains multiple updateStateByKey() calls. For each,
> we explicitly set the checkpoint interval to 240 seconds.
>
> We have our batch interval set to 15 seconds; with no delays at the start
> of
> the process.
>
> Spark configuration (Spark Driver Memory: 6GB, Spark Executor Memory: 2GB):
> spark.streaming.minRememberDuration=180s
> spark.ui.showConsoleProgress=false
> spark.streaming.receiver.writeAheadLog.enable=true
> spark.streaming.unpersist=true
> spark.streaming.stopGracefullyOnShutdown=true
> spark.streaming.ui.retainedBatches=10
> spark.ui.retainedJobs=10
> spark.ui.retainedStages=10
> spark.worker.ui.retainedExecutors=10
> spark.worker.ui.retainedDrivers=10
> spark.sql.ui.retainedExecutions=10
> spark.serializer=org.apache.spark.serializer.KryoSerializer
> spark.kryoserializer.buffer.max=128m
>
> num #instances #bytes  class name
> --
>1:   8828200  565004800  org.apache.spark.storage.RDDInfo
>2:  20794893  499077432  scala.collection.immutable.$colon$colon
>3:   9646097  459928736  [C
>4:   9644398  231465552  java.lang.String
>5:  12760625  20417  java.lang.Integer
>6: 21326  98632  [B
>7:556959   44661232  [Lscala.collection.mutable.HashEntry;
>8:   1179788   37753216
> java.util.concurrent.ConcurrentHashMap$Node
>9:   1169264   37416448  java.util.Hashtable$Entry
>   10:552707   30951592  org.apache.spark.scheduler.StageInfo
>   11:367107   23084712  [Ljava.lang.Object;
>   12:556948   22277920  scala.collection.mutable.HashMap
>   13:  2787   22145568
> [Ljava.util.concurrent.ConcurrentHashMap$Node;
>   14:116997   12167688  org.apache.spark.executor.TaskMetrics
>   15:3604258650200
> java.util.concurrent.LinkedBlockingQueue$Node
>   16:3604178650008
> 

Re: equvalent beewn join sql and data frame

2016-05-30 Thread Takeshi Yamamuro
Hi,

They are the same.
If you check the equality, you can use DataFrame#explain.

// maropu
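
In PySpark, the check could look like the following (using the df1/df2 from
the question quoted below; the join column is made up):

df1.registerTempTable("t1")
df2.registerTempTable("t2")

# Print the physical plan of both formulations and compare them
sqlContext.sql("select * from t1 join t2 on t1.id = t2.id").explain()
df1.join(df2, df1.id == df2.id, "inner").explain()
# If the two printed plans match, the SQL and DataFrame versions are equivalent.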


On Tue, May 31, 2016 at 12:26 PM, pseudo oduesp 
wrote:

> hi guys ,
> it s similare  thing to do :
>
> sqlcontext.join("select * from t1 join t2 on condition) and
>
> df1.join(df2,condition,'inner")??
>
> ps:df1.registertable('t1')
> ps:df2.registertable('t2')
> thanks
>



-- 
---
Takeshi Yamamuro


equvalent beewn join sql and data frame

2016-05-30 Thread pseudo oduesp
Hi guys,
Is it a similar thing to do:

sqlContext.sql("select * from t1 join t2 on condition") and

df1.join(df2, condition, 'inner')?

PS: df1.registerTempTable('t1')
PS: df2.registerTempTable('t2')
Thanks


Re: Spark + Kafka processing trouble

2016-05-30 Thread Malcolm Lockyer
On Tue, May 31, 2016 at 3:14 PM, Darren Govoni  wrote:
> Well that could be the problem. A SQL database is essential a big
> synchronizer. If you have a lot of spark tasks all bottlenecking on a single
> database socket (is the database clustered or colocated with spark workers?)
> then you will have blocked threads on the database server.

Totally agree this could be a big killer to scaling up, we are
planning to migrate. But in the meantime we are seeing such big issues
with test data of only a few records (1, 2, 1024 etc.) produced to
Kafka. Currently the database is NOT busy (CPU, memory and IO usage
from the DB is tiny).

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark + Kafka processing trouble

2016-05-30 Thread Darren Govoni


Well that could be the problem. A SQL database is essentially a big synchronizer.
If you have a lot of spark tasks all bottlenecking on a single database socket 
(is the database clustered or colocated with spark workers?) then you will have 
blocked threads on the database server.


Sent from my Verizon Wireless 4G LTE smartphone

 Original message 
From: Malcolm Lockyer  
Date: 05/30/2016  10:40 PM  (GMT-05:00) 
To: user@spark.apache.org 
Subject: Re: Spark + Kafka processing trouble 

On Tue, May 31, 2016 at 1:56 PM, Darren Govoni  wrote:
> So you are calling a SQL query (to a single database) within a spark
> operation distributed across your workers?

Yes, but currently with very small sets of data (1-10,000) and on a
single (dev) machine right now.





(sorry didn't reply to the list)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark + Kafka processing trouble

2016-05-30 Thread Malcolm Lockyer
On Tue, May 31, 2016 at 1:56 PM, Darren Govoni  wrote:
> So you are calling a SQL query (to a single database) within a spark
> operation distributed across your workers?

Yes, but currently with very small sets of data (1-10,000) and on a
single (dev) machine right now.





(sorry didn't reply to the list)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark + Kafka processing trouble

2016-05-30 Thread Darren Govoni


So you are calling a SQL query (to a single database) within a spark operation 
distributed across your workers? 


Sent from my Verizon Wireless 4G LTE smartphone

 Original message 
From: Malcolm Lockyer  
Date: 05/30/2016  9:45 PM  (GMT-05:00) 
To: user@spark.apache.org 
Subject: Spark + Kafka processing trouble 

Hopefully this is not off topic for this list, but I am hoping to
reach some people who have used Kafka + Spark before.

We are new to Spark and are setting up our first production
environment and hitting a speed issue that maybe configuration related
- and we have little experience in configuring Spark environments.

So we've got a Spark streaming job that seems to take an inordinate
amount of time to process. I realize that without specifics, it is
difficult to trace - however the most basic primitives in Spark are
performing horribly. The lazy nature of Spark is making it difficult
for me to understand what is happening - any suggestions are very much
appreciated.

Environment is MBP 2.2 i7. Spark master is "local[*]". We are using
Kafka and PostgreSQL, both local. The job is designed to:

a) grab some data from Kafka
b) correlate with existing data in PostgreSQL
c) output data to Kafka

I am isolating timings by calling System.nanoTime() before and after
something that forces calculation, for example .count() on a
DataFrame. It seems like every operation has a MASSIVE fixed overhead
and that is stacking up making each iteration on the RDD extremely
slow. Slow operations include pulling a single item from the Kafka
queue, running a simple query against PostgresSQL, and running a Spark
aggregation on a RDD with a handful of rows.

The machine is not maxing out on memory, disk or CPU. The machine
seems to be doing nothing for a high percentage of the execution time.
We have reproduced this behavior on two other machines. So we're
suspecting a configuration issue

As a concrete example, we have a DataFrame produced by running a JDBC
query by mapping over an RDD from Kafka. Calling count() (I guess
forcing execution) on this DataFrame when there is *1* item/row (Note:
SQL database is EMPTY at this point so this is not a factor) takes 4.5
seconds, calling count when there are 10,000 items takes 7 seconds.

Can anybody offer experience of something like this happening for
them? Any suggestions on how to understand what is going wrong?

I have tried tuning the number of Kafka partitions - increasing this
seems to increase the concurrency and ultimately number of things
processed per minute, but to get something half decent, I'm going to
need running with 1024 or more partitions. Is 1024 partitions a
reasonable number? What do you use in you environments?

I've tried different options for batchDuration. The calculation seems
to be batchDuration * Kafka partitions for number of items per
iteration, but this is always still extremely slow (many per iteration
vs. very few doesn't seem to really improve things). Can you suggest a
list of the Spark configuration parameters related to speed that you
think are key - preferably with the values you use for those
parameters?

I'd really really appreciate any help or suggestions as I've been
working on this speed issue for 3 days without success and my head is
starting to hurt. Thanks in advance.



Thanks,

--

Malcolm Lockyer

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark + Kafka processing trouble

2016-05-30 Thread Malcolm Lockyer
Hopefully this is not off topic for this list, but I am hoping to
reach some people who have used Kafka + Spark before.

We are new to Spark and are setting up our first production
environment and hitting a speed issue that may be configuration related
- and we have little experience in configuring Spark environments.

So we've got a Spark streaming job that seems to take an inordinate
amount of time to process. I realize that without specifics, it is
difficult to trace - however the most basic primitives in Spark are
performing horribly. The lazy nature of Spark is making it difficult
for me to understand what is happening - any suggestions are very much
appreciated.

Environment is MBP 2.2 i7. Spark master is "local[*]". We are using
Kafka and PostgreSQL, both local. The job is designed to:

a) grab some data from Kafka
b) correlate with existing data in PostgreSQL
c) output data to Kafka

I am isolating timings by calling System.nanoTime() before and after
something that forces calculation, for example .count() on a
DataFrame. It seems like every operation has a MASSIVE fixed overhead
and that is stacking up making each iteration on the RDD extremely
slow. Slow operations include pulling a single item from the Kafka
queue, running a simple query against PostgresSQL, and running a Spark
aggregation on a RDD with a handful of rows.

The machine is not maxing out on memory, disk or CPU. The machine
seems to be doing nothing for a high percentage of the execution time.
We have reproduced this behavior on two other machines. So we're
suspecting a configuration issue

As a concrete example, we have a DataFrame produced by running a JDBC
query by mapping over an RDD from Kafka. Calling count() (I guess
forcing execution) on this DataFrame when there is *1* item/row (Note:
SQL database is EMPTY at this point so this is not a factor) takes 4.5
seconds, calling count when there are 10,000 items takes 7 seconds.
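
For reference, a PySpark analogue of that timing approach would be something
like the snippet below, where "df" stands in for the DataFrame built from the
Kafka batch plus the JDBC lookup:

import time

start = time.time()
n = df.count()          # count() forces the otherwise lazy plan to execute
elapsed = time.time() - start
print("count=%d took %.3f s" % (n, elapsed))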

Can anybody offer experience of something like this happening for
them? Any suggestions on how to understand what is going wrong?

I have tried tuning the number of Kafka partitions - increasing this
seems to increase the concurrency and ultimately number of things
processed per minute, but to get something half decent, I'm going to
need to run with 1024 or more partitions. Is 1024 partitions a
reasonable number? What do you use in your environments?

I've tried different options for batchDuration. The calculation seems
to be batchDuration * Kafka partitions for number of items per
iteration, but this is always still extremely slow (many per iteration
vs. very few doesn't seem to really improve things). Can you suggest a
list of the Spark configuration parameters related to speed that you
think are key - preferably with the values you use for those
parameters?

I'd really really appreciate any help or suggestions as I've been
working on this speed issue for 3 days without success and my head is
starting to hurt. Thanks in advance.



Thanks,

--

Malcolm Lockyer

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can we use existing R model in Spark

2016-05-30 Thread Sun Rui
I mean train the random forest model (not using R) and use it for prediction,
both using Spark ML.
> On May 30, 2016, at 20:15, Neha Mehta  wrote:
> 
> Thanks Sujeet.. will try it out.
> 
> Hi Sun,
> 
> Can you please tell me what did you mean by "Maybe you can try using the 
> existing random forest model" ? did you mean creating the model again using 
> Spark MLLIB?
> 
> Thanks,
> Neha
> 
> 
> 
> 
> From: sujeet jog >
> Date: Mon, May 30, 2016 at 4:52 PM
> Subject: Re: Can we use existing R model in Spark
> To: Sun Rui >
> Cc: Neha Mehta >, user 
> >
> 
> 
> Try to invoke an R script from Spark using the rdd pipe method, get the work done
> and receive the model back in an RDD.
> 
> 
> for ex :-
> .   rdd.pipe("")
> 
> 
> On Mon, May 30, 2016 at 3:57 PM, Sun Rui  > wrote:
> Unfortunately no. Spark does not support loading external models (for
> example, PMML) for now.
> Maybe you can try using the existing random forest model in Spark.
> 
>> On May 30, 2016, at 18:21, Neha Mehta > > wrote:
>> 
>> Hi,
>> 
>> I have an existing random forest model created using R. I want to use that 
>> to predict values on Spark. Is it possible to do the same? If yes, then how?
>> 
>> Thanks & Regards,
>> Neha
>> 
> 
> 
> 
> 
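
A rough sketch of the rdd.pipe() idea mentioned above (the R script name and
its line-based input/output format are hypothetical):

# One CSV row of features per record; each record is written to the script's
# stdin, one per line, and whatever the script prints to stdout comes back as
# the resulting RDD of strings.
features = sc.parallelize(["1.0,2.0,3.0", "4.0,5.0,6.0"])
predictions = features.pipe("./predict_with_r_model.R")
print(predictions.collect())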



Re: GraphX Java API

2016-05-30 Thread Chris Fregly
btw, GraphX in Action is one of the better books out on Spark.

Michael did a great job with this one.  He even breaks down snippets of
Scala for newbies to understand the seemingly-arbitrary syntax.  I learned
quite a bit about not only Spark, but also Scala.

And of course, we shouldn't forget about Sean's Advanced Analytics with
Spark which, of course, is a classic that I still reference regularly.  :)

On Mon, May 30, 2016 at 7:42 AM, Michael Malak <
michaelma...@yahoo.com.invalid> wrote:

> Yes, it is possible to use GraphX from Java but it requires 10x the amount
> of code and involves using obscure typing and pre-defined lambda prototype
> facilities. I give an example of it in my book, the source code for which
> can be downloaded for free from
> https://www.manning.com/books/spark-graphx-in-action The relevant example
> is EdgeCount.java in chapter 10.
>
> As I suggest in my book, likely the only reason you'd want to put yourself
> through that torture is corporate mandate or compatibility with Java
> bytecode tools.
>
>
> --
> *From:* Sean Owen 
> *To:* Takeshi Yamamuro ; "Kumar, Abhishek (US -
> Bengaluru)" 
> *Cc:* "user@spark.apache.org" 
> *Sent:* Monday, May 30, 2016 7:07 AM
> *Subject:* Re: GraphX Java API
>
> No, you can call any Scala API in Java. It is somewhat less convenient if
> the method was not written with Java in mind but does work.
>
> On Mon, May 30, 2016, 00:32 Takeshi Yamamuro 
> wrote:
>
> These package are used only for Scala.
>
> On Mon, May 30, 2016 at 2:23 PM, Kumar, Abhishek (US - Bengaluru) <
> abhishekkuma...@deloitte.com> wrote:
>
> Hey,
> ·   I see some graphx packages listed here:
> http://spark.apache.org/docs/latest/api/java/index.html
> ·   org.apache.spark.graphx
> 
> ·   org.apache.spark.graphx.impl
> 
> ·   org.apache.spark.graphx.lib
> 
> ·   org.apache.spark.graphx.util
> 
> Aren’t they meant to be used with JAVA?
> Thanks
>
> *From:* Santoshakhilesh [mailto:santosh.akhil...@huawei.com]
> *Sent:* Friday, May 27, 2016 4:52 PM
> *To:* Kumar, Abhishek (US - Bengaluru) ;
> user@spark.apache.org
> *Subject:* RE: GraphX Java API
>
> GraphX APis are available only in Scala. If you need to use GraphX you
> need to switch to Scala.
>
> *From:* Kumar, Abhishek (US - Bengaluru) [
> mailto:abhishekkuma...@deloitte.com ]
> *Sent:* 27 May 2016 19:59
> *To:* user@spark.apache.org
> *Subject:* GraphX Java API
>
> Hi,
>
> We are trying to consume the Java API for GraphX, but there is no
> documentation available online on the usage or examples. It would be great
> if we could get some examples in Java.
>
> Thanks and regards,
>
> *Abhishek Kumar*
>
>
>
> This message (including any attachments) contains confidential information
> intended for a specific individual and purpose, and is protected by law. If
> you are not the intended recipient, you should delete this message and any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, by you is strictly prohibited.
> v.E.1
>
>
>
>
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
>
>
>


-- 
*Chris Fregly*
Research Scientist @ Flux Capacitor AI
"Bringing AI Back to the Future!"
San Francisco, CA
http://fluxcapacitor.ai


Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-05-30 Thread Mich Talebzadeh
I think we are going to move to a model where the computation stack will be
separate from the storage stack, and moreover something like Hive that provides
the means for persistent storage (well, HDFS is the one that stores all the
data) will have an in-memory type capability, much like what Oracle TimesTen
IMDB does with its big brother Oracle. TimesTen is effectively designed to
provide in-memory capability for analytics for Oracle 12c. These two work like
an index or materialized view. You write queries against tables and the
optimizer figures out whether to use row-oriented storage and indexes (Oracle
classic) or column non-indexed storage (TimesTen) to answer them. Just one
optimizer.

I gather Hive will be like that eventually. It will decide, based on the
frequency of access, where to look for data. Yes, we may have 10 TB of data on
disk, but how much of it is frequently accessed (hot data)? 80-20 rule? In
reality maybe just 2 TB, or the most recent partitions etc. The rest is cold
data.

cheers



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 30 May 2016 at 21:59, Michael Segel  wrote:

> And you have MapR supporting Apache Drill.
>
> So these are all alternatives to Spark, and its not necessarily an either
> or scenario. You can have both.
>
> On May 30, 2016, at 12:49 PM, Mich Talebzadeh 
> wrote:
>
> yep Hortonworks supports Tez for one reason or other which I am going
> hopefully to test it as the query engine for hive. Tthough I think Spark
> will be faster because of its in-memory support.
>
> Also if you are independent then you better off dealing with Spark and
> Hive without the need to support another stack like Tez.
>
> Cloudera support Impala instead of Hive but it is not something I have
> used. .
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 30 May 2016 at 20:19, Michael Segel  wrote:
>
>> Mich,
>>
>> Most people use vendor releases because they need to have the support.
>> Hortonworks is the vendor who has the most skin in the game when it comes
>> to Tez.
>>
>> If memory serves, Tez isn’t going to be M/R but a local execution engine?
>> Then LLAP is the in-memory piece to speed up Tez?
>>
>> HTH
>>
>> -Mike
>>
>> On May 29, 2016, at 1:35 PM, Mich Talebzadeh 
>> wrote:
>>
>> thanks I think the problem is that the TEZ user group is exceptionally
>> quiet. Just sent an email to Hive user group to see anyone has managed to
>> built a vendor independent version.
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 29 May 2016 at 21:23, Jörn Franke  wrote:
>>
>>> Well I think it is different from MR. It has some optimizations which
>>> you do not find in MR. Especially the LLAP option in Hive2 makes it
>>> interesting.
>>>
>>> I think hive 1.2 works with 0.7 and 2.0 with 0.8 . At least for 1.2 it
>>> is integrated in the Hortonworks distribution.
>>>
>>>
>>> On 29 May 2016, at 21:43, Mich Talebzadeh 
>>> wrote:
>>>
>>> Hi Jorn,
>>>
>>> I started building apache-tez-0.8.2 but got few errors. Couple of guys
>>> from TEZ user group kindly gave a hand but I could not go very far (or may
>>> be I did not make enough efforts) making it work.
>>>
>>> That TEZ user group is very quiet as well.
>>>
>>> My understanding is TEZ is MR with DAG but of course Spark has both plus
>>> in-memory capability.
>>>
>>> It would be interesting to see what version of TEZ works as execution
>>> engine with Hive.
>>>
>>> Vendors are divided on this (use Hive with TEZ) or use Impala instead of
>>> Hive etc as I am sure you already know.
>>>
>>> Cheers,
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 29 May 2016 at 20:19, Jörn Franke  wrote:
>>>
 Very interesting do you plan also a test with TEZ?

 On 29 May 2016, at 13:40, Mich Talebzadeh 
 wrote:

 Hi,

 I did another study of Hive using Spark engine compared to Hive with MR.

 Basically took the original table imported using Sqoop and created 

Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-05-30 Thread Michael Segel
And you have MapR supporting Apache Drill. 

So these are all alternatives to Spark, and its not necessarily an either or 
scenario. You can have both. 

> On May 30, 2016, at 12:49 PM, Mich Talebzadeh  
> wrote:
> 
> yep Hortonworks supports Tez for one reason or other which I am going 
> hopefully to test it as the query engine for hive. Tthough I think Spark will 
> be faster because of its in-memory support.
> 
> Also if you are independent then you better off dealing with Spark and Hive 
> without the need to support another stack like Tez.
> 
> Cloudera support Impala instead of Hive but it is not something I have used. .
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 30 May 2016 at 20:19, Michael Segel  > wrote:
> Mich, 
> 
> Most people use vendor releases because they need to have the support. 
> Hortonworks is the vendor who has the most skin in the game when it comes to 
> Tez. 
> 
> If memory serves, Tez isn’t going to be M/R but a local execution engine? 
> Then LLAP is the in-memory piece to speed up Tez? 
> 
> HTH
> 
> -Mike
> 
>> On May 29, 2016, at 1:35 PM, Mich Talebzadeh > > wrote:
>> 
>> thanks I think the problem is that the TEZ user group is exceptionally 
>> quiet. Just sent an email to Hive user group to see anyone has managed to 
>> built a vendor independent version.
>> 
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>>  
>> http://talebzadehmich.wordpress.com 
>>  
>> 
>> On 29 May 2016 at 21:23, Jörn Franke > > wrote:
>> Well I think it is different from MR. It has some optimizations which you do 
>> not find in MR. Especially the LLAP option in Hive2 makes it interesting. 
>> 
>> I think hive 1.2 works with 0.7 and 2.0 with 0.8 . At least for 1.2 it is 
>> integrated in the Hortonworks distribution. 
>> 
>> 
>> On 29 May 2016, at 21:43, Mich Talebzadeh > > wrote:
>> 
>>> Hi Jorn,
>>> 
>>> I started building apache-tez-0.8.2 but got few errors. Couple of guys from 
>>> TEZ user group kindly gave a hand but I could not go very far (or may be I 
>>> did not make enough efforts) making it work.
>>> 
>>> That TEZ user group is very quiet as well.
>>> 
>>> My understanding is TEZ is MR with DAG but of course Spark has both plus 
>>> in-memory capability.
>>> 
>>> It would be interesting to see what version of TEZ works as execution 
>>> engine with Hive.
>>> 
>>> Vendors are divided on this (use Hive with TEZ) or use Impala instead of 
>>> Hive etc as I am sure you already know.
>>> 
>>> Cheers,
>>> 
>>> 
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> 
>>>  
>>> http://talebzadehmich.wordpress.com 
>>>  
>>> 
>>> On 29 May 2016 at 20:19, Jörn Franke >> > wrote:
>>> Very interesting do you plan also a test with TEZ?
>>> 
>>> On 29 May 2016, at 13:40, Mich Talebzadeh >> > wrote:
>>> 
 Hi,
 
 I did another study of Hive using Spark engine compared to Hive with MR.
 
 Basically took the original table imported using Sqoop and created and 
 populated a new ORC table partitioned by year and month into 48 partitions 
 as follows:
 
 
 ​ 
 Connections use JDBC via beeline. Now for each partition using MR it takes 
 an average of 17 minutes as seen below for each PARTITION..  Now that is 
 just an individual partition and there are 48 partitions.
 
 In contrast doing the same operation with Spark engine took 10 minutes all 
 inclusive. I just gave up on MR. You can see the StartTime and FinishTime 
 from below
 
 
 
 This is by no means indicate that Spark is much better than MR but shows 
 that some very good results can ve achieved using Spark engine.
 
 
 Dr Mich Talebzadeh
  
 LinkedIn  
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
  
 
  
 http://talebzadehmich.wordpress.com 

Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-05-30 Thread Jörn Franke
I do not think that in-memory by itself will make things faster in all cases,
especially if you use Tez with ORC or Parquet. Especially for ad hoc queries on
large datasets (independently of whether they fit in memory or not) this will
have a significant impact. This is an experience I have also had with the
in-memory databases of Oracle or SQL Server. It might sound surprising, but it
has some explanations. ORC and Parquet have the min/max indexes, store and
process data very efficiently (it is important to choose the right datatype; if
everything is varchar then it is your fault that the database is not
performing), and only load into memory what is needed. This is not the case for
in-memory systems. Usually everything is loaded in memory and not only the
parts which are needed. This means that, due to the absence of min/max indexes,
you have to go through everything. Let us assume the table has a size of 10 TB.
There are different ad hoc queries that only process 1 GB (each one addresses
different areas). In Hive+Tez this is currently rather efficient: you load 1 GB
(negligible in a cluster) and process 1 GB. In Spark you would cache 10 TB (you
do not know which part will be addressed), which takes a lot of time to first
load, and each query needs to go in memory through 10 TB. This might be an
extreme case, but it is not uncommon. An exception is of course machine
learning algorithms (the original purpose of Spark), where I see more
advantages for Spark. Most of the traditional companies probably have both use
cases (maybe with a bias towards the first). Internet companies lean more
towards the last.

That being said, all systems are evolving. Hive supports Tez+LLAP, which is
basically the in-memory support. Spark stores data more efficiently in 1.5 and
1.6 (in the Dataset API and DataFrame; the issue here is that it is not the
same format as the files on disk). Let's see if there will be a convergence -
my bet is that both systems will be used, optimized for their use cases.
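
A small PySpark sketch of the "only load what the query touches" point above
(Spark 1.5+ with Hive support assumed; the path and column names are made up,
and whether the ORC min/max statistics are actually used depends on the
versions and the pushdown setting):

from pyspark.sql import HiveContext

hiveContext = HiveContext(sc)
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")

sales = hiveContext.read.orc("/data/warehouse/sales_orc")       # made-up path
recent = (sales.filter(sales.calendar_month_desc == "2016-05")  # candidate for pushdown
               .select("channel_desc", "amount_sold"))
recent.groupBy("channel_desc").sum("amount_sold").show()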

The bottom line is you have to first optimize and think what you need to do 
before going in-memory. Never load everything in-memory. You will be surprised. 
Have multiple technologies in your ecosystem. Understand them. Unfortunately 
most of the consultant companies have only poor experience and understanding of 
the complete picture and thus they fail with both technologies, which is sad, 
because both can be extremely powerful and a competitive  advantage.

> On 30 May 2016, at 21:49, Mich Talebzadeh  wrote:
> 
> yep Hortonworks supports Tez for one reason or other which I am going 
> hopefully to test it as the query engine for hive. Tthough I think Spark will 
> be faster because of its in-memory support.
> 
> Also if you are independent then you better off dealing with Spark and Hive 
> without the need to support another stack like Tez.
> 
> Cloudera support Impala instead of Hive but it is not something I have used. .
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  
> 
>> On 30 May 2016 at 20:19, Michael Segel  wrote:
>> Mich, 
>> 
>> Most people use vendor releases because they need to have the support. 
>> Hortonworks is the vendor who has the most skin in the game when it comes to 
>> Tez. 
>> 
>> If memory serves, Tez isn’t going to be M/R but a local execution engine? 
>> Then LLAP is the in-memory piece to speed up Tez? 
>> 
>> HTH
>> 
>> -Mike
>> 
>>> On May 29, 2016, at 1:35 PM, Mich Talebzadeh  
>>> wrote:
>>> 
>>> thanks I think the problem is that the TEZ user group is exceptionally 
>>> quiet. Just sent an email to Hive user group to see anyone has managed to 
>>> built a vendor independent version.
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> http://talebzadehmich.wordpress.com
>>>  
>>> 
 On 29 May 2016 at 21:23, Jörn Franke  wrote:
 Well I think it is different from MR. It has some optimizations which you 
 do not find in MR. Especially the LLAP option in Hive2 makes it 
 interesting. 
 
 I think hive 1.2 works with 0.7 and 2.0 with 0.8 . At least for 1.2 it is 
 integrated in the Hortonworks distribution. 
 
 
> On 29 May 2016, at 21:43, Mich Talebzadeh  
> wrote:
> 
> Hi Jorn,
> 
> I started building apache-tez-0.8.2 but got few errors. Couple of guys 
> from TEZ user group kindly gave a hand but I could not go very far (or 
> may be I did not make enough efforts) making it work.
> 
> That TEZ user group is very quiet as well.
> 
> My understanding is TEZ is MR with DAG but of course Spark has both plus 
> in-memory capability.
> 
> It would be interesting to see what 

RE: Spark Streaming heap space out of memory

2016-05-30 Thread Dancuart, Christian
While it has heap space, batches run well below 15 seconds.

Once it starts to run out of space, processing time takes about 1.5 minutes. 
Scheduling delay is around 4 minutes and total delay around 5.5 minutes. I 
usually shut it down at that point.

The number of stages (and pending stages) does seem to be quite high and 
increases over time.

4584  foreachRDD at HDFSPersistence.java:52        2016/05/30 16:23:52   1.9 min   36/36 (4964 skipped)   285/285 (28026 skipped)
4586  transformToPair at SampleCalculator.java:88  2016/05/30 16:25:02   0.2 s     1/1                    4/4
4585  (Unknown Stage Name)                         2016/05/30 16:23:52   1.2 min   1/1                    1/1
4582  (Unknown Stage Name)                         2016/05/30 16:21:51   48 s      1/1 (4063 skipped)     12/12 (22716 skipped)
4583  (Unknown Stage Name)                         2016/05/30 16:21:51   48 s      1/1                    1/1
4580  (Unknown Stage Name)                         2016/05/30 16:16:38   4.0 min   36/36 (4879 skipped)   285/285 (27546 skipped)
4581  (Unknown Stage Name)                         2016/05/30 16:16:38   0.1 s     1/1                    4/4
4579  (Unknown Stage Name)                         2016/05/30 16:15:53   45 s      1/1                    1/1
4578  (Unknown Stage Name)                         2016/05/30 16:14:38   1.3 min   1/1 (3993 skipped)     12/12 (22326 skipped)
4577  (Unknown Stage Name)                         2016/05/30 16:14:37   0.8 s     1/1                    1/1

Is this what you mean by pending stages?

I have taken a few heap dumps but I’m not sure what I am looking at for the 
problematic classes.

From: Shahbaz [mailto:shahzadh...@gmail.com]
Sent: 2016, May, 30 3:25 PM
To: Dancuart, Christian
Cc: user
Subject: Re: Spark Streaming heap space out of memory

Hi Christian,


  *   What is the processing time of each of your Batch,is it exceeding 15 
seconds.
  *   How many jobs are queued.
  *   Can you take a heap dump and see which objects are occupying the heap.

Regards,
Shahbaz


On Tue, May 31, 2016 at 12:21 AM, 
christian.dancu...@rbc.com 
> wrote:
Hi All,

We have a spark streaming v1.4/java 8 application that slows down and
eventually runs out of heap space. The less driver memory, the faster it
happens.

Appended is our spark configuration and a snapshot of the of heap taken
using jmap on the driver process. The RDDInfo, $colon$colon and [C objects
keep growing as we observe. We also tried to use G1GC, but it acts the same.

Our dependency graph contains multiple updateStateByKey() calls. For each,
we explicitly set the checkpoint interval to 240 seconds.

We have our batch interval set to 15 seconds; with no delays at the start of
the process.

Spark configuration (Spark Driver Memory: 6GB, Spark Executor Memory: 2GB):
spark.streaming.minRememberDuration=180s
spark.ui.showConsoleProgress=false
spark.streaming.receiver.writeAheadLog.enable=true
spark.streaming.unpersist=true
spark.streaming.stopGracefullyOnShutdown=true
spark.streaming.ui.retainedBatches=10
spark.ui.retainedJobs=10
spark.ui.retainedStages=10
spark.worker.ui.retainedExecutors=10
spark.worker.ui.retainedDrivers=10
spark.sql.ui.retainedExecutions=10
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max=128m

num #instances #bytes  class name
--
   1:   8828200  565004800  org.apache.spark.storage.RDDInfo
   2:  20794893  499077432  scala.collection.immutable.$colon$colon
   3:   9646097  459928736  [C
   4:   9644398  231465552  java.lang.String
   5:  12760625  20417  java.lang.Integer
   6: 21326  98632  [B
   7:556959   44661232  [Lscala.collection.mutable.HashEntry;
   8:   1179788   37753216
java.util.concurrent.ConcurrentHashMap$Node
   9:   1169264   37416448  java.util.Hashtable$Entry
  10:552707   30951592  org.apache.spark.scheduler.StageInfo
  11:367107   23084712  [Ljava.lang.Object;
  12:556948   22277920  scala.collection.mutable.HashMap
  13:  2787   22145568
[Ljava.util.concurrent.ConcurrentHashMap$Node;
  14:116997   12167688  org.apache.spark.executor.TaskMetrics
  15:    360425    8650200  java.util.concurrent.LinkedBlockingQueue$Node
  16:    360417    8650008  org.apache.spark.deploy.history.yarn.HandleSparkEvent
  17:  83328478088  [Ljava.util.Hashtable$Entry;
  18:3510618425464  scala.collection.mutable.ArrayBuffer
  19:1169638421336  org.apache.spark.scheduler.TaskInfo
  20:4461367138176  scala.Some
  21:2119685087232
io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  22:1169634678520
org.apache.spark.scheduler.SparkListenerTaskEnd
  23:1076794307160
org.apache.spark.executor.ShuffleWriteMetrics
  24: 721624041072

Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-05-30 Thread Ovidiu-Cristian MARCU
Spark in relation to Tez can be like a Flink runner for Apache Beam? The use 
case of Tez however may be interesting (but current implementation only 
YARN-based?)

Spark is efficient (or faster) for a number of reasons, including its
‘in-memory’ execution (from my understanding and experiments). If one really
cares to dive in, it is enough to read their papers, which explain very well the
optimization framework (graph-specific, MPP db, Catalyst, ML pipelines etc.)
that Spark became after the initial RDD implementation.

What Spark is missing is a way of reaching its users with a good ‘production’
level, good documentation and nice feedback from the masters of this unique
piece.

Just an opinion.

Best,
Ovidiu


> On 30 May 2016, at 21:49, Mich Talebzadeh  wrote:
> 
> yep Hortonworks supports Tez for one reason or other which I am going 
> hopefully to test it as the query engine for hive. Tthough I think Spark will 
> be faster because of its in-memory support.
> 
> Also if you are independent then you better off dealing with Spark and Hive 
> without the need to support another stack like Tez.
> 
> Cloudera support Impala instead of Hive but it is not something I have used. .
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 30 May 2016 at 20:19, Michael Segel  > wrote:
> Mich, 
> 
> Most people use vendor releases because they need to have the support. 
> Hortonworks is the vendor who has the most skin in the game when it comes to 
> Tez. 
> 
> If memory serves, Tez isn’t going to be M/R but a local execution engine? 
> Then LLAP is the in-memory piece to speed up Tez? 
> 
> HTH
> 
> -Mike
> 
>> On May 29, 2016, at 1:35 PM, Mich Talebzadeh > > wrote:
>> 
>> thanks I think the problem is that the TEZ user group is exceptionally 
>> quiet. Just sent an email to Hive user group to see anyone has managed to 
>> built a vendor independent version.
>> 
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>>  
>> http://talebzadehmich.wordpress.com 
>>  
>> 
>> On 29 May 2016 at 21:23, Jörn Franke > > wrote:
>> Well I think it is different from MR. It has some optimizations which you do 
>> not find in MR. Especially the LLAP option in Hive2 makes it interesting. 
>> 
>> I think hive 1.2 works with 0.7 and 2.0 with 0.8 . At least for 1.2 it is 
>> integrated in the Hortonworks distribution. 
>> 
>> 
>> On 29 May 2016, at 21:43, Mich Talebzadeh > > wrote:
>> 
>>> Hi Jorn,
>>> 
>>> I started building apache-tez-0.8.2 but got few errors. Couple of guys from 
>>> TEZ user group kindly gave a hand but I could not go very far (or may be I 
>>> did not make enough efforts) making it work.
>>> 
>>> That TEZ user group is very quiet as well.
>>> 
>>> My understanding is TEZ is MR with DAG but of course Spark has both plus 
>>> in-memory capability.
>>> 
>>> It would be interesting to see what version of TEZ works as execution 
>>> engine with Hive.
>>> 
>>> Vendors are divided on this (use Hive with TEZ) or use Impala instead of 
>>> Hive etc as I am sure you already know.
>>> 
>>> Cheers,
>>> 
>>> 
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> 
>>>  
>>> http://talebzadehmich.wordpress.com 
>>>  
>>> 
>>> On 29 May 2016 at 20:19, Jörn Franke >> > wrote:
>>> Very interesting do you plan also a test with TEZ?
>>> 
>>> On 29 May 2016, at 13:40, Mich Talebzadeh >> > wrote:
>>> 
 Hi,
 
 I did another study of Hive using Spark engine compared to Hive with MR.
 
 Basically took the original table imported using Sqoop and created and 
 populated a new ORC table partitioned by year and month into 48 partitions 
 as follows:
 
 
 ​ 
 Connections use JDBC via beeline. Now for each partition using MR it takes 
 an average of 17 minutes as seen below for each PARTITION..  Now that is 
 just an individual partition and there are 48 partitions.
 
 In contrast doing the same operation with Spark engine took 10 

Re: Does Spark support updates or deletes on underlying Hive tables

2016-05-30 Thread Mich Talebzadeh
Hi,

Remember that ACID and transactional support were added from Hive 0.14
onward because of the advent of ORC tables.

Now Spark does not support transactions because quote "there is a piece in
the execution side that needs to send heartbeats to Hive metastore saying a
transaction is still alive". That has not been implemented in Spark yet to
my knowledge.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 30 May 2016 at 19:37, Ashok Kumar  wrote:

> Hi,
>
> I can do inserts from Spark on Hive tables. How about updates or deletes.
> They are failing when I tried?
>
> Thanking
>
>
>


Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-05-30 Thread Mich Talebzadeh
Yep, Hortonworks supports Tez for one reason or another, which I am hopefully
going to test as the query engine for Hive. Though I think Spark will be faster
because of its in-memory support.

Also, if you are independent then you are better off dealing with Spark and Hive
without the need to support another stack like Tez.

Cloudera supports Impala instead of Hive, but it is not something I have used.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 30 May 2016 at 20:19, Michael Segel  wrote:

> Mich,
>
> Most people use vendor releases because they need to have the support.
> Hortonworks is the vendor who has the most skin in the game when it comes
> to Tez.
>
> If memory serves, Tez isn’t going to be M/R but a local execution engine?
> Then LLAP is the in-memory piece to speed up Tez?
>
> HTH
>
> -Mike
>
> On May 29, 2016, at 1:35 PM, Mich Talebzadeh 
> wrote:
>
> thanks I think the problem is that the TEZ user group is exceptionally
> quiet. Just sent an email to Hive user group to see anyone has managed to
> built a vendor independent version.
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 29 May 2016 at 21:23, Jörn Franke  wrote:
>
>> Well I think it is different from MR. It has some optimizations which you
>> do not find in MR. Especially the LLAP option in Hive2 makes it
>> interesting.
>>
>> I think hive 1.2 works with 0.7 and 2.0 with 0.8 . At least for 1.2 it is
>> integrated in the Hortonworks distribution.
>>
>>
>> On 29 May 2016, at 21:43, Mich Talebzadeh 
>> wrote:
>>
>> Hi Jorn,
>>
>> I started building apache-tez-0.8.2 but got few errors. Couple of guys
>> from TEZ user group kindly gave a hand but I could not go very far (or may
>> be I did not make enough efforts) making it work.
>>
>> That TEZ user group is very quiet as well.
>>
>> My understanding is TEZ is MR with DAG but of course Spark has both plus
>> in-memory capability.
>>
>> It would be interesting to see what version of TEZ works as execution
>> engine with Hive.
>>
>> Vendors are divided on this (use Hive with TEZ) or use Impala instead of
>> Hive etc as I am sure you already know.
>>
>> Cheers,
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 29 May 2016 at 20:19, Jörn Franke  wrote:
>>
>>> Very interesting do you plan also a test with TEZ?
>>>
>>> On 29 May 2016, at 13:40, Mich Talebzadeh 
>>> wrote:
>>>
>>> Hi,
>>>
>>> I did another study of Hive using Spark engine compared to Hive with MR.
>>>
>>> Basically took the original table imported using Sqoop and created and
>>> populated a new ORC table partitioned by year and month into 48 partitions
>>> as follows:
>>>
>>> 
>>> ​
>>> Connections use JDBC via beeline. Now, for each partition using MR it
>>> takes an average of 17 minutes, as seen below for each PARTITION. Now, that
>>> is just an individual partition, and there are 48 partitions.
>>>
>>> In contrast doing the same operation with Spark engine took 10 minutes
>>> all inclusive. I just gave up on MR. You can see the StartTime and
>>> FinishTime from below
>>>
>>> 
>>>
>>> This by no means indicates that Spark is much better than MR, but it shows
>>> that some very good results can be achieved using the Spark engine.
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 24 May 2016 at 08:03, Mich Talebzadeh 
>>> wrote:
>>>
 Hi,

 We use Hive as the database and use Spark as an all purpose query tool.

 Whether Hive is the right database for the purpose or one is better off
 with something like Phoenix on HBase, well, the answer is it depends and
 your mileage varies.

 So fit for purpose.

 Ideally what one wants is to use the fastest method to get the results.
 How fast is confined by our SLA agreements in production, and that keeps
 us from unnecessary further work, as we technologists like to play around.

 So in short, we use Spark most of the time and use Hive as the backend
 engine for data storage, mainly ORC tables.

Re: Secondary Indexing?

2016-05-30 Thread Mich Talebzadeh
your point on

"At the same time… if you are dealing with a large enough set of data… you
will have I/O. Both in terms of networking and Physical. This is true of
both Spark and in-memory RDBMs. .."

Well, an IMDB will not start flushing to disk (and thus doing physical IO) when
it gets full; it simply won't be able to take any more data. The same goes for a
Coherence cache or any other data fabric: it will run out of memory.

OK, but that is a bit different. I don't know Spark's internals, but I know that
the hash join was invented for RDBMSs when there was no suitable index: you
pair the build stream with the probe stream. Are these not the equivalent of
RDDs now? In other words, we can assume that one RDD is the build bucket and
the other is the probe bucket. Now, in an RDBMS, if there are not enough
buckets/memory available for the probe stream then the whole thing starts
spilling to disk. I just noticed that the Spark GUI shows the spills as well?

[image: Inline images 1]


The one under TungstenAggregate -> spill size.

Now, in that case with spills, I am not sure an index is going to do much.
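
To make the build/probe picture concrete, here is a rough sketch of the idea
(illustrative only, with made-up data; this is not Spark's internal join code):
the smaller side is collected and broadcast as an in-memory hash table (the
build side) and the larger side is streamed through it (the probe side).

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("hash-join-sketch"))

val small = sc.parallelize(Seq((1, "a"), (2, "b")))              // build side
val large = sc.parallelize(Seq((1, 10.0), (1, 11.0), (3, 9.0)))  // probe side

// build a hash table from the small side and ship it to every executor
val buildTable = sc.broadcast(small.collectAsMap())

// probe: stream the large side through the hash table, keeping only matches
val joined = large.mapPartitions { iter =>
  iter.flatMap { case (k, v) =>
    buildTable.value.get(k).map(s => (k, (s, v)))
  }
}

joined.collect().foreach(println)   // (1,(a,10.0)) and (1,(a,11.0))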

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 30 May 2016 at 20:06, Michael Segel  wrote:

> I have to clarify something…
>
> In SparkSQL, we can query against both immutable existing RDDs, and
> Hive/HBase/MapRDB/  which are mutable.
> So we have to keep this in mind while we are talking about secondary
> indexing. (Its not just RDDs)
>
>
> I think the only advantage to being immutable is that once you generate
> and index the RDD, its not going to change, so the ‘correctness’ or RI is
> implicit.
> Here, the issue becomes how long will the RDD live. There is a cost to
> generate the index, which has to be weighed against its usefulness and the
> longevity of the underlying RDD. Since the RDD is typically associated to a
> single spark context, building indexes may be cost prohibitive.
>
> At the same time… if you are dealing with a large enough set of data… you
> will have I/O. Both in terms of networking and Physical. This is true of
> both Spark and in-memory RDBMs.  This is due to the footprint of data along
> with the need to persist the data.
>
> But I digress.
>
> So in one scenario, we’re building our RDDs from a system that has
> indexing available.  Is it safe to assume that SparkSQL will take advantage
> of the indexing in the underlying system? (Imagine sourcing data from an
> Oracle or DB2 database in order to build RDDs.) If so, then we don’t have
> to worry about indexing.
>
> In another scenario, we’re joining an RDD against a table in an RDBMS. Is
> it safe to assume that Spark will select data from the database in to an
> RDD prior to attempting to do the join?  Here, the RDBMs table will use its
> index when you execute the query? (Again its an assumption…)  Then you have
> two data sets that then need to be joined, which leads to the third
> scenario…
>
> Joining two spark RDDs.
> Going from memory, it's a hash join. Here the RDD is used to create a hash
> table, which would imply an index of the hash key. So for joins, you
> wouldn’t need a secondary index?
> They wouldn’t provide any value due to the hash table being created. (And
> you would probably apply the filter while you insert a row into the hash
> table before the join.)
>
> Did I just answer my own question?
>
>
>
> On May 30, 2016, at 10:58 AM, Mich Talebzadeh 
> wrote:
>
> Just a thought
>
> Well in Spark RDDs are immutable which is an advantage compared to a
> conventional IMDB like Oracle TimesTen meaning concurrency is not an issue
> for certain indexes.
>
> The overriding optimisation (as there is no physical IO) has to be
> reducing memory footprint and CPU demands, and using indexes may help for
> full key lookups. If I recall correctly, in-memory databases support
> hash indexes and T-tree indexes, which are pretty common in these
> situations. But there is an overhead in creating indexes on RDDs, and I
> presume in parallelizing those indexes.
>
> With regard to getting data into RDD from say an underlying table in Hive
> into a temp table, then depending on the size of that temp table, one can
> debate an index on that temp table.
>
> The question is what use case do you have in mind.?
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 30 May 2016 at 17:08, Michael Segel  wrote:
>
>> I’m not sure where to post this since its a bit of a philosophical
>> question in terms of design and vision for spark.
>>
>> If we look at SparkSQL and performance… where does Secondary indexing 

Re: Spark Streaming heap space out of memory

2016-05-30 Thread Shahbaz
Hi Christian,


   - What is the processing time of each of your batches? Is it exceeding 15
   seconds?
   - How many jobs are queued?
   - Can you take a heap dump and see which objects are occupying the heap?


Regards,
Shahbaz


On Tue, May 31, 2016 at 12:21 AM, christian.dancu...@rbc.com <
christian.dancu...@rbc.com> wrote:

> Hi All,
>
> We have a spark streaming v1.4/java 8 application that slows down and
> eventually runs out of heap space. The less driver memory, the faster it
> happens.
>
> Appended is our spark configuration and a snapshot of the heap taken
> using jmap on the driver process. The RDDInfo, $colon$colon and [C objects
> keep growing as we observe. We also tried to use G1GC, but it acts the
> same.
>
> Our dependency graph contains multiple updateStateByKey() calls. For each,
> we explicitly set the checkpoint interval to 240 seconds.
>
> We have our batch interval set to 15 seconds; with no delays at the start
> of
> the process.
>
> Spark configuration (Spark Driver Memory: 6GB, Spark Executor Memory: 2GB):
> spark.streaming.minRememberDuration=180s
> spark.ui.showConsoleProgress=false
> spark.streaming.receiver.writeAheadLog.enable=true
> spark.streaming.unpersist=true
> spark.streaming.stopGracefullyOnShutdown=true
> spark.streaming.ui.retainedBatches=10
> spark.ui.retainedJobs=10
> spark.ui.retainedStages=10
> spark.worker.ui.retainedExecutors=10
> spark.worker.ui.retainedDrivers=10
> spark.sql.ui.retainedExecutions=10
> spark.serializer=org.apache.spark.serializer.KryoSerializer
> spark.kryoserializer.buffer.max=128m
>
> num #instances #bytes  class name
> --
>1:   8828200  565004800  org.apache.spark.storage.RDDInfo
>2:  20794893  499077432  scala.collection.immutable.$colon$colon
>3:   9646097  459928736  [C
>4:   9644398  231465552  java.lang.String
>5:  12760625  20417  java.lang.Integer
>6: 21326  98632  [B
>7:556959   44661232  [Lscala.collection.mutable.HashEntry;
>8:   1179788   37753216
> java.util.concurrent.ConcurrentHashMap$Node
>9:   1169264   37416448  java.util.Hashtable$Entry
>   10:552707   30951592  org.apache.spark.scheduler.StageInfo
>   11:367107   23084712  [Ljava.lang.Object;
>   12:556948   22277920  scala.collection.mutable.HashMap
>   13:  2787   22145568
> [Ljava.util.concurrent.ConcurrentHashMap$Node;
>   14:116997   12167688  org.apache.spark.executor.TaskMetrics
>   15:3604258650200
> java.util.concurrent.LinkedBlockingQueue$Node
>   16:3604178650008
> org.apache.spark.deploy.history.yarn.HandleSparkEvent
>   17:  83328478088  [Ljava.util.Hashtable$Entry;
>   18:3510618425464  scala.collection.mutable.ArrayBuffer
>   19:1169638421336  org.apache.spark.scheduler.TaskInfo
>   20:4461367138176  scala.Some
>   21:2119685087232
> io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>   22:1169634678520
> org.apache.spark.scheduler.SparkListenerTaskEnd
>   23:1076794307160
> org.apache.spark.executor.ShuffleWriteMetrics
>   24: 721624041072
> org.apache.spark.executor.ShuffleReadMetrics
>   25:1172233751136  scala.collection.mutable.ListBuffer
>   26: 814733258920  org.apache.spark.executor.InputMetrics
>   27:1259033021672  org.apache.spark.rdd.RDDOperationScope
>   28: 914552926560  java.util.HashMap$Node
>   29:892917776
> [Lscala.concurrent.forkjoin.ForkJoinTask;
>   30:1169572806968
> org.apache.spark.scheduler.SparkListenerTaskStart
>   31:  21222188568  [Lorg.apache.spark.scheduler.StageInfo;
>   32: 164111819816  java.lang.Class
>   33: 878621405792
> org.apache.spark.scheduler.SparkListenerUnpersistRDD
>   34: 22915 916600  org.apache.spark.storage.BlockStatus
>   35:  5887 895568  [Ljava.util.HashMap$Node;
>   36:   480 82
> [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>   37:  7569 834968  [I
>   38:  9626 770080  org.apache.spark.rdd.MapPartitionsRDD
>   39: 31748 761952  java.lang.Long
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-heap-space-out-of-memory-tp27050.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-05-30 Thread Michael Segel
Mich, 

Most people use vendor releases because they need to have the support. 
Hortonworks is the vendor who has the most skin in the game when it comes to 
Tez. 

If memory serves, Tez isn’t going to be M/R but a local execution engine? Then 
LLAP is the in-memory piece to speed up Tez? 

HTH

-Mike

> On May 29, 2016, at 1:35 PM, Mich Talebzadeh  
> wrote:
> 
> Thanks. I think the problem is that the TEZ user group is exceptionally quiet. 
> I just sent an email to the Hive user group to see if anyone has managed to 
> build a vendor-independent version.
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 29 May 2016 at 21:23, Jörn Franke  > wrote:
> Well I think it is different from MR. It has some optimizations which you do 
> not find in MR. Especially the LLAP option in Hive2 makes it interesting. 
> 
> I think hive 1.2 works with 0.7 and 2.0 with 0.8 . At least for 1.2 it is 
> integrated in the Hortonworks distribution. 
> 
> 
> On 29 May 2016, at 21:43, Mich Talebzadeh  > wrote:
> 
>> Hi Jorn,
>> 
>> I started building apache-tez-0.8.2 but got a few errors. A couple of guys from 
>> the TEZ user group kindly gave a hand, but I could not get very far (or maybe I 
>> did not make enough effort) making it work.
>> 
>> That TEZ user group is very quiet as well.
>> 
>> My understanding is TEZ is MR with DAG but of course Spark has both plus 
>> in-memory capability.
>> 
>> It would be interesting to see what version of TEZ works as execution engine 
>> with Hive.
>> 
>> Vendors are divided on this (use Hive with TEZ) or use Impala instead of 
>> Hive etc as I am sure you already know.
>> 
>> Cheers,
>> 
>> 
>> 
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>>  
>> http://talebzadehmich.wordpress.com 
>>  
>> 
>> On 29 May 2016 at 20:19, Jörn Franke > > wrote:
>> Very interesting do you plan also a test with TEZ?
>> 
>> On 29 May 2016, at 13:40, Mich Talebzadeh > > wrote:
>> 
>>> Hi,
>>> 
>>> I did another study of Hive using Spark engine compared to Hive with MR.
>>> 
>>> Basically took the original table imported using Sqoop and created and 
>>> populated a new ORC table partitioned by year and month into 48 partitions 
>>> as follows:
>>> 
>>> 
>>> ​ 
>>> Connections use JDBC via beeline. Now, for each partition using MR it takes 
>>> an average of 17 minutes, as seen below for each PARTITION. Now, that is 
>>> just an individual partition, and there are 48 partitions.
>>> 
>>> In contrast doing the same operation with Spark engine took 10 minutes all 
>>> inclusive. I just gave up on MR. You can see the StartTime and FinishTime 
>>> from below
>>> 
>>> 
>>> 
>>> This by no means indicates that Spark is much better than MR, but it shows 
>>> that some very good results can be achieved using the Spark engine.
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> 
>>>  
>>> http://talebzadehmich.wordpress.com 
>>>  
>>> 
>>> On 24 May 2016 at 08:03, Mich Talebzadeh >> > wrote:
>>> Hi,
>>> 
>>> We use Hive as the database and use Spark as an all purpose query tool.
>>> 
>>> Whether Hive is the right database for the purpose or one is better off with 
>>> something like Phoenix on HBase, well, the answer is it depends and your 
>>> mileage varies.
>>> 
>>> So fit for purpose.
>>> 
>>> Ideally what one wants is to use the fastest method to get the results. How 
>>> fast is confined by our SLA agreements in production, and that keeps us 
>>> from unnecessary further work, as we technologists like to play around.
>>> 
>>> So in short, we use Spark most of the time and use Hive as the backend 
>>> engine for data storage, mainly ORC tables.
>>> 
>>> We use Hive on Spark and with Hive 2 on Spark 1.3.1 for now we have a 
>>> combination that works. Granted it helps to use Hive 2 on Spark 1.6.1 but 
>>> at the moment it is one of my projects.
>>> 
>>> We do not use any vendor's products as it enables us to move away  from 
>>> being tied down after years of SAP, Oracle and MS dependency to yet another 
>>> vendor. Besides there is some politics going on 

Re: HiveContext standalone => without a Hive metastore

2016-05-30 Thread Michael Segel
Going from memory… Derby is/was Cloudscape which IBM acquired from Informix who 
bought the company way back when.  (Since IBM released it under Apache 
licensing, Sun Microsystems took it and created JavaDB…) 

I believe that there is a networking function so that you can bring it up either 
in standalone mode or in network mode, which allows simultaneous network 
connections (multi-user).

If not, you can always go with MySQL.

HTH

> On May 26, 2016, at 1:36 PM, Mich Talebzadeh  
> wrote:
> 
> Well, make sure that you set up a reasonable RDBMS as metastore. Ours is 
> Oracle but you can get away with others. Check the supported list in
> 
> hduser@rhes564:: :/usr/lib/hive/scripts/metastore/upgrade> ltr
> total 40
> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 postgres
> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 mysql
> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 mssql
> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 derby
> drwxr-xr-x 3 hduser hadoop 4096 May 20 18:44 oracle
> 
> you have few good ones in the list.  In general the base tables (without 
> transactional support) are around 55  (Hive 2) and don't take much space 
> (depending on the volume of tables). I attached an E-R diagram.
> 
> HTH
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 26 May 2016 at 19:09, Gerard Maas  > wrote:
> Thanks a lot for the advice!. 
> 
> I found out why the standalone hiveContext would not work: it was trying to 
> deploy a Derby db and the user had no rights to create the dir where the db 
> is stored:
> 
> Caused by: java.sql.SQLException: Failed to create database 'metastore_db', 
> see the next exception for details.
> 
>at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
> 
>at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>  Source)
> 
>... 129 more
> 
> Caused by: java.sql.SQLException: Directory 
> /usr/share/spark-notebook/metastore_db cannot be created.
> 
> 
> 
> Now, the new issue is that we can't start more than 1 context at the same 
> time. I think we will need to setup a proper metastore.
> 
> 
> 
> -kind regards, Gerard.
> 
> 
> 
> 
> 
> On Thu, May 26, 2016 at 3:06 PM, Mich Talebzadeh  > wrote:
> To use HiveContext, which is basically an SQL API within Spark, without a proper 
> Hive setup does not make sense. It is a superset of Spark's SQLContext.
> 
> In addition simple things like registerTempTable may not work.
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 26 May 2016 at 13:01, Silvio Fiorito  > wrote:
> Hi Gerard,
> 
>  
> 
> I’ve never had an issue using the HiveContext without a hive-site.xml 
> configured. However, one issue you may have is if multiple users are starting 
> the HiveContext from the same path, they’ll all be trying to store the 
> default Derby metastore in the same location. Also, if you want them to be 
> able to persist permanent table metadata for SparkSQL then you’ll want to set 
> up a true metastore.
> 
>  
> 
> The other thing it could be is Hive dependency collisions from the classpath, 
> but that shouldn’t be an issue since you said it’s standalone (not a Hadoop 
> distro right?).
> 
>  
> 
> Thanks,
> 
> Silvio
> 
>  
> 
> From: Gerard Maas >
> Date: Thursday, May 26, 2016 at 5:28 AM
> To: spark users >
> Subject: HiveContext standalone => without a Hive metastore
> 
>  
> 
> Hi,
> 
>  
> 
> I'm helping some folks setting up an analytics cluster with  Spark.
> 
> They want to use the HiveContext to enable the Window functions on 
> DataFrames(*) but they don't have any Hive installation, nor do they need one at 
> the moment (if not necessary for this feature).
> 
>  
> 
> When we try to create a Hive context, we get the following error:
> 
>  
> 
> > val sqlContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)
> 
> java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate 
> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
> 
>at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
> 
>  
> 
> Is my HiveContext failing b/c it wants to connect to an 

Re: Bulk loading Serialized RDD into Hbase throws KryoException - IndexOutOfBoundsException

2016-05-30 Thread Nirav Patel
Put is a type of Mutation, so I am not sure what you mean by "if I use Mutation".

Anyway I registered all 3 classes to kryo.

kryo.register(classOf[org.apache.hadoop.hbase.client.Put])

kryo.register(classOf[ImmutableBytesWritable])

kryo.register(classOf[Mutable])


It still fails with the same exception.
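
For reference, the registration is usually wired up through a custom
KryoRegistrator set in the Spark config. A minimal sketch (the registrator
class and package names here are made up; the registered classes are the ones
mentioned in this thread):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

class HBaseKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // register the classes carried inside the (ImmutableBytesWritable, Put) pairs
    kryo.register(classOf[Put])
    kryo.register(classOf[Mutation])
    kryo.register(classOf[ImmutableBytesWritable])
  }
}

// and in the SparkConf:
// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// conf.set("spark.kryo.registrator", "com.example.HBaseKryoRegistrator")  // made-up package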



On Sun, May 29, 2016 at 11:26 PM, sjk  wrote:

> org.apache.hadoop.hbase.client.{Mutation, Put}
> org.apache.hadoop.hbase.io.ImmutableBytesWritable
>
> if you used Mutation, register the above classes too
>
> On May 30, 2016, at 08:11, Nirav Patel  wrote:
>
> Sure, let me try that. But from the looks of it, it seems
> kryo.util.MapReferenceResolver.getReadObject is trying to access an incorrect
> index (100).
>
> On Sun, May 29, 2016 at 5:06 PM, Ted Yu  wrote:
>
>> Can you register Put with Kryo ?
>>
>> Thanks
>>
>> On May 29, 2016, at 4:58 PM, Nirav Patel  wrote:
>>
>> I pasted the code snippet for that method.
>>
>> here's full def:
>>
>>   def writeRddToHBase2(hbaseRdd: RDD[(ImmutableBytesWritable, Put)],
>> tableName: String) {
>>
>>
>> hbaseRdd.values.foreachPartition{ itr =>
>>
>> val hConf = HBaseConfiguration.create()
>>
>> hConf.setInt("hbase.client.write.buffer", 16097152)
>>
>> val table = new HTable(hConf, tableName)
>>
>> //table.setWriteBufferSize(8388608)
>>
>> *itr.grouped(100).foreach(table.put(_)) *  // << Exception
>> happens at this point
>>
>> table.close()
>>
>> }
>>
>>   }
>>
>>
>> I am using hbase 0.98.12 mapr distribution.
>>
>>
>> Thanks
>>
>> Nirav
>>
>> On Sun, May 29, 2016 at 4:46 PM, Ted Yu  wrote:
>>
>>> bq.  at com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$
>>> anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:80)
>>>
>>> Can you reveal related code from HbaseUtils.scala ?
>>>
>>> Which hbase version are you using ?
>>>
>>> Thanks
>>>
>>> On Sun, May 29, 2016 at 4:26 PM, Nirav Patel 
>>> wrote:
>>>
 Hi,

 I am getting the following Kryo deserialization error when trying to
 bulk-load a cached RDD into HBase. It works if I don't cache the RDD. I cache
 it with MEMORY_ONLY_SER.

 here's the code snippet:


 hbaseRdd.values.foreachPartition{ itr =>
 val hConf = HBaseConfiguration.create()
 hConf.setInt("hbase.client.write.buffer", 16097152)
 val table = new HTable(hConf, tableName)
 itr.grouped(100).foreach(table.put(_))
 table.close()
 }
 hbaseRdd is of type RDD[(ImmutableBytesWritable, Put)]


 The exception I am getting is below. I read on a Kryo JIRA that this may be an
 issue with incorrect use of the serialization library. So could this be an
 issue with the twitter-chill library or Spark core itself?

 Job aborted due to stage failure: Task 16 in stage 9.0 failed 10 times,
 most recent failure: Lost task 16.9 in stage 9.0 (TID 28614,
 hdn10.mycorptcorporation.local): com.esotericsoftware.kryo.KryoException:
 java.lang.IndexOutOfBoundsException: Index: 100, Size: 6
 Serialization trace:
 familyMap (org.apache.hadoop.hbase.client.Put)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
 at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
 at
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
 at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:80)
 at
 com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:75)
 at
 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
 at
 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
 at
 

Spark Streaming heap space out of memory

2016-05-30 Thread christian.dancu...@rbc.com
Hi All,

We have a spark streaming v1.4/java 8 application that slows down and
eventually runs out of heap space. The less driver memory, the faster it
happens.

Appended is our spark configuration and a snapshot of the heap taken
using jmap on the driver process. The RDDInfo, $colon$colon and [C objects
keep growing as we observe. We also tried to use G1GC, but it acts the same.

Our dependency graph contains multiple updateStateByKey() calls. For each,
we explicitly set the checkpoint interval to 240 seconds.

We have our batch interval set to 15 seconds; with no delays at the start of
the process.
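
For context, a minimal sketch of the kind of pipeline described above, with a
15-second batch interval and an explicit 240-second checkpoint interval on an
updateStateByKey stream (the source, checkpoint directory and state function
are made up for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("state-stream-sketch")
val ssc = new StreamingContext(conf, Seconds(15))       // 15-second batches
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")     // made-up path

val counts = ssc.socketTextStream("localhost", 9999)    // made-up source
  .map(line => (line, 1))
  .updateStateByKey[Int]((values: Seq[Int], state: Option[Int]) =>
    Some(values.sum + state.getOrElse(0)))

counts.checkpoint(Seconds(240))   // checkpoint this state DStream every 240 seconds
counts.print()

ssc.start()
ssc.awaitTermination()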

Spark configuration (Spark Driver Memory: 6GB, Spark Executor Memory: 2GB):
spark.streaming.minRememberDuration=180s
spark.ui.showConsoleProgress=false
spark.streaming.receiver.writeAheadLog.enable=true
spark.streaming.unpersist=true
spark.streaming.stopGracefullyOnShutdown=true
spark.streaming.ui.retainedBatches=10
spark.ui.retainedJobs=10
spark.ui.retainedStages=10
spark.worker.ui.retainedExecutors=10
spark.worker.ui.retainedDrivers=10
spark.sql.ui.retainedExecutions=10
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max=128m

num #instances #bytes  class name
--
   1:   8828200  565004800  org.apache.spark.storage.RDDInfo
   2:  20794893  499077432  scala.collection.immutable.$colon$colon
   3:   9646097  459928736  [C
   4:   9644398  231465552  java.lang.String
   5:  12760625  20417  java.lang.Integer
   6: 21326  98632  [B
   7:556959   44661232  [Lscala.collection.mutable.HashEntry;
   8:   1179788   37753216 
java.util.concurrent.ConcurrentHashMap$Node
   9:   1169264   37416448  java.util.Hashtable$Entry
  10:552707   30951592  org.apache.spark.scheduler.StageInfo
  11:367107   23084712  [Ljava.lang.Object;
  12:556948   22277920  scala.collection.mutable.HashMap
  13:  2787   22145568 
[Ljava.util.concurrent.ConcurrentHashMap$Node;
  14:116997   12167688  org.apache.spark.executor.TaskMetrics
  15:3604258650200 
java.util.concurrent.LinkedBlockingQueue$Node
  16:3604178650008 
org.apache.spark.deploy.history.yarn.HandleSparkEvent
  17:  83328478088  [Ljava.util.Hashtable$Entry;
  18:3510618425464  scala.collection.mutable.ArrayBuffer
  19:1169638421336  org.apache.spark.scheduler.TaskInfo
  20:4461367138176  scala.Some
  21:2119685087232 
io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  22:1169634678520 
org.apache.spark.scheduler.SparkListenerTaskEnd
  23:1076794307160 
org.apache.spark.executor.ShuffleWriteMetrics
  24: 721624041072 
org.apache.spark.executor.ShuffleReadMetrics
  25:1172233751136  scala.collection.mutable.ListBuffer
  26: 814733258920  org.apache.spark.executor.InputMetrics
  27:1259033021672  org.apache.spark.rdd.RDDOperationScope
  28: 914552926560  java.util.HashMap$Node
  29:892917776 
[Lscala.concurrent.forkjoin.ForkJoinTask;
  30:1169572806968 
org.apache.spark.scheduler.SparkListenerTaskStart
  31:  21222188568  [Lorg.apache.spark.scheduler.StageInfo;
  32: 164111819816  java.lang.Class
  33: 878621405792 
org.apache.spark.scheduler.SparkListenerUnpersistRDD
  34: 22915 916600  org.apache.spark.storage.BlockStatus
  35:  5887 895568  [Ljava.util.HashMap$Node;
  36:   480 82 
[Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  37:  7569 834968  [I
  38:  9626 770080  org.apache.spark.rdd.MapPartitionsRDD
  39: 31748 761952  java.lang.Long




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-heap-space-out-of-memory-tp27050.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does Spark support updates or deletes on underlying Hive tables

2016-05-30 Thread Ashok Kumar
Hi,
I can do inserts from Spark on Hive tables. How about updates or deletes? They 
are failing when I try them.
Thanking



Re: Secondary Indexing?

2016-05-30 Thread Gourav Sengupta
Hi,

have you tried using partitioning and the Parquet format? It works super fast
in SPARK.
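
A minimal sketch of what that looks like (made-up column names and paths,
Spark 1.x API): write the data partitioned by a couple of columns as Parquet,
then filter on those columns so only the matching directories are read.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-partitioning-sketch"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// made-up dataset: (year, month, amount)
val df = sc.parallelize(Seq((2016, 5, 10.0), (2016, 6, 20.0)))
  .toDF("year", "month", "amount")

// each (year, month) combination becomes its own directory of Parquet files
df.write.partitionBy("year", "month").parquet("/tmp/sales_parquet")

// partition pruning: only the year=2016/month=5 directory is scanned
sqlContext.read.parquet("/tmp/sales_parquet")
  .filter($"year" === 2016 && $"month" === 5)
  .show()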


Regards,
Gourav

On Mon, May 30, 2016 at 5:08 PM, Michael Segel 
wrote:

> I’m not sure where to post this since its a bit of a philosophical
> question in terms of design and vision for spark.
>
> If we look at SparkSQL and performance… where does Secondary indexing fit
> in?
>
> The reason this is a bit awkward is that if you view Spark as querying
> RDDs which are temporary, indexing doesn’t make sense until you consider
> your use case and how long is ‘temporary’.
> Then if you consider your RDD result set could be based on querying
> tables… and you could end up with an inverted table as an index… then
> indexing could make sense.
>
> Does it make sense to discuss this in user or dev email lists? Has anyone
> given this any thought in the past?
>
> Thx
>
> -Mike
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Secondary Indexing?

2016-05-30 Thread Mich Talebzadeh
Just a thought

Well, in Spark RDDs are immutable, which is an advantage compared to a
conventional IMDB like Oracle TimesTen, meaning concurrency is not an issue
for certain indexes.

The overriding optimisation (as there is no physical IO) has to be reducing
memory footprint and CPU demands, and using indexes may help for full key
lookups. If I recall correctly, in-memory databases support hash indexes and
T-tree indexes, which are pretty common in these situations. But there is an
overhead in creating indexes on RDDs, and I presume in parallelizing those
indexes.

With regard to getting data from, say, an underlying table in Hive into a
temp table, then depending on the size of that temp table, one can
debate an index on that temp table.
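
As an illustration of that temp-table route, a minimal sketch (table and
column names are made up, and a HiveContext is assumed):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("temp-table-sketch"))
val hiveContext = new HiveContext(sc)

// pull a slice of the underlying Hive table into Spark and cache it
val slice = hiveContext.sql("SELECT id, amount FROM sales WHERE year = 2016")
slice.cache()
slice.registerTempTable("sales_2016")

// subsequent queries run against the cached slice
hiveContext.sql("SELECT id, SUM(amount) AS total FROM sales_2016 GROUP BY id").show()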

The question is: what use case do you have in mind?

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 30 May 2016 at 17:08, Michael Segel  wrote:

> I’m not sure where to post this since its a bit of a philosophical
> question in terms of design and vision for spark.
>
> If we look at SparkSQL and performance… where does Secondary indexing fit
> in?
>
> The reason this is a bit awkward is that if you view Spark as querying
> RDDs which are temporary, indexing doesn’t make sense until you consider
> your use case and how long is ‘temporary’.
> Then if you consider your RDD result set could be based on querying
> tables… and you could end up with an inverted table as an index… then
> indexing could make sense.
>
> Does it make sense to discuss this in user or dev email lists? Has anyone
> given this any thought in the past?
>
> Thx
>
> -Mike
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: can not use udf in hivethriftserver2

2016-05-30 Thread lalit sharma
Can you try adding jar to SPARK_CLASSPATH env variable ?

On Mon, May 30, 2016 at 9:55 PM, 喜之郎 <251922...@qq.com> wrote:

> HI all, I have a problem when using hiveserver2 and beeline.
> when I use CLI mode, the udf works well.
> But when I begin to use hiveserver2 and beeline, the udf can not work.
> My Spark version is 1.5.1.
> I tried 2 methods, first:
> ##
> add jar /home/hadoop/dmp-udf-0.0.1-SNAPSHOT.jar;
> create temporary function URLEncode as "com.dmp.hive.udfs.utils.URLEncode"
> ;
>
> errors:
> Error: org.apache.spark.sql.AnalysisException: undefined function
> URLEncode; line 1 pos 207 (state=,code=0)
>
>
> second:
> create temporary function URLEncode as 'com.dmp.hive.udfs.utils.URLEncode'
> using jar
> 'hdfs:///warehouse/dmpv3.db/datafile/libjars/dmp-udf-0.0.1-SNAPSHOT.jar';
>
> the error is same:
> Error: org.apache.spark.sql.AnalysisException: undefined function
> URLEncode; line 1 pos 207 (state=,code=0)
>
> ###
>
> can anyone give some suggestions? Or how to use udf in hiveserver2/beeline
> mode?
>
>
>


Window Operation on Dstream Fails

2016-05-30 Thread vinay453
Hello, 

I am using 1.6.0 version of Spark and trying to run window operation on
DStreams. 

Window_TwoMin = 4*60*1000
Slide_OneMin = 2*60*1000

census = ssc.textFileStream("./census_stream/").filter(lambda a:
a.startswith('-') == False).map(lambda b: b.split("\t")) .map(lambda c:
(c[0],c[2],c[3],epic_dtm_to_str(c[4])))

census_window = census.window(Window_TwoMin,Slide_OneMin)

However, I get error messages:

INFO dstream.WindowedDStream: Time 146462790 ms is invalid as zeroTime
is 146462784 ms and slideDuration is 12000 ms and difference is
6 ms

I have seen other people getting the same error messages. Has it been fixed in
1.6.1?

Thanks, 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Window-Operation-on-Dstream-Fails-tp27049.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



can not use udf in hivethriftserver2

2016-05-30 Thread 喜之郎
HI all, I have a problem when using hiveserver2 and beeline.
when I use CLI mode, the udf works well.
But when I begin to use hiveserver2 and beeline, the udf can not work.
My Spark version is 1.5.1.
I tried 2 methods, first:
##
add jar /home/hadoop/dmp-udf-0.0.1-SNAPSHOT.jar;
create temporary function URLEncode as "com.dmp.hive.udfs.utils.URLEncode" ;


errors:
Error: org.apache.spark.sql.AnalysisException: undefined function URLEncode; 
line 1 pos 207 (state=,code=0)




second:
create temporary function URLEncode as 'com.dmp.hive.udfs.utils.URLEncode' 
using jar 
'hdfs:///warehouse/dmpv3.db/datafile/libjars/dmp-udf-0.0.1-SNAPSHOT.jar';


the error is same:
Error: org.apache.spark.sql.AnalysisException: undefined function URLEncode; 
line 1 pos 207 (state=,code=0)


###


can anyone give some suggestions? Or how to use udf in hiveserver2/beeline mode?

Secondary Indexing?

2016-05-30 Thread Michael Segel
I’m not sure where to post this since it’s a bit of a philosophical question in 
terms of design and vision for Spark. 

If we look at SparkSQL and performance… where does Secondary indexing fit in? 

The reason this is a bit awkward is that if you view Spark as querying RDDs 
which are temporary, indexing doesn’t make sense until you consider your use 
case and how long is ‘temporary’.
Then if you consider your RDD result set could be based on querying tables… and 
you could end up with an inverted table as an index… then indexing could make 
sense. 

Does it make sense to discuss this in user or dev email lists? Has anyone given 
this any thought in the past? 

Thx

-Mike


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: GraphX Java API

2016-05-30 Thread Michael Malak
Yes, it is possible to use GraphX from Java but it requires 10x the amount of 
code and involves using obscure typing and pre-defined lambda prototype 
facilities. I give an example of it in my book, the source code for which can 
be downloaded for free from 
https://www.manning.com/books/spark-graphx-in-action The relevant example is 
EdgeCount.java in chapter 10.
As I suggest in my book, likely the only reason you'd want to put yourself 
through that torture is corporate mandate or compatibility with Java bytecode 
tools.

  From: Sean Owen 
 To: Takeshi Yamamuro ; "Kumar, Abhishek (US - 
Bengaluru)"  
Cc: "user@spark.apache.org" 
 Sent: Monday, May 30, 2016 7:07 AM
 Subject: Re: GraphX Java API
   
No, you can call any Scala API in Java. It is somewhat less convenient if the 
method was not written with Java in mind but does work. 

On Mon, May 30, 2016, 00:32 Takeshi Yamamuro  wrote:

These packages are used only for Scala.
On Mon, May 30, 2016 at 2:23 PM, Kumar, Abhishek (US - Bengaluru) 
 wrote:

Hey,

I see some graphx packages listed here:
http://spark.apache.org/docs/latest/api/java/index.html
- org.apache.spark.graphx
- org.apache.spark.graphx.impl
- org.apache.spark.graphx.lib
- org.apache.spark.graphx.util
Aren't they meant to be used with JAVA?

Thanks

From: Santoshakhilesh [mailto:santosh.akhil...@huawei.com]
Sent: Friday, May 27, 2016 4:52 PM
To: Kumar, Abhishek (US - Bengaluru) ; user@spark.apache.org
Subject: RE: GraphX Java API

GraphX APIs are available only in Scala. If you need to use GraphX you need to
switch to Scala.

From: Kumar, Abhishek (US - Bengaluru) [mailto:abhishekkuma...@deloitte.com]
Sent: 27 May 2016 19:59
To: user@spark.apache.org
Subject: GraphX Java API

Hi,

We are trying to consume the Java API for GraphX, but there is no documentation
available online on the usage or examples. It would be great if we could get
some examples in Java.

Thanks and regards,

Abhishek Kumar

This message (including any attachments) contains confidential information
intended for a specific individual and purpose, and is protected by law. If you
are not the intended recipient, you should delete this message and any
disclosure, copying, or distribution of this message, or the taking of any
action based on it, by you is strictly prohibited. v.E.1



-- 
---
Takeshi Yamamuro



  

Re: Bug of PolynomialExpansion ?

2016-05-30 Thread Sean Owen
The 2-degree expansion of (x,y,z) is, in this implementation:

(x, x^2, y, xy, y^2, z, xz, yz, z^2)

Given your input is (1,0,1), the output (1,1,0,0,0,1,1,0,1) is right.
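
For anyone who wants to check this, a minimal sketch that reproduces the
expansion of the vector from the question (assuming the Spark 1.x spark.ml
API):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.mllib.linalg.Vectors

val sc = new SparkContext(new SparkConf().setAppName("poly-expansion-check"))
val sqlContext = new SQLContext(sc)

// the vector from the question: (1.0, 0.0, 1.0), stored sparsely as (3,[0,2],[1.0,1.0])
val df = sqlContext.createDataFrame(Seq(
  Tuple1(Vectors.sparse(3, Array(0, 2), Array(1.0, 1.0)))
)).toDF("features")

val poly = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(2)

// prints (9,[0,1,5,6,8],[1.0,1.0,1.0,1.0,1.0]): five ones, matching the ordering above
poly.transform(df).select("polyFeatures").show(false)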


On Mon, May 30, 2016 at 12:37 AM, Jeff Zhang  wrote:
> I use PolynomialExpansion to convert one vector to a 2-degree vector. I am
> confused about the result of the following. As I understand it, the 2-degree
> vector should contain 4 1's; I am not sure where the 5 1's come from. I think it
> is supposed to be (x1,x2,x3) * (x1,x2,x3) = (x1*x1, x1*x2, x1*x3, x2*x1, x2*x2,
> x2*x3, x3*x1, x3*x2, x3*x3)
>
> (3,[0,2],[1.0,1.0])  -->
> (9,[0,1,5,6,8],[1.0,1.0,1.0,1.0,1.0])|
>
>
> --
> Best Regards
>
> Jeff Zhang

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Running glm in sparkR (data pre-processing step)

2016-05-30 Thread Yanbo Liang
Yes, you are right.

2016-05-30 2:34 GMT-07:00 Abhishek Anand :

>
> Thanks Yanbo.
>
> So, you mean that if I have a variable which is of type double but I want
> to treat it like String in my model I just have to cast those columns into
> string and simply run the glm model. String columns will be directly
> one-hot encoded by the glm provided by sparkR ?
>
> Just wanted to clarify as in R we need to apply as.factor for categorical
> variables.
>
> val dfNew = df.withColumn("C0",df.col("C0").cast("String"))
>
>
> Abhi !!
>
> On Mon, May 30, 2016 at 2:58 PM, Yanbo Liang  wrote:
>
>> Hi Abhi,
>>
>> In SparkR glm, category features (columns of type string) will be one-hot
>> encoded automatically.
>> So pre-processing like `as.factor` is not necessary, you can directly
>> feed your data to the model training.
>>
>> Thanks
>> Yanbo
>>
>> 2016-05-30 2:06 GMT-07:00 Abhishek Anand :
>>
>>> Hi ,
>>>
>>> I want to run glm variant of sparkR for my data that is there in a csv
>>> file.
>>>
>>> I see that the glm function in sparkR takes a spark dataframe as input.
>>>
>>> Now, when I read a file from csv and create a spark dataframe, how could
>>> I take care of the factor variables/columns in my data ?
>>>
>>> Do I need to convert it to a R dataframe, convert to factor using
>>> as.factor and create spark dataframe and run glm over it ?
>>>
>>> But, running as.factor over big dataset is not possible.
>>>
>>> Please suggest what is the best way to acheive this ?
>>>
>>> What pre-processing should be done, and what is the best way to achieve
>>> it  ?
>>>
>>>
>>> Thanks,
>>> Abhi
>>>
>>
>>
>


Re: GraphX Java API

2016-05-30 Thread Sean Owen
No, you can call any Scala API in Java. It is somewhat less convenient if
the method was not written with Java in mind but does work.

On Mon, May 30, 2016, 00:32 Takeshi Yamamuro  wrote:

> These packages are used only for Scala.
>
> On Mon, May 30, 2016 at 2:23 PM, Kumar, Abhishek (US - Bengaluru) <
> abhishekkuma...@deloitte.com> wrote:
>
>> Hey,
>>
>> ·   I see some graphx packages listed here:
>>
>> http://spark.apache.org/docs/latest/api/java/index.html
>>
>> ·   org.apache.spark.graphx
>> 
>>
>> ·   org.apache.spark.graphx.impl
>> 
>>
>> ·   org.apache.spark.graphx.lib
>> 
>>
>> ·   org.apache.spark.graphx.util
>> 
>>
>> Aren’t they meant to be used with JAVA?
>>
>> Thanks
>>
>>
>>
>> *From:* Santoshakhilesh [mailto:santosh.akhil...@huawei.com]
>> *Sent:* Friday, May 27, 2016 4:52 PM
>> *To:* Kumar, Abhishek (US - Bengaluru) ;
>> user@spark.apache.org
>> *Subject:* RE: GraphX Java API
>>
>>
>>
>> GraphX APIs are available only in Scala. If you need to use GraphX you
>> need to switch to Scala.
>>
>>
>>
>> *From:* Kumar, Abhishek (US - Bengaluru) [
>> mailto:abhishekkuma...@deloitte.com ]
>> *Sent:* 27 May 2016 19:59
>> *To:* user@spark.apache.org
>> *Subject:* GraphX Java API
>>
>>
>>
>> Hi,
>>
>>
>>
>> We are trying to consume the Java API for GraphX, but there is no
>> documentation available online on the usage or examples. It would be great
>> if we could get some examples in Java.
>>
>>
>>
>> Thanks and regards,
>>
>>
>>
>> *Abhishek Kumar*
>>
>>
>>
>>
>>
>>
>>
>> This message (including any attachments) contains confidential
>> information intended for a specific individual and purpose, and is
>> protected by law. If you are not the intended recipient, you should delete
>> this message and any disclosure, copying, or distribution of this message,
>> or the taking of any action based on it, by you is strictly prohibited.
>>
>> v.E.1
>>
>>
>>
>>
>>
>>
>>
>>
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: FAILED_TO_UNCOMPRESS Error - Spark 1.3.1

2016-05-30 Thread Takeshi Yamamuro
Hi,

This is a known issue.
You need to check a related JIRA ticket:
https://issues.apache.org/jira/browse/SPARK-4105

// maropu

On Mon, May 30, 2016 at 7:51 PM, Prashant Singh Thakur <
prashant.tha...@impetus.co.in> wrote:

> Hi,
>
>
>
> We are trying to use Spark Data Frames for our use case where we are
> getting this exception.
>
> The parameters used are listed below. Kindly suggest if we are missing
> something.
>
> Version used is Spark 1.3.1
>
> Jira is still showing this issue as Open
> https://issues.apache.org/jira/browse/SPARK-4105
>
> Kindly suggest if there is workaround .
>
>
>
> Exception :
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 88 in stage 40.0 failed 4 times, most recent failure: Lost
> task 88.3 in stage 40.0 : java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>
>   at
> org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
>
>   at org.xerial.snappy.SnappyNative.rawUncompress(Native
> Method)
>
>   at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:391)
>
>   at org.xerial.snappy.Snappy.uncompress(Snappy.java:427)
>
>   at
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
>
>   at
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
>
>   at
> org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)
>
>   at
> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$7.apply(TorrentBroadcast.scala:213)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$7.apply(TorrentBroadcast.scala:213)
>
>   at scala.Option.map(Option.scala:145)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:213)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
>
>   at
> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>
>   at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>
>   at
> org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>
>   at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
>
>   at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
>   at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
> Parameters Changed :
>
> spark.akka.frameSize=50
>
> spark.shuffle.memoryFraction=0.4
>
> spark.storage.memoryFraction=0.5
>
> spark.worker.timeout=12
>
> spark.storage.blockManagerSlaveTimeoutMs=12
>
> spark.akka.heartbeat.pauses=6000
>
> spark.akka.heartbeat.interval=1000
>
> spark.ui.port=21000
>
> spark.port.maxRetries=50
>
> spark.executor.memory=10G
>
> spark.executor.instances=100
>
> spark.driver.memory=8G
>
> spark.executor.cores=2
>
> spark.shuffle.compress=true
>
> spark.io.compression.codec=snappy
>
> spark.broadcast.compress=true
>
> spark.rdd.compress=true
>
> spark.worker.cleanup.enabled=true
>
> spark.worker.cleanup.interval=600
>
> spark.worker.cleanup.appDataTtl=600
>
> spark.shuffle.consolidateFiles=true
>
> spark.yarn.preserve.staging.files=false
>
> spark.yarn.driver.memoryOverhead=1024
>
> spark.yarn.executor.memoryOverhead=1024
>
>
>
> Best Regards,
>
> Prashant Singh Thakur
>
> Mobile: +91-9740266522
>
>
>
> --
>
>
>
>
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
>



-- 
---
Takeshi Yamamuro


Re: Can we use existing R model in Spark

2016-05-30 Thread Neha Mehta
Thanks Sujeet, will try it out.

Hi Sun,

Can you please tell me what you meant by "Maybe you can try using the
existing random forest model"? Did you mean creating the model again using
Spark MLlib?

Thanks,
Neha




> From: sujeet jog 
> Date: Mon, May 30, 2016 at 4:52 PM
> Subject: Re: Can we use existing R model in Spark
> To: Sun Rui 
> Cc: Neha Mehta , user 
>
>
> Try to invoke an R script from Spark using the RDD pipe method, get the work
> done and receive the model back in an RDD.
>
>
> for ex :-
> .   rdd.pipe("")
>
>
> On Mon, May 30, 2016 at 3:57 PM, Sun Rui  wrote:
>
>> Unfortunately no. Spark does not support loading external models (for
>> example, PMML) for now.
>> Maybe you can try using the existing random forest model in Spark.
>>
>> On May 30, 2016, at 18:21, Neha Mehta  wrote:
>>
>> Hi,
>>
>> I have an existing random forest model created using R. I want to use
>> that to predict values on Spark. Is it possible to do the same? If yes,
>> then how?
>>
>> Thanks & Regards,
>> Neha
>>
>>
>>
>
>


Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams

2016-05-30 Thread nguyen duc tuan
How about this ?

def extract_feature(rf_model, x):
text = getFeatures(x).split(',')
fea = [float(i) for i in text]
prediction = rf_model.predict(fea)
return (prediction, x)
output = texts.map(lambda x: extract_feature(rf_model, x))

2016-05-30 14:17 GMT+07:00 obaidul karim :

> Hi,
>
> Anybody has any idea on below?
>
> -Obaid
>
>
> On Friday, 27 May 2016, obaidul karim  wrote:
>
>> Hi Guys,
>>
>> This is my first mail to spark users mailing list.
>>
>> I need help on Dstream operation.
>>
>> In fact, I am using a MLlib randomforest model to predict using spark
>> streaming. In the end, I want to combine the feature Dstream & prediction
>> Dstream together for further downstream processing.
>>
>> I am predicting using below piece of code:
>>
>> predictions = texts.map( lambda x : getFeatures(x) ).map(lambda x :
>> x.split(',')).map( lambda parts : [float(i) for i in parts]
>> ).transform(lambda rdd: rf_model.predict(rdd))
>>
>> Here texts is dstream having single line of text as records
>> getFeatures generates a comma separated features extracted from each
>> record
>>
>>
>> I want the output as below tuple:
>> ("predicted value", "original text")
>>
>> How can I achieve that ?
>> or
>> at least can I perform .zip like normal RDD operation on two Dstreams,
>> like below:
>> output = texts.zip(predictions)
>>
>>
>> Thanks in advance.
>>
>> -Obaid
>>
>


Re: Can we use existing R model in Spark

2016-05-30 Thread sujeet jog
Try to invoke an R script from Spark using the RDD pipe method: get the work
done and receive the model back in an RDD.


For example:
rdd.pipe("")
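
A minimal sketch of the pipe approach (the script name is made up; it is
assumed to read feature rows from stdin, apply the saved R model, and print one
prediction per line, and it must be available on every worker, e.g. shipped
with --files):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("pipe-to-r-sketch"))

// made-up feature rows, one CSV line per record
val features = sc.parallelize(Seq("5.1,3.5,1.4,0.2", "6.2,2.9,4.3,1.3"))

// each partition is fed to the script's stdin; its stdout comes back as an RDD of strings
val predictions = features.pipe("Rscript score.R")

predictions.collect().foreach(println)

The upside is that the existing R model can be reused as-is; the cost is
starting an R process per partition and serializing records through
stdin/stdout.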


On Mon, May 30, 2016 at 3:57 PM, Sun Rui  wrote:

> Unfortunately no. Spark does not support loading external models (for
> example, PMML) for now.
> Maybe you can try using the existing random forest model in Spark.
>
> On May 30, 2016, at 18:21, Neha Mehta  wrote:
>
> Hi,
>
> I have an existing random forest model created using R. I want to use that
> to predict values on Spark. Is it possible to do the same? If yes, then how?
>
> Thanks & Regards,
> Neha
>
>
>


Re: JDBC Cluster

2016-05-30 Thread Mich Talebzadeh
When you start the master, it starts the application master; it does not start
slaves/workers!

You need to start the slaves with start-slaves.sh.

start-slaves.sh will look at the file $SPARK_HOME/conf/slaves to get a list of
nodes on which to start slaves. It will then start a slave/worker on each node.
You can see all this in the Spark GUI.

[image: Inline images 1]

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 30 May 2016 at 10:15, Ian  wrote:

> Normally, when you start the master, the slaves should also be started
> automatically. This, however, presupposes that you've configured the
> slaves.
> In the $SPARK_HOME/conf directory there should be a slaves or
> slaves.template file. If it only contains localhost, then you have not set
> up any worker nodes.
>
> Also note that SSH from the master to the slaves must be enabled for the
> user that runs the Thrift server.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/JDBC-Cluster-tp27012p27046.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


FAILED_TO_UNCOMPRESS Error - Spark 1.3.1

2016-05-30 Thread Prashant Singh Thakur
Hi,

We are trying to use Spark Data Frames for our use case where we are getting 
this exception.
The parameters used are listed below. Kindly suggest if we are missing 
something.
Version used is Spark 1.3.1
Jira is still showing this issue as Open 
https://issues.apache.org/jira/browse/SPARK-4105
Kindly suggest if there is workaround .

Exception :
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 88 in stage 40.0 failed 4 times, most recent failure: Lost task 88.3 in 
stage 40.0 : java.io.IOException: FAILED_TO_UNCOMPRESS(5)
  at 
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
  at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
  at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:391)
  at org.xerial.snappy.Snappy.uncompress(Snappy.java:427)
  at 
org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
  at 
org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
  at 
org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)
  at 
org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
  at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$7.apply(TorrentBroadcast.scala:213)
  at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$7.apply(TorrentBroadcast.scala:213)
  at scala.Option.map(Option.scala:145)
  at 
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:213)
  at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
  at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
  at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
  at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
  at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
  at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
  at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
  at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)

Parameters Changed :
spark.akka.frameSize=50
spark.shuffle.memoryFraction=0.4
spark.storage.memoryFraction=0.5
spark.worker.timeout=12
spark.storage.blockManagerSlaveTimeoutMs=12
spark.akka.heartbeat.pauses=6000
spark.akka.heartbeat.interval=1000
spark.ui.port=21000
spark.port.maxRetries=50
spark.executor.memory=10G
spark.executor.instances=100
spark.driver.memory=8G
spark.executor.cores=2
spark.shuffle.compress=true
spark.io.compression.codec=snappy
spark.broadcast.compress=true
spark.rdd.compress=true
spark.worker.cleanup.enabled=true
spark.worker.cleanup.interval=600
spark.worker.cleanup.appDataTtl=600
spark.shuffle.consolidateFiles=true
spark.yarn.preserve.staging.files=false
spark.yarn.driver.memoryOverhead=1024
spark.yarn.executor.memoryOverhead=1024

Best Regards,
Prashant Singh Thakur
Mobile: +91-9740266522









NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.


DAG of Spark Sort application spanning two jobs

2016-05-30 Thread alvarobrandon
I've written a very simple Sort Scala program with Spark.

object Sort {

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: Sort <input file> <output file>" +
        " [<num splits>]")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("BigDataBench Sort")
    val spark = new SparkContext(conf)
    val logger = new JobPropertiesLogger(spark, "/home/abrandon/log.csv")
    val filename = args(0)
    val save_file = args(1)
    var splits = spark.defaultMinPartitions
    if (args.length > 2) {
      splits = args(2).toInt
    }
    val lines = spark.textFile(filename, splits)
    logger.start_timer()
    val data_map = lines.map(line => {
      (line, 1)
    })

    val result = data_map.sortByKey().map { line => line._1 }
    logger.stop_timer()
    logger.write_log("Sort By Key: Sort App")
    result.saveAsTextFile(save_file)

    println("Result has been saved to: " + save_file)
  }

}


Now, I was thinking that since there is only one wide transformation
("sortByKey"), two stages would be spanned. However, I see two jobs, with one
stage in Job 0 and two stages for Job 1. Am I missing something? What I
don't get is the first stage of the second job; it seems to do the same work
as the stage of Job 0.

 
 
 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DAG-of-Spark-Sort-application-spanning-two-jobs-tp27047.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can we use existing R model in Spark

2016-05-30 Thread Sun Rui
Unfortunately no. Spark does not support loading external models (for example, 
PMML) for now.
Maybe you can try using the existing random forest model in Spark.
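
If this means retraining with Spark's own implementation, a minimal sketch with
MLlib's RandomForest would be (the data path is made up and assumed to be in
LIBSVM format):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.util.MLUtils

val sc = new SparkContext(new SparkConf().setAppName("rf-retrain-sketch"))

val data = MLUtils.loadLibSVMFile(sc, "hdfs:///data/training_libsvm")   // made-up path
val Array(training, test) = data.randomSplit(Array(0.7, 0.3))

val model = RandomForest.trainClassifier(
  training,
  2,                // numClasses
  Map[Int, Int](),  // categoricalFeaturesInfo (empty: all features continuous)
  50,               // numTrees
  "auto",           // featureSubsetStrategy
  "gini",           // impurity
  5,                // maxDepth
  32)               // maxBins

// (prediction, label) pairs for evaluation
val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))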

> On May 30, 2016, at 18:21, Neha Mehta  wrote:
> 
> Hi,
> 
> I have an existing random forest model created using R. I want to use that to 
> predict values on Spark. Is it possible to do the same? If yes, then how?
> 
> Thanks & Regards,
> Neha
> 



Re: HiveContext standalone => without a Hive metastore

2016-05-30 Thread Gerard Maas
Michael,  Mitch, Silvio,

Thanks!

The "own directory" part is the issue. We are running the Spark Notebook,
which uses the same directory per server (i.e. for all notebooks), so this
prevents us from running 2 notebooks using HiveContext.
I'll look into a proper Hive installation, and I'm glad to know that this
dependency is gone in 2.0.
Looking forward to 2.1 :-) ;-)
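
In the meantime, for anyone hitting the same issue, a minimal sketch of the
per-user-directory workaround with the embedded Derby metastore. The property
names are standard Derby/Hive ones, but the paths are made up, and whether
setting them at this point is early enough in a given deployment is an
assumption to verify.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val user = sys.props.getOrElse("user.name", "unknown")
// Relocate Derby's working directory before the metastore is first touched.
System.setProperty("derby.system.home", s"/tmp/metastore-$user")   // placeholder path

val sc = new SparkContext(new SparkConf().setAppName(s"hive-$user"))
val hiveContext = new HiveContext(sc)
// Point the embedded metastore at a per-user database as well.
hiveContext.setConf("javax.jdo.option.ConnectionURL",
  s"jdbc:derby:;databaseName=/tmp/metastore-$user/metastore_db;create=true")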

-kr, Gerard.


On Thu, May 26, 2016 at 10:55 PM, Michael Armbrust 
wrote:

> You can also just make sure that each user is using their own directory.
> A rough example can be found in TestHive.
>
> Note: in Spark 2.0 there should be no need to use HiveContext unless you
> need to talk to a metastore.
>
> On Thu, May 26, 2016 at 1:36 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Well make sure than you set up a reasonable RDBMS as metastore. Ours is
>> Oracle but you can get away with others. Check the supported list in
>>
>> hduser@rhes564:: :/usr/lib/hive/scripts/metastore/upgrade> ltr
>> total 40
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 postgres
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 mysql
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 mssql
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 derby
>> drwxr-xr-x 3 hduser hadoop 4096 May 20 18:44 oracle
>>
>> you have few good ones in the list.  In general the base tables (without
>> transactional support) are around 55  (Hive 2) and don't take much space
>> (depending on the volume of tables). I attached a E-R diagram.
>>
>> HTH
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 26 May 2016 at 19:09, Gerard Maas  wrote:
>>
>>> Thanks a lot for the advice!.
>>>
>>> I found out why the standalone hiveContext would not work:  it was
>>> trying to deploy a derby db and the user had no rights to create the dir
>>> where there db is stored:
>>>
>>> Caused by: java.sql.SQLException: Failed to create database
>>> 'metastore_db', see the next exception for details.
>>>
>>>at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
>>> Source)
>>>
>>>at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>>> Source)
>>>
>>>... 129 more
>>>
>>> Caused by: java.sql.SQLException: Directory
>>> /usr/share/spark-notebook/metastore_db cannot be created.
>>>
>>>
>>> Now, the new issue is that we can't start more than 1 context at the
>>> same time. I think we will need to setup a proper metastore.
>>>
>>>
>>> -kind regards, Gerard.
>>>
>>>
>>>
>>>
>>> On Thu, May 26, 2016 at 3:06 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 To use HiveContext witch is basically an sql api within Spark without
 proper hive set up does not make sense. It is a super set of Spark
 SQLContext

 In addition simple things like registerTempTable may not work.

 HTH

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 26 May 2016 at 13:01, Silvio Fiorito 
 wrote:

> Hi Gerard,
>
>
>
> I’ve never had an issue using the HiveContext without a hive-site.xml
> configured. However, one issue you may have is if multiple users are
> starting the HiveContext from the same path, they’ll all be trying to 
> store
> the default Derby metastore in the same location. Also, if you want them 
> to
> be able to persist permanent table metadata for SparkSQL then you’ll want
> to set up a true metastore.
>
>
>
> The other thing it could be is Hive dependency collisions from the
> classpath, but that shouldn’t be an issue since you said it’s standalone
> (not a Hadoop distro right?).
>
>
>
> Thanks,
>
> Silvio
>
>
>
> *From: *Gerard Maas 
> *Date: *Thursday, May 26, 2016 at 5:28 AM
> *To: *spark users 
> *Subject: *HiveContext standalone => without a Hive metastore
>
>
>
> Hi,
>
>
>
> I'm helping some folks setting up an analytics cluster with  Spark.
>
> They want to use the HiveContext to enable the Window functions on
> DataFrames(*) but they don't have any Hive installation, nor they need one
> at the moment (if not necessary for this feature)
>
>
>
> When we try to create a Hive context, we get the following error:
>
>
>
> > val sqlContext = new
> 

Can we use existing R model in Spark

2016-05-30 Thread Neha Mehta
Hi,

I have an existing random forest model created using R. I want to use that
to predict values on Spark. Is it possible to do the same? If yes, then how?

Thanks & Regards,
Neha


Re: Running glm in sparkR (data pre-processing step)

2016-05-30 Thread Abhishek Anand
Thanks Yanbo.

So you mean that if I have a variable of type double that I want to treat as
a string in my model, I just have to cast those columns to string and simply
run the glm model? String columns will then be one-hot encoded directly by the
glm provided by SparkR?

Just wanted to clarify, since in R we need to apply as.factor to categorical
variables.

val dfNew = df.withColumn("C0",df.col("C0").cast("String"))


Abhi !!

On Mon, May 30, 2016 at 2:58 PM, Yanbo Liang  wrote:

> Hi Abhi,
>
> In SparkR glm, category features (columns of type string) will be one-hot
> encoded automatically.
> So pre-processing like `as.factor` is not necessary, you can directly feed
> your data to the model training.
>
> Thanks
> Yanbo
>
> 2016-05-30 2:06 GMT-07:00 Abhishek Anand :
>
>> Hi ,
>>
>> I want to run glm variant of sparkR for my data that is there in a csv
>> file.
>>
>> I see that the glm function in sparkR takes a spark dataframe as input.
>>
>> Now, when I read a file from csv and create a spark dataframe, how could
>> I take care of the factor variables/columns in my data ?
>>
>> Do I need to convert it to a R dataframe, convert to factor using
>> as.factor and create spark dataframe and run glm over it ?
>>
>> But, running as.factor over big dataset is not possible.
>>
>> Please suggest what is the best way to achieve this?
>>
>> What pre-processing should be done, and what is the best way to achieve
>> it  ?
>>
>>
>> Thanks,
>> Abhi
>>
>
>


Re: Running glm in sparkR (data pre-processing step)

2016-05-30 Thread Yanbo Liang
Hi Abhi,

In SparkR glm, categorical features (columns of type string) will be one-hot
encoded automatically.
So pre-processing like `as.factor` is not necessary; you can feed your data
directly to the model training.
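
For illustration, here is a minimal Scala sketch of the RFormula-based
encoding that, as far as I understand, backs SparkR's glm: string columns are
indexed and one-hot encoded automatically, so casting a numeric column to
string is enough to have it treated as categorical. The DataFrame `df` and the
column names are placeholders, not from this thread.

import org.apache.spark.ml.feature.RFormula

val dfNew = df.withColumn("C0", df("C0").cast("string"))   // treat C0 as categorical

val formula = new RFormula()
  .setFormula("label ~ C0 + C1")   // hypothetical label/feature columns
  .setFeaturesCol("features")
  .setLabelCol("label")

// String columns (like the casted C0) get one-hot encoded inside `features`.
val encoded = formula.fit(dfNew).transform(dfNew)
encoded.select("features", "label").show(5)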

Thanks
Yanbo

2016-05-30 2:06 GMT-07:00 Abhishek Anand :

> Hi ,
>
> I want to run glm variant of sparkR for my data that is there in a csv
> file.
>
> I see that the glm function in sparkR takes a spark dataframe as input.
>
> Now, when I read a file from csv and create a spark dataframe, how could I
> take care of the factor variables/columns in my data ?
>
> Do I need to convert it to a R dataframe, convert to factor using
> as.factor and create spark dataframe and run glm over it ?
>
> But, running as.factor over big dataset is not possible.
>
> Please suggest what is the best way to achieve this?
>
> What pre-processing should be done, and what is the best way to achieve it
>  ?
>
>
> Thanks,
> Abhi
>


Re: Launch Spark shell using differnt python version

2016-05-30 Thread Eike von Seggern
Hi Stuti

2016-03-15 10:08 GMT+01:00 Stuti Awasthi :

> Thanks Prabhu,
>
> I tried starting in local mode but still picking Python 2.6 only. I have
> exported “DEFAULT_PYTHON” in my session variable and also included in PATH.
>
>
>
> Export:
>
> export DEFAULT_PYTHON="/home/stuti/Python/bin/python2.7"
>
> export PATH="/home/stuti/Python/bin/python2.7:$PATH
>

DEFAULT_PYTHON is overwritten in pyspark. Have you tried setting
PYSPARK_PYTHON to your Python executable like

$ export PYSPARK_PYTHON="/home/stuti/Python/bin/python2.7"

If that's not working, what's the output of

$ printenv | sort

and in pyspark:

>>> import os
>>> for k, v in sorted(os.environ.items()):
...     print k, v

?

Best

Eike


Re: List of questios about spark

2016-05-30 Thread Ian
No, the limit is given by your setup. If you use Spark on a YARN cluster,
then the number of concurrent jobs is really limited to the resources
allocated to each job and how the YARN queues are set up. For instance, if
you use the FIFO scheduler (default), then it can be the case that the first
job takes up all the resources and all the others have to wait until the job
is done. If, on the other hand, you use the FAIR scheduler, then the number
of jobs that run concurrently is limited just by what's available on the
cluster in terms of resources.
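
For completeness, jobs submitted from multiple threads inside a single Spark
application are governed by Spark's own in-application scheduler, which is
separate from the YARN scheduler discussed above. A minimal sketch of
switching it to FAIR (the allocation-file path is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("concurrent-jobs")
  // Jobs from different threads share executors instead of queuing FIFO.
  .set("spark.scheduler.mode", "FAIR")
  // Optional: define named pools with weights/minShare in a fair scheduler file.
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext(conf)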



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/List-of-questios-about-spark-tp27027p27045.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Running glm in sparkR (data pre-processing step)

2016-05-30 Thread Abhishek Anand
Hi ,

I want to run glm variant of sparkR for my data that is there in a csv file.

I see that the glm function in sparkR takes a spark dataframe as input.

Now, when I read a file from CSV and create a Spark dataframe, how can I take
care of the factor variables/columns in my data?

Do I need to convert it to an R dataframe, convert to factors using as.factor,
create a Spark dataframe, and run glm over it?

But running as.factor over a big dataset is not possible.

Please suggest the best way to achieve this.

What pre-processing should be done, and what is the best way to achieve it?


Thanks,
Abhi


Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams

2016-05-30 Thread obaidul karim
Hi,

Anybody has any idea on below?

-Obaid

On Friday, 27 May 2016, obaidul karim  wrote:

> Hi Guys,
>
> This is my first mail to spark users mailing list.
>
> I need help on Dstream operation.
>
> In fact, I am using a MLlib randomforest model to predict using spark
> streaming. In the end, I want to combine the feature Dstream & prediction
> Dstream together for further downstream processing.
>
> I am predicting using below piece of code:
>
> predictions = texts.map( lambda x : getFeatures(x) ).map(lambda x :
> x.split(',')).map( lambda parts : [float(i) for i in parts]
> ).transform(lambda rdd: rf_model.predict(rdd))
>
> Here texts is dstream having single line of text as records
> getFeatures generates a comma separated features extracted from each record
>
>
> I want the output as below tuple:
> ("predicted value", "original text")
>
> How can I achieve that ?
> or
> at least can I perform .zip like normal RDD operation on two Dstreams,
> like below:
> output = texts.zip(predictions)
>
>
> Thanks in advance.
>
> -Obaid
>


RE: Query related to spark cluster

2016-05-30 Thread Kumar, Saurabh 5. (Nokia - IN/Bangalore)
Hi All,

@Deepak: Thanks for your suggestion; we are using Mesos to manage the Spark
cluster.

@Jörn: the reason we chose PostgresXL was its geo-spatial support, as we store
location data.

We were looking at how to improve things quickly and what the right approach is.

Our original thinking was to use different clusters for different needs.

E.g., instead of 1 cluster we were thinking of having 3 clusters:

1) Spark cluster -- including HDFS. We need HDFS because we have to read data
from an SFTP location, and we thought it best to write it to HDFS first.

2) Distributed R cluster -- since R does not scale on its own, we need scaling,
and there is no time to move to SparkR, we thought we would try distributed R.

3) PostgresXL cluster -- this is the DB cluster, so the Spark cluster would
write to the PostgresXL cluster and R would read from/write to it.

In the current setup we have included all components in the same cluster. Can
you please help me choose the best approach, one that will not compromise
scalability and the failover mechanism?


Regards,
Saurabh



From: Deepak Sharma [mailto:deepakmc...@gmail.com]
Sent: Monday, May 30, 2016 12:17 PM
To: Jörn Franke 
Cc: Kumar, Saurabh 5. (Nokia - IN/Bangalore) ; 
user@spark.apache.org; Sawhney, Prerna (Nokia - IN/Bangalore) 

Subject: Re: Query related to spark cluster

Hi Saurabh
You can have hadoop cluster running YARN as scheduler.
Configure spark to run with the same YARN setup.
Then you need R only on 1 node , and connect to the cluster using the SparkR.

Thanks
Deepak

On Mon, May 30, 2016 at 12:12 PM, Jörn Franke 
> wrote:

Well if you require R then you need to install it (including all additional 
packages) on each node. I am not sure why you store the data in Postgres . 
Storing it in Parquet and Orc is sufficient in HDFS (sorted on relevant 
columns) and you use the SparkR libraries to access them.

On 30 May 2016, at 08:38, Kumar, Saurabh 5. (Nokia - IN/Bangalore) 
> wrote:
Hi Team,

I am using Apache spark to build scalable Analytic engine. My setup is as 
follows.

Flow of processing is as follows:

Raw Files > Store to HDFS > Process by Spark and Store to Postgre_XL Database > 
R process data fom Postgre-XL to process in distributed mode.

I have 6 nodes cluster setup for ETL operations which have

1.  Spark slaves installed on all 6 of them.
2.  HDFS data nodes on each of 6 nodes with replication factor 2.
3.  PosGRE –XL 9.5 Database coordinator on each of 6 nodes.
4.  R software is installed on all nodes and Uses process Data from 
Postgre-XL in distributed manner.




Can you please guide me about pros and cons of this setup.
Installing all component on every machines is recommended or there is any 
drawback?
R software should run on spark cluster ?



Thanks & Regards
Saurabh Kumar
R Engineer, T TED Technology Explorat
Nokia Networks
L5, Manyata Embassy Business Park, Nagavara, Bangalore, India 560045
Mobile: +91-8861012418
http://networks.nokia.com/






--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Query related to spark cluster

2016-05-30 Thread Deepak Sharma
Hi Saurabh
You can have hadoop cluster running YARN as scheduler.
Configure spark to run with the same YARN setup.
Then you need R only on 1 node , and connect to the cluster using the
SparkR.

Thanks
Deepak

On Mon, May 30, 2016 at 12:12 PM, Jörn Franke  wrote:

>
> Well if you require R then you need to install it (including all
> additional packages) on each node. I am not sure why you store the data in
> Postgres . Storing it in Parquet and Orc is sufficient in HDFS (sorted on
> relevant columns) and you use the SparkR libraries to access them.
>
> On 30 May 2016, at 08:38, Kumar, Saurabh 5. (Nokia - IN/Bangalore) <
> saurabh.5.ku...@nokia.com> wrote:
>
> Hi Team,
>
> I am using Apache spark to build scalable Analytic engine. My setup is as
> follows.
>
> Flow of processing is as follows:
>
> Raw Files > Store to HDFS > Process by Spark and Store to Postgre_XL
> Database > R process data fom Postgre-XL to process in distributed mode.
>
> I have 6 nodes cluster setup for ETL operations which have
>
>
>1. Spark slaves installed on all 6 of them.
>2. HDFS data nodes on each of 6 nodes with replication factor 2.
>3. PosGRE –XL 9.5 Database coordinator on each of 6 nodes.
>4. R software is installed on all nodes and Uses process Data from
>Postgre-XL in distributed manner.
>
>
>
>
>
> Can you please guide me about pros and cons of this setup.
> Installing all component on every machines is recommended or there is any
> drawback?
> R software should run on spark cluster ?
>
>
>
> Thanks & Regards
> Saurabh Kumar
> R Engineer, T TED Technology Explorat
> Nokia Networks
> L5, Manyata Embassy Business Park, Nagavara, Bangalore, India 560045
> Mobile: +91-8861012418
> http://networks.nokia.com/
>
>
>
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


RE: Query related to spark cluster

2016-05-30 Thread Kumar, Saurabh 5. (Nokia - IN/Bangalore)
Hi Jörn,

Thanks for the suggestion.

My current cluster setup is shown in the attached snapshot. Apart from
PostgresXL, do you see any problem there?


Regards,
Saurabh

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Monday, May 30, 2016 12:12 PM
To: Kumar, Saurabh 5. (Nokia - IN/Bangalore) 
Cc: user@spark.apache.org; Sawhney, Prerna (Nokia - IN/Bangalore) 

Subject: Re: Query related to spark cluster


Well if you require R then you need to install it (including all additional 
packages) on each node. I am not sure why you store the data in Postgres . 
Storing it in Parquet and Orc is sufficient in HDFS (sorted on relevant 
columns) and you use the SparkR libraries to access them.

On 30 May 2016, at 08:38, Kumar, Saurabh 5. (Nokia - IN/Bangalore) 
> wrote:
Hi Team,

I am using Apache spark to build scalable Analytic engine. My setup is as 
follows.

Flow of processing is as follows:

Raw Files > Store to HDFS > Process by Spark and Store to Postgre_XL Database > 
R process data fom Postgre-XL to process in distributed mode.

I have 6 nodes cluster setup for ETL operations which have

1.  Spark slaves installed on all 6 of them.
2.  HDFS data nodes on each of 6 nodes with replication factor 2.
3.  PosGRE –XL 9.5 Database coordinator on each of 6 nodes.
4.  R software is installed on all nodes and Uses process Data from 
Postgre-XL in distributed manner.




Can you please guide me about pros and cons of this setup.
Installing all component on every machines is recommended or there is any 
drawback?
R software should run on spark cluster ?



Thanks & Regards
Saurabh Kumar
R Engineer, T TED Technology Explorat
Nokia Networks
L5, Manyata Embassy Business Park, Nagavara, Bangalore, India 560045
Mobile: +91-8861012418
http://networks.nokia.com/




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Query related to spark cluster

2016-05-30 Thread Jörn Franke

Well, if you require R then you need to install it (including all additional
packages) on each node. I am not sure why you store the data in Postgres.
Storing it in HDFS as Parquet or ORC (sorted on the relevant columns) is
sufficient, and you can use the SparkR libraries to access it.
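
A hedged sketch of that suggestion: keep the data in HDFS as Parquet (or ORC),
sorted on the columns you usually filter on, instead of loading it into
Postgres. The input format, column names and paths are placeholders.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val rawDf = sqlContext.read.json("hdfs:///raw/events/")   // assumption: JSON input

// Sorting before the write roughly range-partitions the Parquet files,
// which helps min/max filtering on those columns later.
rawDf.sort("locationId", "eventTime")
  .write
  .parquet("hdfs:///warehouse/events_parquet")

// Reading it back from Scala; SparkR's read.df can point at the same path.
val events = sqlContext.read.parquet("hdfs:///warehouse/events_parquet")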

> On 30 May 2016, at 08:38, Kumar, Saurabh 5. (Nokia - IN/Bangalore) 
>  wrote:
> 
> Hi Team,
>  
> I am using Apache spark to build scalable Analytic engine. My setup is as 
> follows.
>  
> Flow of processing is as follows:
>  
> Raw Files > Store to HDFS > Process by Spark and Store to Postgre_XL Database 
> > R process data fom Postgre-XL to process in distributed mode.
>  
> I have 6 nodes cluster setup for ETL operations which have
>  
> Spark slaves installed on all 6 of them.
> HDFS data nodes on each of 6 nodes with replication factor 2.
> PosGRE –XL 9.5 Database coordinator on each of 6 nodes.
> R software is installed on all nodes and Uses process Data from Postgre-XL in 
> distributed manner.
>  
>  
>  
>  
> Can you please guide me about pros and cons of this setup.
> Installing all component on every machines is recommended or there is any 
> drawback?
> R software should run on spark cluster ?
>  
>  
>  
> Thanks & Regards
> Saurabh Kumar
> R Engineer, T TED Technology Explorat
> Nokia Networks
> L5, Manyata Embassy Business Park, Nagavara, Bangalore, India 560045
> Mobile: +91-8861012418
> http://networks.nokia.com/
>  
>  
>  


Query related to spark cluster

2016-05-30 Thread Kumar, Saurabh 5. (Nokia - IN/Bangalore)
Hi Team,

I am using Apache Spark to build a scalable analytics engine. My setup is as
follows.

The flow of processing is as follows:

Raw files > store to HDFS > process with Spark and store to the Postgre-XL
database > R processes data from Postgre-XL in distributed mode.

I have a 6-node cluster setup for ETL operations, which has:

1.  Spark slaves installed on all 6 nodes.
2.  HDFS data nodes on each of the 6 nodes, with replication factor 2.
3.  A Postgre-XL 9.5 database coordinator on each of the 6 nodes.
4.  R installed on all nodes, processing data from Postgre-XL in a
distributed manner.

Can you please guide me on the pros and cons of this setup?
Is installing all components on every machine recommended, or are there any
drawbacks?
Should the R software run on the Spark cluster?



Thanks & Regards
Saurabh Kumar
R Engineer, T TED Technology Explorat
Nokia Networks
L5, Manyata Embassy Business Park, Nagavara, Bangalore, India 560045
Mobile: +91-8861012418
http://networks.nokia.com/





Re: Bulk loading Serialized RDD into Hbase throws KryoException - IndexOutOfBoundsException

2016-05-30 Thread sjk
org.apache.hadoop.hbase.client.{Mutation, Put}
org.apache.hadoop.hbase.io.ImmutableBytesWritable

If you used Mutation, register the above classes too.
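
A minimal sketch of registering those HBase write types with Kryo up front, so
cached RDD[(ImmutableBytesWritable, Put)] partitions deserialize cleanly. The
exact class list is an assumption and depends on what ends up inside the
tuples.

import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(
    classOf[ImmutableBytesWritable],
    classOf[Put],
    classOf[Mutation]))
// Pass this conf when constructing the SparkContext so executors pick it up.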

> On May 30, 2016, at 08:11, Nirav Patel  wrote:
> 
> Sure let me can try that. But from looks of it it seems kryo 
> kryo.util.MapReferenceResolver.getReadObject trying to access incorrect index 
> (100) 
> 
> On Sun, May 29, 2016 at 5:06 PM, Ted Yu  > wrote:
> Can you register Put with Kryo ?
> 
> Thanks
> 
> On May 29, 2016, at 4:58 PM, Nirav Patel  > wrote:
> 
>> I pasted code snipped for that method.
>> 
>> here's full def:
>> 
>>   def writeRddToHBase2(hbaseRdd: RDD[(ImmutableBytesWritable, Put)], 
>> tableName: String) {
>> 
>> 
>> 
>> hbaseRdd.values.foreachPartition{ itr =>
>> 
>> val hConf = HBaseConfiguration.create()
>> 
>> hConf.setInt("hbase.client.write.buffer", 16097152)
>> 
>> val table = new HTable(hConf, tableName)
>> 
>> //table.setWriteBufferSize(8388608)
>> 
>> itr.grouped(100).foreach(table.put(_))   // << Exception happens at 
>> this point
>> 
>> table.close()
>> 
>> }
>> 
>>   }
>> 
>> 
>> 
>> I am using hbase 0.98.12 mapr distribution.
>> 
>> 
>> 
>> Thanks
>> 
>> Nirav
>> 
>> 
>> On Sun, May 29, 2016 at 4:46 PM, Ted Yu > > wrote:
>> bq.  at 
>> com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:80)
>> 
>> Can you reveal related code from HbaseUtils.scala ?
>> 
>> Which hbase version are you using ?
>> 
>> Thanks
>> 
>> On Sun, May 29, 2016 at 4:26 PM, Nirav Patel > > wrote:
>> Hi,
>> 
>> I am getting following Kryo deserialization error when trying to buklload 
>> Cached RDD into Hbase. It works if I don't cache the RDD. I cache it with 
>> MEMORY_ONLY_SER.
>> 
>> here's the code snippet:
>> 
>> 
>> hbaseRdd.values.foreachPartition{ itr =>
>> val hConf = HBaseConfiguration.create()
>> hConf.setInt("hbase.client.write.buffer", 16097152)
>> val table = new HTable(hConf, tableName)
>> itr.grouped(100).foreach(table.put(_))
>> table.close()
>> }
>> hbaseRdd is of type RDD[(ImmutableBytesWritable, Put)]
>> 
>> 
>> Exception I am getting. I read on Kryo JIRA that this may be issue with 
>> incorrect use of serialization library. So could this be issue with 
>> twitter-chill library or spark core it self ? 
>> 
>> Job aborted due to stage failure: Task 16 in stage 9.0 failed 10 times, most 
>> recent failure: Lost task 16.9 in stage 9.0 (TID 28614, 
>> hdn10.mycorptcorporation.local): com.esotericsoftware.kryo.KryoException: 
>> java.lang.IndexOutOfBoundsException: Index: 100, Size: 6
>> Serialization trace:
>> familyMap (org.apache.hadoop.hbase.client.Put)
>>  at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>  at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>  at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>  at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>  at 
>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
>>  at 
>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
>>  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>>  at 
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
>>  at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
>>  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>  at 
>> com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:80)
>>  at 
>> com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:75)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
>>  at 
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
>>  at 
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
>>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>  at