[REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-06-03 Thread Adrian Tanase
Hi all,

Trying to repost this question from a colleague on my team, somehow his 
subscription is not active:
http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html

Appreciate any thoughts,
-adrian


Re: thought experiment: use spark ML to real time prediction

2015-11-11 Thread Adrian Tanase
I don’t think this answers your question, but here’s how you would evaluate the
model in real time in a streaming app:
https://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/predict.html

Maybe you can find a way to extract portions of MLlib and run them outside of
Spark – loading the precomputed model and calling .predict on it…
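For illustration, here is a rough sketch of that idea (the model type, path and
service wrapper are assumptions, not a confirmed recipe): load the model once at
startup with a local SparkContext, then serve each request by calling .predict on
a plain feature vector, so no Spark job runs per request.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LinearRegressionModel

object PredictionService {
  // one-time initialization at service startup; "local[1]" is enough because we
  // only use Spark to deserialize the model, not to run jobs per request
  private val sc = new SparkContext(
    new SparkConf().setMaster("local[1]").setAppName("prediction-service"))

  // hypothetical path - wherever the batch job saved the model
  private val model = LinearRegressionModel.load(sc, "hdfs:///models/linear")

  // called from e.g. a REST handler; pure local computation on the loaded weights
  def predict(features: Array[Double]): Double =
    model.predict(Vectors.dense(features))
}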

-adrian

From: Andy Davidson
Date: Tuesday, November 10, 2015 at 11:31 PM
To: "user @spark"
Subject: thought experiment: use spark ML to real time prediction

Let's say I have used Spark ML to train a linear model. I know I can save and
load the model to disk. I am not sure how I can use the model in a real-time
environment. For example, I do not think I can easily return a “prediction” to the
client using Spark Streaming. Also, for some applications the extra
latency created by the batch process might not be acceptable.

If I were not using Spark, I would re-implement the model I trained in my batch
environment in a language like Java and implement a REST service that uses the
model to create a prediction and return it to the client. Many
models make predictions using linear algebra. Implementing predictions is
relatively easy if you have a good vectorized linear algebra package. Is there a way to use
a model I trained using Spark ML outside of Spark?

As a motivating example: even if it's possible to return data to the client
using Spark Streaming, I think the mini-batch latency would not be acceptable
for a high-frequency stock trading system.

Kind regards

Andy

P.S. The examples I have seen so far use Spark Streaming to “preprocess”
predictions. For example, a recommender system might use what current users are
watching to calculate “trending recommendations”. These are stored on disk and
served up to users when they use the “movie guide”. If a recommendation were a
couple of minutes old it would not affect the end users' experience.



Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-10 Thread Adrian Tanase
Can you be a bit more specific about what “blow up” means? Also, what do you
mean by “messed up” brokers? Imbalance? Broker(s) dead?

We’re also using the direct consumer and so far nothing dramatic has happened:
- on READ, it automatically reads from backups if the leader is dead (machine gone)
- on READ, if there is a huge imbalance (partitions/leaders), the job might slow
down if you don’t have enough cores on the machine with many partitions
- on WRITE, we’ve seen a weird delay of ~7 seconds that I don’t know how to
re-configure; there’s a timeout that delays the job but it eventually writes
data to a replica
- it only died when there were no more brokers left and there were partitions
without a leader. This happened when almost half the cluster was dead during a
reliability test

Regardless, I would look at the source and try to monitor the Kafka cluster for
things like partitions without leaders or big imbalances.

Hope this helps,
-adrian





On 11/9/15, 8:26 PM, "swetha"  wrote:

>Hi,
>
>How do we recover Kafka Direct automatically when there is a problem with
>Kafka brokers? Sometimes our Kafka brokers get messed up and the entire
>Streaming job blows up, unlike some other consumers which do recover
>automatically. How can I make sure that Kafka Direct recovers automatically
>when the broker fails for some time, say 30 minutes? What kind of monitors
>should be in place to recover the job?
>
>Thanks,
>Swetha 
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Direct-does-not-recover-automatically-when-the-Kafka-Stream-gets-messed-up-tp25331.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-10 Thread Adrian Tanase
I’ve seen this before during an extreme outage on the cluster, where the kafka 
offsets checkpointed by the directstreamRdd were bigger than what kafka 
reported. The checkpoint was therefore corrupted.
I don’t know the root cause but since I was stressing the cluster during a 
reliability test I can only assume that one of the Kafka partitions was 
restored from an out-of-sync replica and did not contain all the data. Seems 
extreme but I don’t have another idea.

@Cody – do you know of a way to recover from a situation like this? Can someone 
manually delete folders from the checkpoint folder to help the job recover? 
E.g. Go 2 steps back, hoping that kafka has those offsets.

-adrian

From: swetha kasireddy
Date: Monday, November 9, 2015 at 10:40 PM
To: Cody Koeninger
Cc: "user@spark.apache.org"
Subject: Re: Kafka Direct does not recover automatically when the Kafka Stream 
gets messed up?

OK. But, one thing that I observed is that when there is a problem with Kafka 
Stream, unless I delete the checkpoint directory the Streaming job does not 
restart. I guess it tries to retry the failed tasks and if it's not able to 
recover, it fails again. Sometimes, it fails with a StackOverflowError.

Why does the Streaming job not restart from checkpoint directory when the job 
failed earlier with Kafka Brokers getting messed up? We have the checkpoint 
directory in our hdfs.

On Mon, Nov 9, 2015 at 12:34 PM, Cody Koeninger 
> wrote:
I don't think deleting the checkpoint directory is a good way to restart the 
streaming job, you should stop the spark context or at the very least kill the 
driver process, then restart.

On Mon, Nov 9, 2015 at 2:03 PM, swetha kasireddy 
> wrote:
Hi Cody,

Our job is our failsafe as we don't have Control over Kafka Stream as of now. 
Can setting rebalance max retries help? We do not have any monitors setup as of 
now. We need to setup the monitors.

My idea is to to have some kind of Cron job that queries the Streaming API for 
monitoring like every 5 minutes and then send an email alert and automatically 
restart the Streaming job by deleting the Checkpoint directory. Would that help?



Thanks!

On Mon, Nov 9, 2015 at 11:09 AM, Cody Koeninger 
> wrote:
The direct stream will fail the task if there is a problem with the Kafka
broker.  Spark will retry failed tasks automatically, which should handle
broker rebalances that happen in a timely fashion. spark.task.maxFailures
controls the maximum number of retries before failing the job.  The direct stream
isn't any different from any other Spark task in that regard.
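For reference, a minimal way to set that retry budget (the value of 8 is only
illustrative):

import org.apache.spark.SparkConf

// raise the per-task retry budget; the default is 4
val sparkConf = new SparkConf()
  .setAppName("kafka-direct-job")
  .set("spark.task.maxFailures", "8")

The same property can also be passed on spark-submit with --conf spark.task.maxFailures=8.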

The question of what kind of monitoring you need is more a question for your 
particular infrastructure and what you're already using for monitoring.  We put 
all metrics (application level or system level) into graphite and alert from 
there.

I will say that if you've regularly got problems with kafka falling over for 
half an hour, I'd look at fixing that before worrying about spark monitoring...


On Mon, Nov 9, 2015 at 12:26 PM, swetha 
> wrote:
Hi,

How do we recover Kafka Direct automatically when there is a problem with
Kafka brokers? Sometimes our Kafka brokers get messed up and the entire
Streaming job blows up, unlike some other consumers which do recover
automatically. How can I make sure that Kafka Direct recovers automatically
when the broker fails for some time, say 30 minutes? What kind of monitors
should be in place to recover the job?

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Direct-does-not-recover-automatically-when-the-Kafka-Stream-gets-messed-up-tp25331.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.








Re: Dynamic Allocation & Spark Streaming

2015-11-06 Thread Adrian Tanase
You can register a streaming listener – in the BatchInfo you’ll find a lot of 
stats (including count of received records) that you can base your logic on:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.scheduler.BatchInfo
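A minimal sketch of wiring one up (the actual scale-up/down decision is left out;
the threshold logic is yours to plug in):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BacklogListener extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    val records = info.numRecords                     // records received in this batch
    val delayMs = info.schedulingDelay.getOrElse(0L)  // how far behind the scheduler is
    // e.g. count consecutive empty batches here and react (kill/request executors)
    println(s"batch=${info.batchTime} records=$records schedulingDelayMs=$delayMs")
  }
}

// ssc.addStreamingListener(new BacklogListener)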

From: Kyle Lin
Date: Friday, November 6, 2015 at 11:48 AM
To: Tathagata Das
Cc: robert towne, user
Subject: Re: Dynamic Allocation & Spark Streaming

Hey there

I run Spark streaming 1.5.1 on YARN with Dynamic allocation, and use direct 
stream API to read data from Kafka.

The Spark job can dynamically request an executor when reaching
spark.dynamicAllocation.schedulerBacklogTimeout.

However, it won't dynamically remove executors when there is no more data from
Kafka, because executors won't be idle but continually get empty RDDs.

Is it possible to detect that there have been more than N consecutive empty RDDs and
remove executors manually? How could I keep track of how many empty RDDs I've gotten and
remove executors?

Kyle


2015-10-20 4:48 GMT+08:00 Tathagata Das 
>:
Unfortunately the title on the JIRA is extremely confusing. I have fixed it.

The reason why dynamic allocation does not work well with streaming is that the
heuristic used to automatically scale the number of executors up or down works
for the pattern of task schedules in batch jobs, not for streaming jobs. We will
definitely solve this in the future, maybe in 1.7.0 or later.
In the meantime, there are developer API functions that allow you to add and
remove executors explicitly. See sparkContext.requestExecutors() and
sparkContext.killExecutors(). With these you can write your own scaling logic.
In your case I would do the following.
1. Ask for a large number of executors / cores through spark-submit.
2. Use a StreamingListener to monitor whether it has caught up.
3. Then call killExecutors, to slowly kill a few of them, but make sure using 
the listener that the scheduling delay does not go up.

Hope this helps. Let me know if this works for you.

On Mon, Oct 19, 2015 at 1:13 PM, robert towne 
> wrote:
I have watched a few videos from Databricks/Andrew Or around the Spark 1.2 
release and it seemed that dynamic allocation was not yet available for Spark 
Streaming.

I now see SPARK-10955, which
is tied to 1.5.2 and allows disabling of dynamic allocation with Spark
Streaming.

I use Spark Streaming with a receiverless/direct Kafka connection.  When I 
start up an app reading from the beginning of the topic I would like to have 
more resources than once I have caught up.  Is it possible to use dynamic 
allocation for this use case?

thanks,
Robert




Re: How to unpersist a DStream in Spark Streaming

2015-11-06 Thread Adrian Tanase
Do we have any guarantees on the maximum duration?

I've seen RDDs kept around for 7-10 minutes on batches of 20 secs and 
checkpoint of 100 secs. No windows, just updateStateByKey.

It's not a memory issue, but on checkpoint recovery it goes back to Kafka for 10
minutes of data; any idea why?

-adrian

Sent from my iPhone

On 06 Nov 2015, at 09:45, Tathagata Das 
> wrote:

Spark Streaming automatically takes care of unpersisting any RDDs generated by a
DStream. You can call StreamingContext.remember() to set the minimum
persistence duration. Any persisted RDD older than that will be automatically
unpersisted.
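To make the two knobs concrete (the values, the events stream and updateFn below
are only placeholders):

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(20))   // batch interval
ssc.remember(Minutes(5))   // keep generated RDDs around for at least 5 minutes

val state = events.updateStateByKey(updateFn)
state.checkpoint(Seconds(100))   // data checkpoint interval for the stateful stream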

On Thu, Nov 5, 2015 at 9:12 AM, swetha kasireddy 
> wrote:
It's just in the same thread; for a particular RDD, I need to uncache it every 2
minutes to clear out the data that is present in a Map inside it.

On Wed, Nov 4, 2015 at 11:54 PM, Saisai Shao 
> wrote:
Hi Swetha,

Would you mind elaborating your usage scenario of DStream unpersisting?

From my understanding:

1. Spark Streaming will automatically unpersist outdated data (you already
mentioned the configurations).
2. If the streaming job is already started, I think you lose control of the job:
when do you call this unpersist, and how do you call it (from another
thread)?

Thanks
Saisai


On Thu, Nov 5, 2015 at 3:13 PM, swetha kasireddy 
> wrote:
Other than setting the following.


sparkConf.set("spark.streaming.unpersist", "true")
sparkConf.set("spark.cleaner.ttl", "7200s")

On Wed, Nov 4, 2015 at 5:03 PM, swetha 
> wrote:
Hi,

How to unpersist a DStream in Spark Streaming? I know that we can persist
using dStream.persist() or dStream.cache. But, I don't see any method to
unPersist.

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-unpersist-a-DStream-in-Spark-Streaming-tp25281.html
Sent from the Apache Spark User List mailing list archive at 
Nabble.com.








Re: Scheduling Spark process

2015-11-05 Thread Adrian Tanase
You should also specify how you’re planning to query or “publish” the data. I 
would consider a combination of:
- a Spark Streaming job that ingests the raw events in real time, validates,
pre-processes and saves to stable storage
  - stable storage could be HDFS/Parquet or a database optimized for time
series (HBase, Cassandra, etc.)
- a regular Spark job that you trigger via cron every day/week/month, OR
- query the DB directly, depending on how much data it has or whether it supports
secondary indexes that build up partial aggregations (hourly/daily) that are
easy to compute at query time

Your example of average is easy to do live on a DB if it has secondary indexes 
as the operation is associative and it can be gradually rolled up at 
hourly/daily/monthly level.
For “count distinct” or unique metrics it’s tougher as you’ll need access to
the raw data (unless you’re willing to accept ~99% accuracy, in which case you
can use HLL aggregators).

Hope this helps,
-adrian



On 11/5/15, 10:48 AM, "danilo"  wrote:

>Hi All,
>
>I'm quite new to this topic and to Spark in general.
>
>I have a sensor that is pushing data in real time and I need to calculate
>some KPIs based on the data I have received. Given that some of the KPIs are
>related to very old data (e.g. average of number of event in the last 3
>months) I was wondering what is the best approach to do this with SPARK. 
>
>The approach I'm currently following is creating partial KPIs in real time
>and then creating the other KPIs with a second Spark job scheduled on a daily
>/ weekly / monthly basis.
>
>Does that make sense? If so, how can I schedule Spark to run only once a day /
>week / month?
>
>Thx
>D
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Scheduling-Spark-process-tp25287.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>



Re: How to use data from Database and reload every hour

2015-11-05 Thread Adrian Tanase
You should look at .transform – it’s a powerful transformation (sic) that 
allows you to dynamically load resources and it gets executed in every micro 
batch.

Re-broadcasting something should be possible from inside transform as that code 
is executed on the driver but it’s still a controversial topic, as you probably 
need to create a NEW broadcast variable instead of updating the existing one.
http://search-hadoop.com/?q=transform+update+broadcast=2_project=Spark

An alternative is to load the filters from MySQL and apply them implicitly
inside the .transform via rdd.filter instead of broadcasting them to the
executors. See this thread:
http://search-hadoop.com/m/q3RTt2UD6KyBO5M1=Re+Streaming+checkpoints+and+logic+change
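A rough sketch of the transform-based approach (loadFilters and extractField are
hypothetical stand-ins for your MySQL query and the JSON field matching):

def loadFilters(): Set[String] = ???          // hypothetical: read validation rules from MySQL
def extractField(msg: String): String = ???   // hypothetical: pull the relevant field from the JSON

var rules: Set[String] = loadFilters()
var lastReload = System.currentTimeMillis()

val validated = kafkaStream.transform { rdd =>
  // this block runs on the driver for every micro batch, so the rules can be
  // refreshed here without touching broadcast variables
  if (System.currentTimeMillis() - lastReload > 60 * 60 * 1000L) {
    rules = loadFilters()
    lastReload = System.currentTimeMillis()
  }
  val current = rules   // stable reference captured by the closure shipped to the executors
  rdd.filter(msg => current.contains(extractField(msg)))
}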

Hope this helps,
-adrian

From: Kay-Uwe Moosheimer
Date: Thursday, November 5, 2015 at 3:33 PM
To: "user@spark.apache.org"
Subject: How to use data from Database and reload every hour

I have the following problem.
We have MySQL and a Spark cluster.
We need to load 5 different validation instructions (several thousand entries
each) and use this information on the executors to decide if content
from Kafka Streaming is for process a or b.
The streaming data from Kafka are JSON messages and the validation info from
MySQL says "if field a is that and field b is that, then process a" and so on.

The tables in MySQL change over time and we have to reload the data every
hour.
I tried to use broadcasting, where I load the data and store it in HashSets and
HashMaps (Java code), but it's not possible to redistribute the data.

What would be the best way to resolve my problem?
Use native JDBC in the executor task and load the data – can the executor store this
data in HashSets etc. for the next call, so that I only load the data every hour?
Are there other possibilities?



Re: Why some executors are lazy?

2015-11-04 Thread Adrian Tanase
Your solution is an easy, pragmatic one, however there are many factors 
involved – and it’s not guaranteed to work for other data sets.

It depends on:

  *   The distribution of data on the keys. Random session ids will naturally
distribute better than “customer names” - you can mitigate this with custom
partitioners if the downstream algorithm does not care
      *   E.g. see the optimizations used in the page rank algorithm that group
pages from the same domain together, on the premise that most links go to pages
from the same domain
      *   This is ultimately a hard problem to solve generically …
  *   The number of partitions – the rule of thumb here is to use 2-3 times the
number of cores (see the small example after this list), on the following premises:
      *   More granular tasks will use the cores more efficiently from a
scheduling perspective (fewer idle spots)
      *   Also, they will work with less data and avoid OOM problems when the
data for a task is larger than executor working memory
      *   Through experimentation you will find the sweet spot; when you get
into hundreds of partitions you may incur scheduling overhead …
  *   Data locality and shuffling + Spark locality wait settings
      *   The scheduler might decide to take advantage of free cores in the
cluster and schedule off-node processing; you can control how long it
waits through the spark.locality.wait settings
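As a small illustration of the 2-3x rule of thumb (the numbers match this
thread's 16 workers x 4 cores; the input path is hypothetical):

val totalCores = 16 * 4
val edges = sc.textFile("hdfs:///data/graph/edges.txt")
  .repartition(totalCores * 3)   // 2-3x the total core count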

Hope this helps,
-adrian

From: Khaled Ammar
Date: Wednesday, November 4, 2015 at 4:03 PM
To: Adrian Tanase
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Why some executors are lazy?

Thank you Adrian,

The dataset is indeed skewed. My concern was that some executors do not 
participate in computation at all. I understand that executors finish tasks 
sequentially. Therefore, using more executors allow for better parallelism.

I managed to force all executors to participate by increasing the number of
partitions. My guess is that the scheduler preferred to reduce the number of
machines participating in the computation to decrease network overhead.

Do you think my analysis is correct? How should one decide on the number of
partitions? Does it depend on the workload, the dataset, or both?

Thanks,
-Khaled


On Wed, Nov 4, 2015 at 7:21 AM, Adrian Tanase 
<atan...@adobe.com<mailto:atan...@adobe.com>> wrote:
If some of the operations required involve shuffling and partitioning, it might 
mean that the data set is skewed to specific partitions which will create hot 
spotting on certain executors.

-adrian

From: Khaled Ammar
Date: Tuesday, November 3, 2015 at 11:43 PM
To: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Why some executors are lazy?

Hi,

I'm using the most recent Spark version on a standalone setup of 16+1 machines.

While running GraphX workloads, I found that some executors are lazy? They 
*rarely* participate in computation. This causes some other executors to do 
their work. This behavior is consistent in all iterations and even in the data 
loading step. Only two specific executors do not participate in most 
computations.


Does any one know how to fix that?


More details:
Each machine has 4 cores. I set number of partitions to be 3*16. Each executor 
was supposed to do 3 tasks, but few of them end up working on 4 task instead, 
which causes delay in computation.



--
Thanks,
-Khaled



--
Thanks,
-Khaled


Re: Spark Streaming data checkpoint performance

2015-11-04 Thread Adrian Tanase
Nice! Thanks for sharing, I wasn’t aware of the new API.

Left some comments on the JIRA and design doc.

-adrian

From: Shixiong Zhu
Date: Tuesday, November 3, 2015 at 3:32 AM
To: Thúy Hằng Lê
Cc: Adrian Tanase, "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Spark Streaming data checkpoint performance

"trackStateByKey" is about to be added in 1.6 to resolve the performance issue 
of "updateStateByKey". You can take a look at 
https://issues.apache.org/jira/browse/SPARK-2629 and 
https://github.com/apache/spark/pull/9256


Re: Why some executors are lazy?

2015-11-04 Thread Adrian Tanase
If some of the operations required involve shuffling and partitioning, it might 
mean that the data set is skewed to specific partitions which will create hot 
spotting on certain executors.

-adrian

From: Khaled Ammar
Date: Tuesday, November 3, 2015 at 11:43 PM
To: "user@spark.apache.org"
Subject: Why some executors are lazy?

Hi,

I'm using the most recent Spark version on a standalone setup of 16+1 machines.

While running GraphX workloads, I found that some executors are lazy? They 
*rarely* participate in computation. This causes some other executors to do 
their work. This behavior is consistent in all iterations and even in the data 
loading step. Only two specific executors do not participate in most 
computations.


Does any one know how to fix that?


More details:
Each machine has 4 cores. I set number of partitions to be 3*16. Each executor 
was supposed to do 3 tasks, but few of them end up working on 4 task instead, 
which causes delay in computation.



--
Thanks,
-Khaled


FW: Spark streaming - failed recovery from checkpoint

2015-11-02 Thread Adrian Tanase
Re-posting here, didn’t get any feedback on the dev list.

Has anyone experienced corrupted checkpoints recently?

Thanks!
-adrian

From: Adrian Tanase
Date: Thursday, October 29, 2015 at 1:38 PM
To: "d...@spark.apache.org<mailto:d...@spark.apache.org>"
Subject: Spark streaming - failed recovery from checkpoint

Hi guys,

I’ve encountered some problems with a crashed Spark Streaming job, when 
restoring from checkpoint.
I’m running Spark 1.5.1 on YARN (Hadoop 2.6) in cluster mode, reading from
Kafka with the direct consumer and a few updateStateByKey stateful
transformations.

After investigating, I think the following happened:

  *   Active ResourceManager crashed (aws machine crashed)
  *   10 minutes later — default Yarn settings :( — Standby took over and 
redeployed the job, sending a SIGTERM to the running driver
  *   Recovery from checkpoint failed because of missing RDD in checkpoint 
folder

One complication - UNCONFIRMED because of missing logs – I believe that the new 
driver was started ~5 minutes before the old one stopped.

With your help, I’m trying to zero in on a root cause or a combination of:

  *   bad Yarn/Spark configuration (10 minutes to react to missing node, 
already fixed through more aggressive liveliness settings)
  *   YARN fact of life – why is running job redeployed when standby RM takes 
over?
  *   Bug/race condition in spark checkpoint cleanup/recovery? (why is RDD 
cleaned up by the old app and then recovery fails when it looks for it?)
  *   Bugs in the Yarn-Spark integration (missing heartbeats? Why is the new 
app started 5 minutes before the old one dies?)
  *   Application code – should we add graceful shutdown? Should I add a 
Zookeeper lock that prevents 2 instances of the driver starting at the same 
time?

Sorry if the questions are a little all over the place, getting to the root 
cause of this was a pain and I can’t even log an issue in Jira without your 
help.

Attaching some logs that showcase the checkpoint recovery failure (I’ve grepped 
for “checkpoint” to highlight the core issue):

  *   Driver logs prior to shutdown: http://pastebin.com/eKqw27nT
  *   Driver logs, failed recovery: http://pastebin.com/pqACKK7W
  *   Other info:
      *   spark.streaming.unpersist = true
      *   spark.cleaner.ttl = 259200 (3 days)

Last question – in the checkpoint recovery process I notice that it’s going 
back ~6 minutes on the persisted RDDs and ~10 minutes to replay from kafka.
I’m running with 20 second batches and 100 seconds checkpoint interval (small 
issue - one of the RDDs was using the default interval of 20 secs). Shouldn’t 
the lineage be a lot smaller?
Based on the documentation I would have expected that the recovery goes back at 
most 100 seconds, as I’m not doing any windowed operations…

Thanks in advance!
-adrian


Re: execute native system commands in Spark

2015-11-02 Thread Adrian Tanase
Have you seen .pipe()?
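For example (a trivial sketch; the external command is just an illustration):

// each partition's elements are written to the command's stdin and every line
// it prints becomes an element of the resulting RDD
val shouted = sc.parallelize(Seq("foo", "bar", "baz"))
  .pipe("tr a-z A-Z")
shouted.collect()   // Array(FOO, BAR, BAZ)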




On 11/2/15, 5:36 PM, "patcharee"  wrote:

>Hi,
>
>Is it possible to execute native system commands (in parallel) from Spark,
>like scala.sys.process?
>
>Best,
>Patcharee
>



Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Adrian Tanase
You are correct, the default checkpointing interval is 10 seconds or your batch 
size, whichever is bigger. You can change it by calling .checkpoint(x) on your 
resulting Dstream.

For the rest, you are probably keeping an “all time” word count that grows
unbounded if you never remove words from the map. Keep in mind that
updateStateByKey is called for every key in the state RDD, regardless of whether
you have new occurrences or not.

You should consider at least one of these strategies (rough sketches below):

  *   run your word count on a windowed DStream (e.g. unique counts over the
last 15 minutes)
      *   your best bet here is reduceByKeyAndWindow with an inverse function
  *   make your state object more complicated and try to prune out words with
very few occurrences or that haven’t been updated for a long time
      *   you can do this by emitting None from updateStateByKey
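Rough sketches of both strategies, assuming a DStream[String] of words and a
2 second batch (adapt the durations and the expiry rule to your job):

import org.apache.spark.streaming.{Minutes, Seconds}

// Strategy 1: counts over the last 15 minutes only, with an inverse function so
// Spark can subtract the batch that slides out of the window
val recentCounts = words.map((_, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Minutes(15), Seconds(2))

// Strategy 2: richer state + pruning; returning None removes the key entirely
case class WordState(count: Long, lastSeen: Long)

def update(values: Seq[Long], state: Option[WordState]): Option[WordState] = {
  val now = System.currentTimeMillis()
  val prev = state.getOrElse(WordState(0L, now))
  if (values.isEmpty && now - prev.lastSeen > 30 * 60 * 1000L) None   // expire stale words
  else Some(WordState(prev.count + values.sum, if (values.nonEmpty) now else prev.lastSeen))
}
// val allTimeCounts = words.map((_, 1L)).updateStateByKey(update _)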

Hope this helps,
-adrian

From: Thúy Hằng Lê
Date: Monday, November 2, 2015 at 7:20 AM
To: "user@spark.apache.org"
Subject: Spark Streaming data checkpoint performance

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(2));


Re: Pivot Data in Spark and Scala

2015-10-30 Thread Adrian Tanase
It's actually a bit tougher as you’ll first need all the years. Also, I'm not sure
how you would represent your “columns” given they are dynamic based on the input
data.

Depending on your downstream processing, I’d probably try to emulate it with a 
hash map with years as keys instead of the columns.
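A rough sketch of that, using the sample data from the question below (the merge
simply combines the per-key maps):

val rows = sc.parallelize(Seq(
  ("A", 2015, 4), ("A", 2014, 12), ("A", 2013, 1),
  ("B", 2015, 24), ("B", 2013, 4)))

val pivoted = rows
  .map { case (key, year, value) => (key, Map(year -> value)) }
  .reduceByKey(_ ++ _)   // one Map(year -> value) per key, columns stay dynamic

// pivoted.collect()
// => Array((A, Map(2015 -> 4, 2014 -> 12, 2013 -> 1)), (B, Map(2015 -> 24, 2013 -> 4)))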

There is probably a nicer solution using the data frames API but I’m not 
familiar with it.

If you actually need vectors, I think this article I saw recently on the
Databricks blog will highlight some options (look for “gather encoder”):
https://databricks.com/blog/2015/10/20/audience-modeling-with-spark-ml-pipelines.html

-adrian

From: Deng Ching-Mallete
Date: Friday, October 30, 2015 at 4:35 AM
To: Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

Hi,

You could transform it into a pair RDD then use the combineByKey function.

HTH,
Deng

On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss 
> wrote:
Hi,

I have data as follows:

A, 2015, 4
A, 2014, 12
A, 2013, 1
B, 2015, 24
B, 2013, 4


I need to convert the data to a new format:
A ,4,12,1
B,   24,,4

Any idea how to make it in Spark Scala?

Thanks




Re: Mock Cassandra DB Connection in Unit Testing

2015-10-29 Thread Adrian Tanase
Does it need to be a mock? Can you use sc.parallelize(data)?
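A small sketch of that approach (ScalaTest is assumed, and the reduceByKey stands
in for whatever function actually takes the RDD as a parameter):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class ProductJobSpec extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _

  override def beforeAll(): Unit =
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unit-test"))

  override def afterAll(): Unit = sc.stop()

  test("aggregates products per customer") {
    val input = sc.parallelize(Seq(("c1", 10.0), ("c1", 5.0), ("c2", 1.0)))
    val result = input.reduceByKey(_ + _).collectAsMap()   // replace with the real function under test
    assert(result("c1") == 15.0)
  }
}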

From: Priya Ch
Date: Thursday, October 29, 2015 at 2:00 PM
To: Василец Дмитрий
Cc: "user@spark.apache.org", 
"spark-connector-u...@lists.datastax.com"
Subject: Re: Mock Cassandra DB Connection in Unit Testing

One more question, if i have a function which takes RDD as a parameter, how do 
we mock an RDD ??

On Thu, Oct 29, 2015 at 5:20 PM, Priya Ch 
> wrote:
How do we do it for Cassandra..can we use the same Mocking ? EmbeddedCassandra 
Server is available with CassandraUnit. Can this be used in Spark Code as well 
? I mean with Scala code ?

On Thu, Oct 29, 2015 at 5:03 PM, Василец Дмитрий 
> wrote:
there is an example of how I mock MySQL:

import java.sql.PreparedStatement
import org.scalamock.scalatest.MockFactory

val connectionMock = mock[java.sql.Connection]
val statementMock = mock[PreparedStatement]
(connectionMock.prepareStatement(_: String)).expects(sql.toString).returning(statementMock)
(statementMock.executeUpdate _).expects()


On Thu, Oct 29, 2015 at 12:27 PM, Priya Ch 
> wrote:
Hi All,

  For my  Spark Streaming code, which writes the results to Cassandra DB, I 
need to write Unit test cases. what are the available test frameworks to mock 
the connection to Cassandra DB ?





Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread Adrian Tanase
You can decouple the batch interval and the window sizes. If during processing
you’re aggregating data and your operations benefit from an inverse function,
then you can optimally process windows of data.

E.g. You could set a global batch interval of 10 seconds. You can process the 
incoming data from Kafka, aggregating the input.
Then you can create a window of 3 minutes (both length and slide) over the 
partial results. In this case the inverse function is not helpful as all the 
data is new in every window.

You can even coalesce the final DStream to avoid writing many small files. For
example, you could be writing FEWER files MORE OFTEN and achieve a similar effect.

All of this is of course hypothetical since I don’t know what processing you 
are applying to the data coming from Kafka. More like food for thought.
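Food for thought in code form, then (every name below is a placeholder for your
actual stream and output path):

import org.apache.spark.streaming.Minutes

// assumes a StreamingContext with a 10 second batch interval and a kafkaStream
// of (key, payload) message pairs
val perBatch = kafkaStream
  .map { case (_, payload) => (payload, 1L) }   // pre-aggregate each small batch
  .reduceByKey(_ + _)

// tumbling 3-minute window over the already-reduced batches
val perWindow = perBatch.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(3), Minutes(3))

perWindow.foreachRDD { rdd =>
  // fewer, larger output files instead of many small ones
  rdd.coalesce(4).saveAsTextFile(s"s3n://my-bucket/out/batch-${System.currentTimeMillis}")
}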

-adrian





On 10/29/15, 2:50 PM, "Afshartous, Nick" <nafshart...@turbine.com> wrote:

>< Does it work as expected with smaller batch or smaller load? Could it be 
>that it's accumulating too many events over 3 minutes?
>
>Thanks for your input.  The 3-minute window was chosen because we write the 
>output of each batch into S3.  And with smaller batch time intervals there 
>were many small files being written to S3, something to avoid.  That was the 
>explanation of the developer who made this decision (who's no longer on the 
>team).   We're in the process of re-evaluating.
>--
> Nick
>
>-Original Message-
>From: Adrian Tanase [mailto:atan...@adobe.com]
>Sent: Wednesday, October 28, 2015 4:53 PM
>To: Afshartous, Nick <nafshart...@turbine.com>
>Cc: user@spark.apache.org
>Subject: Re: Spark/Kafka Streaming Job Gets Stuck
>
>Does it work as expected with smaller batch or smaller load? Could it be that 
>it's accumulating too many events over 3 minutes?
>
>You could also try increasing the parallelism via repartition to ensure 
>smaller tasks that can safely fit in working memory.
>
>Sent from my iPhone
>
>> On 28 Oct 2015, at 17:45, Afshartous, Nick <nafshart...@turbine.com> wrote:
>>
>>
>> Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)  job 
>> and seeing a problem.  This is running in AWS/Yarn and the streaming batch 
>> interval is set to 3 minutes and this is a ten node cluster.
>>
>> Testing at 30,000 events per second we are seeing the streaming job get 
>> stuck (stack trace below) for over an hour.
>>
>> Thanks on any insights or suggestions.
>> --
>>  Nick
>>
>> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartiti
>> onsToPair(JavaDStreamLike.scala:43)
>> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
>> erDriver.runStream(StreamingKafkaConsumerDriver.java:125)
>> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
>> erDriver.main(StreamingKafkaConsumerDriver.java:71)
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.j
>> ava:57)
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccess
>> orImpl.java:43)
>> java.lang.reflect.Method.invoke(Method.java:606)
>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(Application
>> Master.scala:480)
>>




Re: Need more tasks in KafkaDirectStream

2015-10-29 Thread Adrian Tanase
You can call .repartition on the DStream created by the Kafka direct consumer.
You take the one-time hit of a shuffle but gain the ability to scale out
processing beyond your number of Kafka partitions.

We’re doing this to scale up from 36 partitions / topic to 140 partitions (20 
cores * 7 nodes) and it works great.
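A minimal sketch (ssc, kafkaParams and topics are assumed to exist; the numbers
match the setup above):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// 36 Kafka partitions in, 140 Spark partitions for all downstream stages
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

val repartitioned = messages
  .map(_._2)          // keep just the payload
  .repartition(140)   // one shuffle per batch, then 140 tasks for the processing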

-adrian

From: varun sharma
Date: Thursday, October 29, 2015 at 8:27 AM
To: user
Subject: Need more tasks in KafkaDirectStream

Right now, there is a one-to-one correspondence between Kafka partitions and
Spark partitions.
I don't have a requirement of one-to-one semantics.
I need more tasks to be generated in the job so that it can be parallelized and
the batch can be completed faster. In the previous receiver-based approach the
number of tasks created was independent of the Kafka partitions; I need something
like that.
Is any config available if I don't need one-to-one semantics?
Is there any way I can repartition without incurring any additional cost?

Thanks
VARUN SHARMA



Re: How do I parallize Spark Jobs at Executor Level.

2015-10-28 Thread Adrian Tanase
The first line is distributing your fileList variable in the cluster as an RDD,
partitioned using the default partitioner settings (e.g. the number of cores in
your cluster).

Each of your workers would get one or more slices of data (depending on how many
cores each executor has), and the abstraction is called a partition.

What is your use case? If you want to load the files and continue processing in
parallel, then a simple .map should work.
If you want to execute arbitrary code based on the list of files that each
executor received, then you need to use .foreach, which will get executed for
each of the entries, on the worker.
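A quick sketch of the two options (readFile and processFile are hypothetical
stand-ins for your own loading/filtering logic):

val files = sc.parallelize(fileList)   // distributed across the executors

// 1) load and keep processing in parallel: a simple map/filter chain, run on the workers
val contents = files.map(path => readFile(path))
val matching = contents.filter(content => content.contains("ERROR"))

// 2) run arbitrary side-effecting code per file, also on the workers
files.foreach(path => processFile(path))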

-adrian

From: Vinoth Sankar
Date: Wednesday, October 28, 2015 at 2:49 PM
To: "user@spark.apache.org"
Subject: How do I parallize Spark Jobs at Executor Level.

Hi,

I'm reading and filtering a large number of files using Spark. It's getting
parallelized at the Spark driver level only. How do I make it parallelize to
the executor (worker) level? Refer to the following sample. Is there any way to
iterate the localIterator in parallel?

Note : I use Java 1.7 version

JavaRDD<String> files = javaSparkContext.parallelize(fileList);
Iterator<String> localIterator = files.toLocalIterator();

Regards
Vinoth Sankar


Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-28 Thread Adrian Tanase
Does it work as expected with smaller batch or smaller load? Could it be that 
it's accumulating too many events over 3 minutes?

You could also try increasing the parallelism via repartition to ensure smaller 
tasks that can safely fit in working memory.

Sent from my iPhone

> On 28 Oct 2015, at 17:45, Afshartous, Nick  wrote:
> 
> 
> Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)  job and 
> seeing a problem.  This is running in AWS/Yarn and the streaming batch 
> interval is set to 3 minutes and this is a ten node cluster.
> 
> Testing at 30,000 events per second we are seeing the streaming job get stuck 
> (stack trace below) for over an hour.
> 
> Thanks on any insights or suggestions.
> --
>  Nick
> 
> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartitionsToPair(JavaDStreamLike.scala:43)
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.runStream(StreamingKafkaConsumerDriver.java:125)
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.main(StreamingKafkaConsumerDriver.java:71)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
> 



Re: There is any way to write from spark to HBase CDH4?

2015-10-27 Thread Adrian Tanase
Also I just remembered about cloudera’s contribution
http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/

From: Deng Ching-Mallete
Date: Tuesday, October 27, 2015 at 12:03 PM
To: avivb
Cc: user
Subject: Re: There is any way to write from spark to HBase CDH4?

Hi,

We are using phoenix-spark (http://phoenix.apache.org/phoenix_spark.html) to 
write data to HBase, but it requires spark 1.3.1+ and phoenix 4.4+. Previously, 
when we were still on spark 1.2, we used the HBase API to write directly to 
HBase.

For HBase 0.98, it's something like this:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}

rdd.foreachPartition(partition => {
  // create hbase config
  val hConf = HBaseConfiguration.create()
  val hTable = new HTable(hConf, "TABLE_1")
  hTable.setAutoFlush(false)

  partition.foreach(r => {
    // generate row key
    // create row
    val hRow = new Put(rowKey)

    // add columns
    hRow.add(..)

    hTable.put(hRow)
  })
  hTable.flushCommits()
  hTable.close()
})

HTH,
Deng

On Tue, Oct 27, 2015 at 5:36 PM, avivb 
> wrote:
I have already try it with https://github.com/unicredit/hbase-rdd and
https://github.com/nerdammer/spark-hbase-connector and in both cases I get
timeout.

So I would like to know about other option to write from Spark to HBase
CDH4.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/There-is-any-way-to-write-from-spark-to-HBase-CDH4-tp25209.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: There is any way to write from spark to HBase CDH4?

2015-10-27 Thread Adrian Tanase
This is probably too low level but you could consider the async client inside 
foreachRdd:

https://github.com/OpenTSDB/asynchbase

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd





On 10/27/15, 11:36 AM, "avivb"  wrote:

>I have already try it with https://github.com/unicredit/hbase-rdd and
>https://github.com/nerdammer/spark-hbase-connector and in both cases I get
>timeout.
>
>So I would like to know about other option to write from Spark to HBase
>CDH4.
>
>Thanks!
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/There-is-any-way-to-write-from-spark-to-HBase-CDH4-tp25209.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: There is any way to write from spark to HBase CDH4?

2015-10-27 Thread Adrian Tanase
You can get a feel for it by playing with the original library published as 
separate project on github
https://github.com/cloudera-labs/SparkOnHBase

From: Deng Ching-Mallete
Date: Tuesday, October 27, 2015 at 12:39 PM
To: Fengdong Yu
Cc: Adrian Tanase, avivb, user
Subject: Re: There is any way to write from spark to HBase CDH4?

It's still in HBase' trunk, scheduled for 2.0.0 release based on Jira ticket.

-Deng

On Tue, Oct 27, 2015 at 6:35 PM, Fengdong Yu 
<fengdo...@everstring.com<mailto:fengdo...@everstring.com>> wrote:
Does this released with Spark1.*? or still kept in the trunk?




On Oct 27, 2015, at 6:22 PM, Adrian Tanase 
<atan...@adobe.com<mailto:atan...@adobe.com>> wrote:

Also I just remembered about cloudera’s contribution
http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/

From: Deng Ching-Mallete
Date: Tuesday, October 27, 2015 at 12:03 PM
To: avivb
Cc: user
Subject: Re: There is any way to write from spark to HBase CDH4?

Hi,

We are using phoenix-spark (http://phoenix.apache.org/phoenix_spark.html) to 
write data to HBase, but it requires spark 1.3.1+ and phoenix 4.4+. Previously, 
when we were still on spark 1.2, we used the HBase API to write directly to 
HBase.

For HBase 0.98, it's something like this:

rdd.foreachPartition(partition => {
  // create hbase config
  val hConf = HBaseConfiguration.create()
  val hTable = new HTable(hConf, "TABLE_1")
  hTable.setAutoFlush(false)

  partition.foreach(r => {
    // generate row key
    // create row
    val hRow = new Put(rowKey)

    // add columns
    hRow.add(..)

    hTable.put(hRow)
  })
  hTable.flushCommits()
  hTable.close()
})

HTH,
Deng

On Tue, Oct 27, 2015 at 5:36 PM, avivb 
<a...@taykey.com<mailto:a...@taykey.com>> wrote:
I have already try it with https://github.com/unicredit/hbase-rdd and
https://github.com/nerdammer/spark-hbase-connector and in both cases I get
timeout.

So I would like to know about other option to write from Spark to HBase
CDH4.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/There-is-any-way-to-write-from-spark-to-HBase-CDH4-tp25209.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: SPARKONHBase checkpointing issue

2015-10-27 Thread Adrian Tanase
Does this help?

https://issues.apache.org/jira/browse/SPARK-5206



On 10/27/15, 1:53 PM, "Amit Singh Hora"  wrote:

>Hi all ,
>
>I am using Cloudera's SparkOnHBase to bulk insert into HBase. Please find
>the code below:
>object test {
>  
>def main(args: Array[String]): Unit = {
>
>
>
>   val conf = ConfigFactory.load("connection.conf").getConfig("connection")
>val checkpointDirectory=conf.getString("spark.checkpointDir")
>val ssc = StreamingContext.getOrCreate(checkpointDirectory, ()=>{
>  functionToCreateContext(checkpointDirectory)
>})
> 
>
>ssc.start()
>ssc.awaitTermination()
>
> }
>
>def getHbaseContext(sc: SparkContext,conf: Config): HBaseContext={
>  println("always gets created")
>   val hconf = HBaseConfiguration.create();
>val timeout= conf.getString("hbase.zookeepertimeout")
>val master=conf.getString("hbase.hbase_master")
>val zk=conf.getString("hbase.hbase_zkquorum")
>val zkport=conf.getString("hbase.hbase_zk_port")
>
>  hconf.set("zookeeper.session.timeout",timeout);
>hconf.set("hbase.client.retries.number", Integer.toString(1));
>hconf.set("zookeeper.recovery.retry", Integer.toString(1));
>hconf.set("hbase.master", master);
>hconf.set("hbase.zookeeper.quorum",zk);
>hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
>hconf.set("hbase.zookeeper.property.clientPort",zkport );
>
>   
>val hbaseContext = new HBaseContext(sc, hconf);
>return hbaseContext
>}
>  def functionToCreateContext(checkpointDirectory: String): StreamingContext
>= {
>println("creating for frst time")
>val conf = ConfigFactory.load("connection.conf").getConfig("connection")
>val brokerlist = conf.getString("kafka.broker")
>val topic = conf.getString("kafka.topic")
>
>val Array(brokers, topics) = Array(brokerlist, topic)
>
>
>val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample
>" )
>sparkConf.set("spark.cleaner.ttl", "2");
>sparkConf.setMaster("local[2]")
>
>
> val topicsSet = topic.split(",").toSet
>val batchduration = conf.getString("spark.batchduration").toInt
>val ssc: StreamingContext = new StreamingContext(sparkConf,
>Seconds(batchduration))
>  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
>brokerlist, "auto.offset.reset" -> "smallest")
>val messages = KafkaUtils.createDirectStream[String, String,
>StringDecoder, StringDecoder](
>  ssc, kafkaParams, topicsSet)
>val lines=messages.map(_._2)
>   
>
>
>getHbaseContext(ssc.sparkContext,conf).streamBulkPut[String](lines,
>  "ecs_test",
>  (putRecord) => {
>if (putRecord.length() > 0) {
>  var maprecord = new HashMap[String, String];
>  val mapper = new ObjectMapper();
>
>  //convert JSON string to Map
>  maprecord = mapper.readValue(putRecord,
>new TypeReference[HashMap[String, String]]() {});
>  
>  var ts: Long = maprecord.get("ts").toLong
>  
>   var tweetID:Long= maprecord.get("id").toLong
>  val key=ts+"_"+tweetID;
>  
>  val put = new Put(Bytes.toBytes(key))
>  maprecord.foreach(kv => {
> 
> 
>put.add(Bytes.toBytes("test"),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2))
>  
>
>  })
>
>
>  put
>} else {
>  null
>}
>  },
>  false);
>
>ssc
>  
>  }
>}
>
>I am not able to retrieve from the checkpoint after restart; I always get
>"Unable to getConfig from broadcast"
>
>After debugging more I can see that the method for creating the HBaseContext
>actually broadcasts the configuration and context objects passed.
>
>As a solution I just want to recreate the HBase context in every case,
>whether the checkpoint exists or not.
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/SPARKONHBase-checkpointing-issue-tp25211.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>



Re: Separate all values from Iterable

2015-10-27 Thread Adrian Tanase
The operator you’re looking for is .flatMap. It flattens all the results if you 
have nested lists of results (e.g. A map over a source element can return zero 
or more target elements)
I’m not very familiar with the Java APIs but in scala it would go like this 
(keeping type annotations only as documentation):

def toBson(bean: ProductBean): BSONObject = { … }

val customerBeans: RDD[(Long, Iterable[ProductBean])] = allBeans.groupBy(_.customerId)
val mongoObjects: RDD[BSONObject] = customerBeans.flatMap { case (id, beans) => beans.map(toBson) }

Hope this helps,
-adrian

From: Shams ul Haque
Date: Tuesday, October 27, 2015 at 12:50 PM
To: "user@spark.apache.org"
Subject: Separate all values from Iterable

Hi,


I have grouped all my customers in a JavaPairRDD<Long, Iterable<ProductBean>> by
their customerId (of Long type), meaning every customerId has a list of
ProductBean.

Now I want to save all ProductBeans to the DB irrespective of customerId. I got
all the values by using
JavaRDD<Iterable<ProductBean>> values = custGroupRDD.values();

Now I want to convert that to a JavaRDD<BSONObject> so that I can save it to
Mongo. Remember, every BSONObject is made of a single ProductBean.

I am not getting any idea of how to do this in Spark, I mean which Spark
transformation is used to do that job. I think this task is some kind of
"separate all values from an Iterable". Please let me know how this is possible.
Any hint in Scala or Python is also OK.


Thanks

Shams


Re: [Spark Streaming] How do we reset the updateStateByKey values.

2015-10-26 Thread Adrian Tanase
Have you considered union-ing the 2 streams? Basically you can consider them as 
2 “message types” that your update function can consume (e.g. implement a 
common interface):

  *   regularUpdate
  *   resetStateUpdate

Inside your updateStateByKey you can check if any of the messages in the list
of updates is a resetState message. If not, continue summing the others.

I can provide scala samples, my java is beyond rusty :)
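Here's a rough Scala sketch of the idea (regularStream and hdfsStream stand for
your two keyed streams of (key, Long); the message types are made up for the example):

import org.apache.spark.streaming.dstream.DStream

sealed trait Msg
case class Increment(value: Long) extends Msg   // from the regular stream
case class ResetTo(value: Long) extends Msg     // from the HDFS stream

val regular: DStream[(String, Msg)] = regularStream.mapValues(Increment(_))
val resets: DStream[(String, Msg)]  = hdfsStream.mapValues(ResetTo(_))

val state = regular.union(resets).updateStateByKey[Long] { (msgs: Seq[Msg], prev: Option[Long]) =>
  // if a reset arrived in this batch, start from its value; otherwise keep the previous sum
  val base = msgs.collectFirst { case ResetTo(v) => v }.getOrElse(prev.getOrElse(0L))
  Some(base + msgs.collect { case Increment(v) => v }.sum)
}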

-adrian

From: Uthayan Suthakar
Date: Friday, October 23, 2015 at 2:10 PM
To: Sander van Dijk
Cc: user
Subject: Re: [Spark Streaming] How do we reset the updateStateByKey values.

Hi Sander,

Thank you for your very informative email. From your email, I've learned a 
quite a bit.

>>>Is the condition determined somehow from the data coming through streamLogs, 
>>>and is newData streamLogs again (rather than a whole data source?)

No, they are two different Streams. I have two stream receivers, one of which 
sends event regularly and the other is not so regular (this data is computed by 
another application and stored into HDFS). What I'm trying to do is pick up the 
data from HDFS and overwrite the Stream's state. Hence the overwriting should 
only take place if there were new files in HDFS.

So we have two different RDDs. If no file is found in HDFS, it will simply read 
the regular stream, compute and update the state(1) and output the result. If 
there is a file found in HDFS, then it should overwrite the state (1) with the 
data found from HDFS so the new events from the regular stream will carry on 
with the new overwritten state.

I managed to get most of it done, but only having the issue with overwriting 
the state.



On 22 October 2015 at 19:35, Sander van Dijk 
> wrote:
I don't think it is possible in the way you try to do it. It is important to 
remember that the statements you mention only set up the stream stages, before 
the stream is actually running. Once it's running, you cannot change, remove or 
add stages.

I am not sure how you determine your condition and what the actual change 
should be when that condition is met: you say you want a different update 
function but then give a statement with the same update function but a 
different source stream). Is the condition determined somehow from the data 
coming through streamLogs, and is newData basically streamLogs again (rather 
than a whole data source?). In that case I can think of 3 things to try:

- if the condition you switch on can be determined independently from every 
item in streamLogs, you can simply do an if/else inside updateResultsStream to 
change the method that you determine your state
- if this is not the case, but you can determine when to switch your condition 
for each key independently, you can extend your state type to also keep track 
of your condition: rather than keeping a plain value as state, you make
updatedResultsState a JavaPairDStream whose value is a Pair of your current
state and the condition flag (assuming you have some class Pair), and you make
updateResultsStream update and check the state of the boolean.
- finally, you can have a separate state stream that keeps track of your 
condition globally, then join that with you main stream and use that to update 
state. Something like:

// determineCondition should result in a reduction to a single item that 
signals whether the condition is met in the current batch, updateContitionState 
should remember that
conditionStateStream = 
streamLogs.reduce(determineCondition).updateStateByKey(updateConditionState)

// addCondition gets RDDs from streamLogs and  single-item RDDs with the 
condition state and should add that state to each item in the streamLogs RDD
joinedStream = streamLogs.transformWith(conditionStateStream, addCondition)

// This is similar to the extend state type of the previous idea, but now your 
condition state is determined globally rather than per log entry
updatedResultsState = joinedStream.updateStateByKey(updateResultsStream)

I hope this applies to your case and that it makes sense, my Java is a bit 
rusty :) and perhaps others can suggest better spark streaming methods that can 
be used, but hopefully the idea is clear.

Sander

On Thu, Oct 22, 2015 at 4:06 PM Uthayan Suthakar 
> wrote:
Hello guys,

I have a stream job that will carryout computations and update the state (SUM 
the value). At some point, I would like to reset the state. I could drop the 
state by setting 'None' but I don't want to drop it. I would like to keep the 
state but update the state.


For example:

JavaPairDStream updatedResultsState = 
streamLogs.updateStateByKey(updateResultsStream);

At some condition, I would like to update the state by key but with the 
different values, hence different update function.


e.g.

 updatedResultsState = newData.updateStateByKey(resetResultsStream);

But the  newData.updateStateByKeyvalues cannot be 

Re: Secondary Sorting in Spark

2015-10-26 Thread Adrian Tanase
Do you have a particular concern? You’re always using a partitioner (default is 
HashPartitioner) and the Partitioner interface is pretty light, can’t see how 
it could affect performance.

Used correctly it should improve performance as you can better control 
placement of data and avoid shuffling…
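For reference, the interface really is tiny; a sketch of a composite-key
partitioner (the key shape is an assumption):

import org.apache.spark.Partitioner

class CustomerPartitioner(override val numPartitions: Int) extends Partitioner {
  // route only on the "primary" part of a (customerId, timestamp) key so all
  // events of a customer land in the same partition, sorted by the secondary part
  override def getPartition(key: Any): Int = key match {
    case (customerId: Long, _) => (customerId.hashCode & Integer.MAX_VALUE) % numPartitions
    case other                 => (other.hashCode & Integer.MAX_VALUE) % numPartitions
  }
}

// usage: pairRdd.repartitionAndSortWithinPartitions(new CustomerPartitioner(64))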

-adrian

From: swetha kasireddy
Date: Monday, October 26, 2015 at 6:56 AM
To: Adrian Tanase
Cc: Bill Bejeck, "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Secondary Sorting in Spark

Hi,

Does the use of custom partitioner in Streaming affect performance?

On Mon, Oct 5, 2015 at 1:06 PM, Adrian Tanase 
<atan...@adobe.com<mailto:atan...@adobe.com>> wrote:
Great article, especially the use of a custom partitioner.

Also, sorting by multiple fields by creating a tuple out of them is an awesome, 
easy to miss, Scala feature.

Sent from my iPhone

On 04 Oct 2015, at 21:41, Bill Bejeck 
<bbej...@gmail.com<mailto:bbej...@gmail.com>> wrote:

I've written blog post on secondary sorting in Spark and I'd thought I'd share 
it with the group

http://codingjunkie.net/spark-secondary-sort/

Thanks,
Bill



Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-26 Thread Adrian Tanase
If I understand the order correctly, not really. First of all, the easiest way 
to make sure it works as expected is to check out the visual DAG in the spark 
UI.

It should map 1:1 to your code, and since I don’t see any shuffles in the 
operations below it should execute all in one stage, forking after X.
That means that all the executor cores will each process a partition completely 
in isolation, most likely 3 tasks (A, B, X2). Most likely in the order you 
define in code although depending on the data some tasks may get skipped or 
moved around.

I’m curious – why do you ask? Do you have a particular concern or use case that 
relies on ordering between A, B and X2?

-adrian

From: Nipun Arora
Date: Sunday, October 25, 2015 at 4:09 PM
To: Andy Dang
Cc: user
Subject: Re: [SPARK STREAMING] Concurrent operations in spark streaming

So essentially the driver/client program needs to explicitly have two threads 
to ensure concurrency?

What happens when the program is sequential... I.e. I execute function A and 
then function B. Does this mean that each RDD first goes through function A, 
and then stream X is persisted, but processed in function B only after the RDD 
has been processed by A?

Thanks
Nipun
On Sat, Oct 24, 2015 at 5:32 PM Andy Dang 
> wrote:
If you execute the collect step (foreach in 1, possibly reduce in 2) in two 
threads in the driver then both of them will be executed in parallel. Whichever 
gets submitted to Spark first gets executed first - you can use a semaphore if 
you need to ensure the ordering of execution, though I would assume that the 
ordering wouldn't matter.
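
As a minimal sketch with Scala Futures (x, z and x2 stand for one batch's RDDs and the output paths are illustrative; without the fair scheduler the two jobs are still queued FIFO, they are just submitted concurrently):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// cache the shared parent so both jobs reuse it instead of recomputing it
x.cache()

val jobA = Future { z.saveAsTextFile("/out/z") }    // function A's output action
val jobB = Future { x2.saveAsTextFile("/out/x2") }  // function B's output action
// both actions have now been handed to Spark from separate driver threads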

---
Regards,
Andy

On Sat, Oct 24, 2015 at 10:08 PM, Nipun Arora 
> wrote:
I wanted to understand something about the internals of spark streaming 
executions.

If I have a stream X, and in my program I send stream X to function A and 
function B:

1. In function A, I do a few transform/filter operations etc. on X->Y->Z to 
create stream Z. Now I do a forEach Operation on Z and print the output to a 
file.

2. Then in function B, I reduce stream X -> X2 (say min value of each RDD), and 
print the output to file

Are both functions being executed for each RDD in parallel? How does it work?

Thanks
Nipun




Re: Accumulators internals and reliability

2015-10-26 Thread Adrian Tanase
I can reply from a user’s perspective – I’ll defer the semantic guarantees to 
someone with more experience.

I’ve successfully implemented the following using a custom Accumulable class:

  *   Created a MapAccumulator with dynamic keys (they are driven by the data 
coming in), as opposed to creating many discrete accumulators
 *   The merge operation adds the values on key conflict
  *   I’m adding K->Vs to this accumulator in a variety of places (maps, 
flatmaps, transforms and updateStateBy key)
  *   In a foreachRdd at the end of the transformations I’m reading the 
accumulator and writing the counters to OpenTSDB
 *   after this I’m resetting it to the “zero” value (e.g. Empty map)
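
For reference, a minimal sketch of one way to get such a map-style accumulator with the stock AccumulatorParam API (this is not the exact Accumulable subclass I used, and the key/value types are illustrative):

import org.apache.spark.AccumulatorParam

object MapAccumulatorParam extends AccumulatorParam[Map[String, Long]] {
  override def zero(initial: Map[String, Long]): Map[String, Long] = Map.empty
  // merge two partial maps, adding the values on key conflict
  override def addInPlace(m1: Map[String, Long], m2: Map[String, Long]): Map[String, Long] =
    m2.foldLeft(m1) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }
}

// on the driver:
// val counters = sc.accumulator(Map.empty[String, Long])(MapAccumulatorParam)
// inside a map / flatMap / updateStateByKey:
// counters += Map("eventsProcessed" -> 1L)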

Everything works as expected in terms of functionality - with 2 caveats:

  *   On task/job failure you might get duplicate values for the tasks that are 
retried in the active job since adding to an Accumulator in a transformation is 
a side effect
 *   I’m partially working around this by also referring to the RDD time 
and overwriting the values in OpenTSDB (idempotent operation)
  *   If you have stateful transformations and you use checkpointing, the 
accumulator code becomes really intrusive in your codebase
 *   You will need to have a global singleton in your driver and 
“getInstance” in a foreachRdd or transform, to force code execution on the 
driver
 *   This is because on restoring from checkpoint your accumulators will be 
NULL as the checkpoint recovery makes no attempt to initialize them (See 
SPARK-5206)
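
That driver-side singleton looks roughly like this (a sketch of the lazily-initialized accumulator pattern, reusing the MapAccumulatorParam sketch above; names are illustrative):

import org.apache.spark.{Accumulator, SparkContext}

object EventCounters {
  @volatile private var instance: Accumulator[Map[String, Long]] = null

  def getInstance(sc: SparkContext): Accumulator[Map[String, Long]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(Map.empty[String, Long])(MapAccumulatorParam)
        }
      }
    }
    instance
  }
}

// force (re-)initialization on the driver, every batch, so it survives checkpoint recovery:
// dstream.foreachRDD { rdd =>
//   val counters = EventCounters.getInstance(rdd.sparkContext)
//   ...
// }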

Hope this helps,
-adrian

From: "Sela, Amit"
Date: Monday, October 26, 2015 at 11:13 AM
To: "user@spark.apache.org"
Subject: Accumulators internals and reliability

It seems like there is not much literature about Spark's Accumulators so I 
thought I'd ask here:

Do Accumulators reside in a Task ? Are they being serialized with the task ? 
Sent back on task completion as part of the ResultTask ?

Are they reliable? If so, when? Can I rely on an accumulator's value only after 
the task has successfully completed (meaning in the driver)? Or also during the 
task execution (what about speculative execution)?

What are the limitations on the number (or size) of Accumulators ?

Thanks,
Amit


Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-26 Thread Adrian Tanase
Thinking more about it – it should only be 2 tasks as A and B are most likely 
collapsed by spark in a single task.

Again – learn to use the spark UI as it’s really informative. The combination 
of DAG visualization and task count should answer most of your questions.

-adrian

From: Adrian Tanase
Date: Monday, October 26, 2015 at 11:57 AM
To: Nipun Arora, Andy Dang
Cc: user
Subject: Re: [SPARK STREAMING] Concurrent operations in spark streaming

If I understand the order correctly, not really. First of all, the easiest way 
to make sure it works as expected is to check out the visual DAG in the spark 
UI.

It should map 1:1 to your code, and since I don’t see any shuffles in the 
operations below it should execute all in one stage, forking after X.
That means that all the executor cores will each process a partition completely 
in isolation, most likely 3 tasks (A, B, X2). Most likely in the order you 
define in code although depending on the data some tasks may get skipped or 
moved around.

I’m curious – why do you ask? Do you have a particular concern or use case that 
relies on ordering between A, B and X2?

-adrian

From: Nipun Arora
Date: Sunday, October 25, 2015 at 4:09 PM
To: Andy Dang
Cc: user
Subject: Re: [SPARK STREAMING] Concurrent operations in spark streaming

So essentially the driver/client program needs to explicitly have two threads 
to ensure concurrency?

What happens when the program is sequential... I.e. I execute function A and 
then function B. Does this mean that each RDD first goes through function A, 
and then stream X is persisted, but processed in function B only after the RDD 
has been processed by A?

Thanks
Nipun
On Sat, Oct 24, 2015 at 5:32 PM Andy Dang 
<nam...@gmail.com<mailto:nam...@gmail.com>> wrote:
If you execute the collect step (foreach in 1, possibly reduce in 2) in two 
threads in the driver then both of them will be executed in parallel. Whichever 
gets submitted to Spark first gets executed first - you can use a semaphore if 
you need to ensure the ordering of execution, though I would assume that the 
ordering wouldn't matter.

---
Regards,
Andy

On Sat, Oct 24, 2015 at 10:08 PM, Nipun Arora 
<nipunarora2...@gmail.com<mailto:nipunarora2...@gmail.com>> wrote:
I wanted to understand something about the internals of spark streaming 
executions.

If I have a stream X, and in my program I send stream X to function A and 
function B:

1. In function A, I do a few transform/filter operations etc. on X->Y->Z to 
create stream Z. Now I do a forEach Operation on Z and print the output to a 
file.

2. Then in function B, I reduce stream X -> X2 (say min value of each RDD), and 
print the output to file

Are both functions being executed for each RDD in parallel? How does it work?

Thanks
Nipun




Re: Spark StreamingStatefull information

2015-10-22 Thread Adrian Tanase
The result of updatestatebykey is a dstream that emits the entire state every 
batch - as an RDD - nothing special about it.

It is easy to join / cogroup with another RDD if you have the correct keys in both.
You could load this one when the job starts and/or have it update with 
updatestatebykey as well, based on streaming updates from cassandra.
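
A minimal sketch of that join, assuming the state stream and the reference data share the same key (all names and types here are illustrative):

// state: DStream[(ClientId, ClientState)] returned by updateStateByKey
// geofences: RDD[(ClientId, GeofenceInfo)] loaded once when the job starts
val enriched = state.transform { stateRdd =>
  stateRdd.join(geofences)   // RDD[(ClientId, (ClientState, GeofenceInfo))]
}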

Sent from my iPhone

> On 22 Oct 2015, at 12:54, Arttii  wrote:
> 
> Hi,
> 
> So I am working on a usecase, where Clients are walking in and out of
> geofences and sending messages based on that.
> I currently have some in Memory Broadcast vars to do certain lookups for
> client and geofence info, some of this is also coming from Cassandra.
> My current quandary is that I need to support the case where a user comes in
> and out of geofence and also track how many messages have already been sent
> and do some logic based on that.
> 
> My stream is basically a bunch  of jsons
> {
> member:""
> beacon
> state:"exit","enter"
> }
> 
> 
> This information is invalidated at certain timesteps say messages a day and
> geofence every few minutes. First I thought broadcast vars might be good for
> this, but this gets updated a bunch so I do not think I can periodically
> rebroadcast these from the driver.
> 
> So I was thinking this might be a perfect case for UpdateStateByKey as I can
> kinda track what is going on
> and also track the time inside the values and return Nones to "pop" things.
> 
> Currently I cannot wrap my head around on how to use this stream in
> conjunction with some other info that is coming in "Dstreams" "Rdds". All the
> example for UpdateStatebyKey are basically doing something to a stream
> updateStateBykey and then foreaching over it and persisting in a store. I
> dont think writing and reading from cassandra on every batch to get this
> info is a good idea, because I might get stale info.
> 
> Is this a valid case or am I missing the point and usecase of this function?
> 
> Thanks,
> Artyom
> 
> 
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-StreamingStatefull-information-tp25160.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Analyzing consecutive elements

2015-10-22 Thread Adrian Tanase
Drop is a method on Scala's collections (array, list, etc) - not on Spark's 
RDDs. You can use it as long as you are working inside mapPartitions or on the 
grouped values of something like reduceByKey, but it totally depends on the 
use-cases you have for analytics.

The others have suggested better solutions using only spark’s APIs.
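
For completeness, one spark-only way to build the consecutive pairs across the whole sorted RDD is to index it and join on a shifted index (a minimal sketch; it does involve a shuffle):

val indexed = sorted.zipWithIndex().map { case (v, i) => (i, v) }   // (0, A), (1, B), ...
val shifted = indexed.map { case (i, v) => (i - 1, v) }             // successor moved back one slot
val pairs   = indexed.join(shifted).values                          // (A, B), (B, C), (C, D), ...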

-adrian

From: Sampo Niskanen
Date: Thursday, October 22, 2015 at 2:12 PM
To: Adrian Tanase
Cc: user
Subject: Re: Analyzing consecutive elements

Hi,

Sorry, I'm not very familiar with those methods and cannot find the 'drop' 
method anywhere.

As an example:

val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
val rdd = sc.parallelize(arr)
val sorted = rdd.sortByKey(true)
// ... then what?


Thanks.


Best regards,

Sampo Niskanen
Lead developer / Wellmo

sampo.niska...@wellmo.com<mailto:sampo.niska...@wellmo.com>
    +358 40 820 5291


On Thu, Oct 22, 2015 at 10:43 AM, Adrian Tanase 
<atan...@adobe.com<mailto:atan...@adobe.com>> wrote:
I'm not sure if there is a better way to do it directly using Spark APIs but I 
would try to use mapPartitions and then within each partition Iterable to:

rdd.zip(rdd.drop(1)) - using the Scala collection APIs

This should give you what you need inside a partition. I'm hoping that you can 
partition your data somehow (e.g by user id or session id) that makes you 
algorithm parallel. In that case you can use the snippet above in a reduceByKey.

hope this helps
-adrian

Sent from my iPhone

On 22 Oct 2015, at 09:36, Sampo Niskanen 
<sampo.niska...@wellmo.com<mailto:sampo.niska...@wellmo.com>> wrote:

Hi,

I have analytics data with timestamps on each element.  I'd like to analyze 
consecutive elements using Spark, but haven't figured out how to do this.

Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E] to 
an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze 
time-related elements.)

How can this be achieved?


Sampo Niskanen
Lead developer / Wellmo

sampo.niska...@wellmo.com<mailto:sampo.niska...@wellmo.com>
+358 40 820 5291




Re: Job splling to disk and memory in Spark Streaming

2015-10-21 Thread Adrian Tanase
+1 – you can definitely make it work by making sure you are using the same 
partitioner (including the same number of partitions).

For most operations like reduceByKey or updateStateByKey, simply specifying it is 
enough.

There are some gotchas for other operations:

  *   mapValues and flatMapValues preserve partitioning
  *   map and flatMap don’t as they can’t be sure of your logic. If you are 
absolutely sure that the emitted values will remain on the same partition then 
you can also override the partitioner to avoid shuffle
  *   Union on 2 Dstreams throws away partitioning. Again, if you know that 
it’s safe to do it, then you need to look at transformWith and push down to 
RDD.union which preserves partitioning

By using these tricks I’ve successfully forced a pretty complex streaming 
pipeline (including 2 updateStateByKey, unions, flatmaps, repartitions, custom 
partitioner, etc) to execute in a single stage.
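
As a minimal illustration of "specifying it" (the partition count, the keyed stream and the update function are placeholders):

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(32)   // reuse the same partitioner / count everywhere

val counts = keyedStream.reduceByKey(_ + _, partitioner)
val state  = counts.updateStateByKey(updateFunc, partitioner)
// same partitioner on both steps, so no extra shuffle between them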

Hope this helps,
-adrian

From: Tathagata Das
Date: Wednesday, October 21, 2015 at 10:36 AM
To: swetha
Cc: user
Subject: Re: Job splling to disk and memory in Spark Streaming

Well, reduceByKey needs to shuffle if your intermediate data is not already 
partitioned in the same way as reduceByKey's partitioning.

reduceByKey() has other signatures that take in a partitioner, or simply number 
of partitions. So you can set the same partitioner as your previous stage. 
Without any further insight into the structure of your code its hard to say 
anything more.

On Tue, Oct 20, 2015 at 5:59 PM, swetha 
> wrote:
Hi,

Currently I have a job that has spills to disk and memory due to usage of
reduceByKey and a lot of intermediate data in reduceByKey that gets
shuffled.

How to use custom partitioner in Spark Streaming for  an intermediate stage
so that  the next stage that uses reduceByKey does not have to do shuffles?

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Job-splling-to-disk-and-memory-in-Spark-Streaming-tp25149.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Re: Spark on Yarn

2015-10-21 Thread Adrian Tanase
The question is whether the spark dependency is marked as provided or is included in 
the fat jar.

For example, we are compiling the spark distro separately for java 8 + scala 
2.11 + hadoop 2.6 (with maven) and marking it as provided in sbt.
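
For reference, a minimal build.sbt sketch of marking it as provided (versions here are illustrative):

// build.sbt
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.5.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.5.1" % "provided"
)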

-adrian

From: Raghuveer Chanda
Date: Wednesday, October 21, 2015 at 2:14 PM
To: Jean-Baptiste Onofré
Cc: "user@spark.apache.org"
Subject: Re: Spark on Yarn

Hi,

So does this mean I can't run a spark 1.4 fat jar on yarn without installing 
spark 1.4?

I am including spark 1.4 in my pom.xml, so doesn't this mean it's compiling in 
1.4?


On Wed, Oct 21, 2015 at 4:38 PM, Jean-Baptiste Onofré 
> wrote:
Hi

The compiled version (master side) and client version diverge on spark network 
JavaUtils. You should use the same/aligned version.

Regards
JB



Sent from my Samsung device


 Original message 
From: Raghuveer Chanda 
>
Date: 21/10/2015 12:33 (GMT+01:00)
To: user@spark.apache.org
Subject: Spark on Yarn

Hi all,

I am trying to run spark on yarn in the quickstart cloudera vm. It already has spark 
1.3 and Hadoop 2.6.0-cdh5.4.0 installed. (I am not using spark-submit since I 
want to run a different version of spark).

I am able to run spark 1.3 on yarn but get the below error for spark 1.4.

The log shows its running on spark 1.4 but still gives a error on a method 
which is present in 1.4 and not 1.3. Even the fat jar contains the class files 
of 1.4.

As far as running on yarn goes, the installed spark version shouldn't matter, but 
it is still running on the other version.


Hadoop Version:
Hadoop 2.6.0-cdh5.4.0
Subversion http://github.com/cloudera/hadoop -r 
c788a14a5de9ecd968d1e2666e8765c5f018c271
Compiled by jenkins on 2015-04-21T19:18Z
Compiled with protoc 2.5.0
From source with checksum cd78f139c66c13ab5cee96e15a629025
This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.4.0.jar

Error:
LogType:stderr
Log Upload Time:Tue Oct 20 21:58:56 -0700 2015
LogLength:2334
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/filecache/10/simple-yarn-app-1.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/10/20 21:58:50 INFO spark.SparkContext: Running Spark version 1.4.0
15/10/20 21:58:53 INFO spark.SecurityManager: Changing view acls to: yarn
15/10/20 21:58:53 INFO spark.SecurityManager: Changing modify acls to: yarn
15/10/20 21:58:53 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(yarn); users with 
modify permissions: Set(yarn)
Exception in thread "main" java.lang.NoSuchMethodError: 
org.apache.spark.network.util.JavaUtils.timeStringAsSec(Ljava/lang/String;)J
at org.apache.spark.util.Utils$.timeStringAsSeconds(Utils.scala:1027)
at org.apache.spark.SparkConf.getTimeAsSeconds(SparkConf.scala:194)
at 
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:68)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1991)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1982)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:245)
at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:52)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:247)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:188)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
at com.hortonworks.simpleyarnapp.HelloWorld.main(HelloWorld.java:50)
15/10/20 21:58:53 INFO util.Utils: Shutdown hook called

Please help :)

--
Regards and Thanks,
Raghuveer Chanda



--
Regards,
Raghuveer Chanda
Computer Science and Engineering
IIT Kharagpur
+91-9475470374


Re: Whether Spark is appropriate for our use case.

2015-10-21 Thread Adrian Tanase
Can you share your approximate data size? All should be valid use cases for 
spark; I'm wondering if you are providing enough resources.

Also - do you have some expectations in terms of performance? what does "slow 
down" mean?

For this usecase I would personally favor parquet over DB, and sql/dataframes 
over regular spark RDDs as you get some benefits related to predicate pushdown, 
etc.
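
A minimal sketch of what I mean (paths, column names and the date-partitioning scheme are illustrative assumptions, with a SQLContext named sqlContext):

val events = sqlContext.read.parquet("/data/events")   // written partitioned by a date column
val users  = sqlContext.read.parquet("/data/users")

// filters on the date column and on individual fields can be pushed down to parquet,
// so only the needed files and columns are read instead of a full scan
val daily = events
  .filter(events("date") === "2015-10-20" && events("recordType") === 24)
  .join(users, events("userId") === users("userId"))
  .groupBy(users("userId"))
  .count()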

Sent from my iPhone

> On 21 Oct 2015, at 00:29, Aliaksei Tsyvunchyk  wrote:
> 
> Hello all community members,
> 
> I need opinion of people who was using Spark before and can share there 
> experience to help me select technical approach.
> I have a project in Proof Of Concept phase, where we are evaluating 
> possibility of Spark usage for our use case. 
> Here is brief task description.
> We should process big amount of raw data to calculate ratings. We have 
> different type of textual source data. This is just text lines which 
> represents different type of input data (we call them type 20, type 24, type 
> 26, type 33, etc).
> To perform calculations we should make joins between different types of raw 
> data - event records (which represents actual user action) and users 
> description records (which represents person which performs action) and 
> sometimes with userGroup record (we group all users by some criteria).
> All ratings are calculated on daily basis and our dataset could be 
> partitioned by date (except probably reference data).
> 
> 
> So we have tried to implement it using possibly most obvious way, we parse 
> text file, store data in parquet format and trying to use sparkSQL to query 
> data and perform calculation.
> Experimenting with sparkSQL I’ve noticed that SQL query speed decreased 
> proportionally to data size growth. Based on this I assume that SparkSQL 
> performs full records scan while servicing my SQL queries.
> 
> So here are the questions I’m trying to find answers:
> 1.  Is parquet format appropriate for storing data in our case (to 
> efficiently query data)? Could it be more suitable to have some DB as storage 
> which could filter data efficiently before it gets to Spark processing engine 
> ?
> 2.  For now we assume that joins we are doing for calculations slowing down 
> execution. As alternatives we consider denormalizing data and join it on 
> parsing phase, but this increase data volume Spark should handle (due to the 
> duplicates we will get). Is it valid approach? Would it be better if we 
> create 2 RDD, from Parquet files filter them out and next join without 
> sparkSQL involvement?  Or joins in SparkSQL are fine and we should look for 
> performance bottlenecks in different place?
> 3.  Should we look closer on Cloudera Impala? As I know it is working over 
> the same parquet files and I’m wondering whether it gives better performance 
> for data querying ?
> 4.  90% of results we need could be pre-calculated since they are not change 
> after one day of data is loaded. So I think it makes sense to keep this 
> pre-calculated data in some DB storage which give me best performance while 
> querying by key. Now I’m consider to use Cassandra for this purpose due to 
> it’s perfect scalability and performance. Could somebody provide any other 
> options we can consider ?
> 
> Thanks in Advance,
> Any opinion will be helpful and greatly appreciated
> -- 
> 
> 
> CONFIDENTIALITY NOTICE: This email and files attached to it are 
> confidential. If you are not the intended recipient you are hereby notified 
> that using, copying, distributing or taking any action in reliance on the 
> contents of this information is strictly prohibited. If you have received 
> this email in error please notify the sender and delete this email.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: difference between rdd.collect().toMap to rdd.collectAsMap() ?

2015-10-20 Thread Adrian Tanase
If you look at the source code you’ll see that this is merely a convenience 
function on PairRDDs - only interesting detail is that it uses a mutable 
HashMap to optimize creating maps with many keys. That being said, .collect() 
is called anyway.

https://github.com/apache/spark/blob/f85aa06464a10f5d1563302fd76465dded475a12/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L741-L753
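
In other words, for a pair RDD the two are interchangeable and both bring the whole dataset to the driver (a small sketch):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val m1: Map[String, Int] = pairs.collect().toMap                // Array on the driver, then toMap
val m2: collection.Map[String, Int] = pairs.collectAsMap()      // builds a HashMap directly
// note: with duplicate keys, both keep only one value per key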


-adrian




On 10/20/15, 12:35 PM, "kali.tumm...@gmail.com"  wrote:

>Hi All, 
>
>Is there any performance impact when I use collectAsMap on my RDD instead of
>rdd.collect().toMap ?
>
>I have a key value rdd and I want to convert it to a HashMap. As far as I know,
>collect() is not efficient on large data sets as it runs on the driver. Can I use
>collectAsMap instead, and is there any performance impact ?
>
>Original:-
> val QuoteHashMap=QuoteRDD.collect().toMap
> val QuoteRDDData=QuoteHashMap.values.toSeq
> val QuoteRDDSet=sc.parallelize(QuoteRDDData.map(x =>
>x.toString.replace("(","").replace(")","")))
> QuoteRDDSet.saveAsTextFile(Quotepath)
>
>Change:-
> val QuoteHashMap=QuoteRDD.collectAsMap()
> val QuoteRDDData=QuoteHashMap.values.toSeq
> val QuoteRDDSet=sc.parallelize(QuoteRDDData.map(x =>
>x.toString.replace("(","").replace(")","")))
> QuoteRDDSet.saveAsTextFile(Quotepath)
>
>
>
>Thanks
>Sri 
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/difference-between-rdd-collect-toMap-to-rdd-collectAsMap-tp25139.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Partition for each executor

2015-10-20 Thread Adrian Tanase
I think it should use the default parallelism which by default is equal to the 
number of cores in your cluster.

If you want to control it, specify a value for numSlices - the second param to 
parallelize().
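
For example (a minimal sketch):

val rdd = sc.parallelize(1 to 1000, 14)   // ask for 14 partitions, e.g. 2 per node on 7 nodes
println(rdd.partitions.length)            // 14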

-adrian



On 10/20/15, 6:13 PM, "t3l"  wrote:

>If I have a cluster with 7 nodes, each having an equal amount of cores and
>create an RDD with sc.parallelize() it looks as if the Spark will always
>tries to distribute the partitions.
>
>Question:
>(1) Is that something I can rely on?
>
>(2) Can I rely that sc.parallelize() will assign partitions to as many
>executers as possible. Meaning: Let's say I request 7 partitions, is each
>node guaranteed to get 1 of these partitions? If I select 14 partitions, is
>each node guaranteed to grab 2 partitions?
>
>P.S.: I am aware that for other cases like sc.hadoopFile, this might depend
>in the actual storage location of the data. I am merely asking for the
>sc.parallelize() case.
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Partition-for-each-executor-tp25141.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Should I convert json into parquet?

2015-10-19 Thread Adrian Tanase
For general data access of the pre-computed aggregates (group by) you’re better 
off with Parquet. I’d only choose JSON if I needed interop with another app 
stack / language that has difficulty accessing parquet (E.g. Bulk load into 
document db…).

On a strategic level, both JSON and parquet are similar since neither give you 
good random access, so you can’t simply “update specific user Ids on new data 
coming in”. Your strategy will probably be to re-process all the users by 
loading new data and current aggregates, joining and writing a new version of 
the aggregates…
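
A minimal sketch of that strategy with pair RDDs (the load helpers, paths and types are illustrative assumptions, not a specific API):

import org.apache.spark.rdd.RDD

// userId -> events accumulated so far, and userId -> new events from today (hypothetical loaders)
val current: RDD[(String, Seq[String])] = loadCurrentAggregates()
val incoming: RDD[(String, String)]     = loadTodaysEvents()

val updated: RDD[(String, Seq[String])] = current.cogroup(incoming).mapValues {
  case (oldEvents, newEvents) => oldEvents.flatten.toSeq ++ newEvents
}

// write a new version of the aggregate rather than updating files in place
updated.saveAsObjectFile("/aggregates/v2")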

If you’re worried about update performance then you probably need to look at a 
DB that offers random write access (Cassandra, Hbase..)

-adrian




On 10/19/15, 12:31 PM, "Ewan Leith"  wrote:

>As Jörn says, Parquet and ORC will get you really good compression and can be 
>much faster. There also some nice additions around predicate pushdown which 
>can be great if you've got wide tables.
>
>Parquet is obviously easier to use, since it's bundled into Spark. Using ORC 
>is described here 
>http://hortonworks.com/blog/bringing-orc-support-into-apache-spark/
>
>Thanks,
>Ewan
>
>-Original Message-
>From: Jörn Franke [mailto:jornfra...@gmail.com] 
>Sent: 19 October 2015 06:32
>To: Gavin Yue 
>Cc: user 
>Subject: Re: Should I convert json into parquet?
>
>
>
>Good Formats are Parquet or ORC. Both can be useful with compression, such as 
>Snappy. They are much faster than JSON. However, the table structure is up 
>to you and depends on your use case.
>
>> On 17 Oct 2015, at 23:07, Gavin Yue  wrote:
>> 
>> I have json files which contain timestamped events.  Each event is associated 
>> with a user id. 
>> 
>> Now I want to group by user id. So converts from
>> 
>> Event1 -> UserIDA;
>> Event2 -> UserIDA;
>> Event3 -> UserIDB;
>> 
>> To intermediate storage. 
>> UserIDA -> (Event1, Event2...)
>> UserIDB-> (Event3...)
>> 
>> Then I will label positives and featurize the Events Vector in many 
>> different ways, fit each of them into the Logistic Regression. 
>> 
>> I want to save intermediate storage permanently since it will be used many 
>> times.  And there will new events coming every day. So I need to update this 
>> intermediate storage every day. 
>> 
>> Right now I store intermediate storage using Json files.  Should I use 
>> Parquet instead?  Or is there better solutions for this use case?
>> 
>> Thanks a lot !
>> 
>> 
>> 
>> 
>> 
>> 
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
>commands, e-mail: user-h...@spark.apache.org
>
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Spark Streaming - use the data in different jobs

2015-10-19 Thread Adrian Tanase
+1 for re-publishing to pubsub if there is only transient value in the data. If 
you need to query the intermediate representation then you will need to use a 
database.

Sharing RDDs in memory should be possible with projects like spark job server 
but I think that’s overkill in this scenario.

Lastly, if there is no strong requirement to have different jobs, you might 
consider collapsing the 2 jobs into one.. And simply have multiple stages that 
execute in the same job.

-adrian

From: Ewan Leith
Date: Monday, October 19, 2015 at 12:34 PM
To: Oded Maimon, user
Subject: RE: Spark Streaming - use the data in different jobs

Storing the data in HBase, Cassandra, or similar is possibly the right answer, 
the other option that can work well is re-publishing the data back into second 
queue on RabbitMQ, to be read again by the next job.

Thanks,
Ewan

From: Oded Maimon [mailto:o...@scene53.com]
Sent: 18 October 2015 12:49
To: user >
Subject: Spark Streaming - use the data in different jobs

Hi,
we've build a spark streaming process that get data from a pub/sub (rabbitmq in 
our case).

now we want the streamed data to be used in different spark jobs (also in 
realtime if possible)

what options do we have for doing that ?


  *   can the streaming process and different spark jobs share/access the same 
RDD's?
  *   can the streaming process create a sparkSQL table and other jobs read/use 
it?
  *   can a spark streaming process trigger other spark jobs and send the the 
data (in memory)?
  *   can a spark streaming process cache the data in memory and other 
scheduled jobs access same rdd's?
  *   should we keep the data to hbase and read it from other jobs?
  *   other ways?

I believe that the answer will be using external db/storage..  hoping to have a 
different solution :)

Thanks.


Regards,
Oded Maimon
Scene53.


This email and any files transmitted with it are confidential and intended 
solely for the use of the individual or entity to whom they are addressed. 
Please note that any disclosure, copying or distribution of the content of this 
information is strictly forbidden. If you have received this email message in 
error, please destroy it immediately and notify its sender.


Re: Spark handling parallel requests

2015-10-19 Thread Adrian Tanase
To answer your specific question, you can’t push data to Kafka through a socket 
– you need a smart client library as the cluster setup is pretty advanced (also 
requires zookeeper).

I bet there are php libraries for Kafka although after a quick search it seems 
they’re still pretty young. Also – Kafka shines at larger deployments and 
throughput (tens of thousands to millions of events per second) and may be 
overkill for 100 events / sec.

Here are some other ideas:

  *   Use a lighter weight message broker like Rabbit MQ or MQTT – both have 
good integrations with spark and should be simpler to integrate with PHP
  *   Instead of doing a socket call, log the event on disk – this opens up 2 
strategies
 *   If you have access to shared storage, spark could read the files 
directly
 *   Otherwise, you could rely on something like 
Flume that can poll your logs and forward them to 
spark (There is a default integration in the spark external package)
  *   Lastly, why not try to build on one of the custom 
receivers? 
There are plenty code samples in the docs and examples
 *   This may not be a good choice if you can’t afford to lose any messages 
– in this case your life is harder as you’ll need to also use WAL based 
implementation

Hope this helps,
-adrian

From: 
"tarek.abouzei...@yahoo.com.INVALID"
Reply-To: "tarek.abouzei...@yahoo.com"
Date: Sunday, October 18, 2015 at 10:28 AM
To: Xiao Li, Akhil Das
Cc: "user@spark.apache.org"
Subject: Re: Spark handling parallel requests

hi Akhlis

it's a must to push data to a socket as I am using php as a web service to push 
data to the socket, then spark catches the data on that socket and processes it. Is 
there a way to push data from php to kafka directly?

--  Best Regards, -- Tarek Abouzeid



On Sunday, October 18, 2015 10:26 AM, 
"tarek.abouzei...@yahoo.com" 
> wrote:


hi Xiao,
1- requests are not similar at all , but they use solr and do commit sometimes
2- no caching is required
3- the throughput must be very high yeah , the requests are tiny but the system 
may receive 100 request/sec ,
does kafka support listening to a socket ?

--  Best Regards, -- Tarek Abouzeid



On Monday, October 12, 2015 10:50 AM, Xiao Li 
> wrote:


Hi, Tarek,

It is hard to answer your question. Are these requests similar? Caching your 
results or intermediate results in your applications? Or does that mean your 
throughput requirement is very high? Throttling the number of concurrent 
requests? ...

As Akhil said, Kafka might help in your case. Otherwise, you need to read the 
designs or even source codes of Kafka and Spark Streaming.

 Best wishes,

Xiao Li


2015-10-11 23:19 GMT-07:00 Akhil Das 
>:
Instead of pushing your requests to the socket, why don't you push them to a 
Kafka or any other message queue and use spark streaming to process them?

Thanks
Best Regards

On Mon, Oct 5, 2015 at 6:46 PM, 
> 
wrote:
Hi ,
i am using Scala, doing a socket program to catch multiple requests at the same 
time and then call a function which uses spark to handle each process. I have 
a multi-threaded server to handle the multiple requests and pass each to spark, 
but there's a bottleneck as spark doesn't initialize a sub task for the 
new request. Is it even possible to do parallel processing using a single spark 
job?
Best Regards,

--  Best Regards, -- Tarek Abouzeid








Re: How does shuffle work in spark ?

2015-10-19 Thread Adrian Tanase
I don’t know why it expands to 50 GB but it’s correct to see it both on the 
first operation (shuffled write) and on the next one (shuffled read). It’s the 
barrier between the 2 stages.

-adrian

From: shahid ashraf
Date: Monday, October 19, 2015 at 9:53 PM
To: Kartik Mathur, Adrian Tanase
Cc: user
Subject: Re: How does shuffle work in spark ?

hi  THANKS

i don't understand, if original data on partitions is 3.5 G and by doing 
shuffle to that... how it expands to 50 GB... and why then it reads 50 GB for 
next operations.. i have original data set of 100 GB then my data will explode 
to 1,428.5714286 GBs
and so shuffle reads will be 1,428.5714286 GBs that will be insane

On Mon, Oct 19, 2015 at 11:58 PM, Kartik Mathur 
<kar...@bluedata.com<mailto:kar...@bluedata.com>> wrote:
That sounds like correct shuffle output. In spark the map and reduce phases are 
separated by the shuffle: in the map phase each executor writes to local disk, and 
in the reduce phase the reducers read data from each executor over the network, so 
the shuffle definitely hurts performance. For more details on the spark shuffle 
phase please read this

http://0x0fff.com/spark-architecture-shuffle/

Thanks
Kartik


On Mon, Oct 19, 2015 at 6:54 AM, shahid <sha...@trialx.com> wrote:
@all i did partitionby using default hash partitioner on data
[(1,data)(2,(data),(n,data)]
the total data was approx 3.5 it showed shuffle write 50G and on next action
e.g count it is showing shuffle read of 50 G. i don't understand this
behaviour and i think the performance is getting slow with so much shuffle
read on next tranformation operations.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-shuffle-work-in-spark-tp584p25119.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





--
with Regards
Shahid Ashraf


Re: Spark Streaming scheduler delay VS driver.cores

2015-10-19 Thread Adrian Tanase
Bump on this question – does anyone know what is the effect of 
spark.driver.cores on the driver's ability to manage larger clusters?

Any tips on setting a correct value? I’m running Spark streaming on Yarn / 
Hadoop 2.6 / Spark 1.5.1.

Thanks,
-adrian

From: Adrian Tanase
Date: Saturday, October 17, 2015 at 10:58 PM
To: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Spark Streaming scheduler delay VS driver.cores

Hi,

I’ve recently bumped up the resources for a spark streaming job – and the 
performance started to degrade over time.
it was running fine on 7 nodes with 14 executor cores each (via Yarn) until I 
bumped executor.cores to 22 cores/node (out of 32 on AWS c3.xlarge, 24 for yarn)

The driver has 2 cores and 2 GB ram (usage is at zero).

For really low data volume it goes from 1-2 seconds per batch to 4-5 s/batch 
after about 6 hours, doing almost nothing. I’ve noticed that the scheduler 
delay is 3-4s, even 5-6 seconds for some tasks. Should be in the low tens of 
milliseconds. What’s weirder is that under moderate load (thousands of events 
per second) - the delay is not as obvious anymore.

After this I reduced the executor.cores to 20 and bumped driver.cores to 4 and 
it seems to be ok now.
However, this is totally empirical, I have not found any documentation, code 
samples or email discussion on how to properly set driver.cores.

Does anyone know:

  *   If I assign more cores to the driver/application manager, will it use 
them?
 *   I was looking at the process list with htop and only one of the jvm’s 
on the driver was really taking up CPU time
  *   What is a decent parallelism factor for a streaming app with 10-20 secs 
batch time? I found it odd that at  7 x 22 = 154 the driver is becoming a 
bottleneck
 *   I’ve seen people recommend 3-4 tasks/core or ~1000 parallelism for 
clusters in the tens of nodes

Thanks in advance,
-adrian


Re: Streaming of COAP Resources

2015-10-19 Thread Adrian Tanase
I’m not familiar with your COAP library but onStart is called only once. You’re 
only reading the value once when the custom receiver is initialized.

You need to set-up a callback, poll a buffer — again, depends on your COAP 
client — In short configure your client to “start listening for changes”
Then you need to call .store() for every new value that you’re notified of.
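
A minimal sketch of what that can look like with a polling loop (assuming the same CoapClient your code already uses; the client API details are yours, the receiver structure is the point):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CoapPollingReceiver(uri: String, pollEveryMs: Long)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // onStart must return quickly, so poll on a separate thread
    new Thread("coap-poller") {
      override def run(): Unit = {
        val client = new CoapClient(uri)
        while (!isStopped()) {
          val text = client.get().getResponseText()        // read the current value
          if (text != null && text.nonEmpty) store(text)   // hand every reading to Spark
          Thread.sleep(pollEveryMs)
        }
      }
    }.start()
  }

  override def onStop(): Unit = { }   // the polling thread exits when isStopped() becomes true
}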

-adrian



On 10/16/15, 9:38 AM, "Sadaf"  wrote:

>I am currently working on IOT Coap protocol.I accessed server on local host
>through copper firefox plugin. Then i Added resouce having "GET"
>functionality in server. After that i made its client as a streaming source.
>Here is the code of client streaming
>
> class customReceiver(test:String) extends 
>Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging with
>Serializable { 
>   @volatile private var stopped = false
>   override def onStart() {
>
>  val client = new CoapClient("ip/resource")
>  var text = client.get().getResponseText();  
>  store(text)
>   }
>   override def onStop(): Unit = synchronized { 
>  try
>  {
> stopped = true
>  }
>  catch
>  {
> case e: Exception => println("exception caught: " + e);
>  }
>   }
> }
>but i am facing a problem. During streaming it just reads the resource once.
>after that it fetches empty rdds and completes its batches. Meanwhile if the
>resource changes its value it doesn't read that. Am I doing something
>wrong? Or is there any other functionality to read whenever the resource
>gets changed that I can handle in my custom receiver? Or any idea about how
>to GET the value continuously during streaming?
>
>Any help is much awaited and appreciated. Thanks
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-of-COAP-Resources-tp25084.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Differentiate Spark streaming in event logs

2015-10-19 Thread Adrian Tanase
You could try to start the 2 (or N) kinds of jobs with slightly different log4j templates, 
by prepending some job type to all the messages...




On 10/19/15, 9:47 PM, "franklyn"  wrote:

>Hi I'm running a job to collect some analytics on spark jobs by analyzing
>their event logs. We write the event logs to a single HDFS folder and then
>pick them up in another job. I'd like to differentiate between regular spark
>jobs and spark streaming jobs in the event logs, i was wondering if there is
>an event/property/key that is different between the two.
>
>thanks!,
>
>Franklyn
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Differentiate-Spark-streaming-in-event-logs-tp25126.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: How to calculate row by now and output retults in Spark

2015-10-19 Thread Adrian Tanase
Are you by any chance looking for reduceByKey? If you’re trying to collapse all 
the values in V into an aggregate, that’s what you should be looking at.
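
As a minimal sketch (the choice of key column and the statistic are illustrative; swap in whatever aggregation you need):

// key on the second column and collapse the values per key,
// instead of materializing each group and iterating over it
val counts = a.map(row => (row._2, 1L))   // e.g. count rows per key
              .reduceByKey(_ + _)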

-adrian

From: Ted Yu
Date: Monday, October 19, 2015 at 9:16 PM
To: Shepherd
Cc: user
Subject: Re: How to calculate row by now and output retults in Spark

Under core/src/test/scala/org/apache/spark , you will find a lot of examples 
for map function.

FYI

On Mon, Oct 19, 2015 at 10:35 AM, Shepherd 
> wrote:
Hi all, I am new to Spark and Scala. I have a question about doing a calculation. I 
am using "groupBy" to generate key value pairs, where each value points to a subset 
of the original RDD. The RDD has four columns, and each subset RDD may have a 
different number of rows. For example, the original code looks like this:

val b = a.groupBy(_._2)
val res = b.map { case (k, v) => v.map(func) }

Here, I don't know how to write func. I have to run over each row in v and calculate 
a statistic result. How can I do that? And how can I write the function in map? 
Thanks a lot.

View this message in context: How to calculate row by now and output retults in 
Spark
Sent from the Apache Spark User List mailing list 
archive at Nabble.com.



Spark Streaming scheduler delay VS driver.cores

2015-10-17 Thread Adrian Tanase
Hi,

I’ve recently bumped up the resources for a spark streaming job – and the 
performance started to degrade over time.
it was running fine on 7 nodes with 14 executor cores each (via Yarn) until I 
bumped executor.cores to 22 cores/node (out of 32 on AWS c3.xlarge, 24 for yarn)

The driver has 2 cores and 2 GB ram (usage is at zero).

For really low data volume it goes from 1-2 seconds per batch to 4-5 s/batch 
after about 6 hours, doing almost nothing. I’ve noticed that the scheduler 
delay is 3-4s, even 5-6 seconds for some tasks. Should be in the low tens of 
milliseconds. What’s weirder is that under moderate load (thousands of events 
per second) - the delay is not as obvious anymore.

After this I reduced the executor.cores to 20 and bumped driver.cores to 4 and 
it seems to be ok now.
However, this is totally empirical, I have not found any documentation, code 
samples or email discussion on how to properly set driver.cores.

Does anyone know:

  *   If I assign more cores to the driver/application manager, will it use 
them?
 *   I was looking at the process list with htop and only one of the jvm’s 
on the driver was really taking up CPU time
  *   What is a decent parallelism factor for a streaming app with 10-20 secs 
batch time? I found it odd that at  7 x 22 = 154 the driver is becoming a 
bottleneck
 *   I’ve seen people recommend 3-4 tasks/core or ~1000 parallelism for 
clusters in the tens of nodes

Thanks in advance,
-adrian


Re: repartition vs partitionby

2015-10-17 Thread Adrian Tanase
If the dataset allows it you can try to write a custom partitioner to help 
spark distribute the data more uniformly.

Sent from my iPhone

On 17 Oct 2015, at 16:14, shahid ashraf 
> wrote:

Yes, I know about that; it's for the case of reducing partitions. The point here is that the 
data is skewed to a few partitions.


On Sat, Oct 17, 2015 at 6:27 PM, Raghavendra Pandey 
> wrote:
You can use coalesce function, if you want to reduce the number of partitions. 
This one minimizes the data shuffle.

-Raghav

On Sat, Oct 17, 2015 at 1:02 PM, shahid qadri 
> wrote:
Hi folks

I need to repartition a large set of data (around 300G) as I see some portions have 
a lot of data (data skew)

i have pairRDDs [({},{}),({},{}),({},{})]

what is the best way to solve the the problem
-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org





--
with Regards
Shahid Ashraf


Re: Building with SBT and Scala 2.11

2015-10-14 Thread Adrian Tanase
You are correct, of course. Gave up on sbt for spark long ago, I never managed 
to get it working while mvn works great.

Sent from my iPhone

On 14 Oct 2015, at 16:52, Ted Yu 
<yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> wrote:

Adrian:
Likely you were using maven.

Jakob's report was with sbt.

Cheers

On Tue, Oct 13, 2015 at 10:05 PM, Adrian Tanase 
<atan...@adobe.com<mailto:atan...@adobe.com>> wrote:
Do you mean hadoop-2.4 or 2.6? not sure if this is the issue but I'm also 
compiling the 1.5.1 version with scala 2.11 and hadoop 2.6 and it works.

-adrian

Sent from my iPhone

On 14 Oct 2015, at 03:53, Jakob Odersky 
<joder...@gmail.com<mailto:joder...@gmail.com>> wrote:

I'm having trouble compiling Spark with SBT for Scala 2.11. The command I use 
is:

dev/change-version-to-2.11.sh
build/sbt -Pyarn -Phadoop-2.11 -Dscala-2.11

followed by

compile

in the sbt shell.

The error I get specifically is:

spark/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala:308: no 
valid targets for annotation on value conf - it is discarded unused. You may 
specify targets with meta-annotations, e.g. @(transient @param)
[error] private[netty] class NettyRpcEndpointRef(@transient conf: SparkConf)
[error]

However I am also getting a large amount of deprecation warnings, making me 
wonder if I am supplying some incompatible/unsupported options to sbt. I am 
using Java 1.8 and the latest Spark master sources.
Does someone know if I am doing anything wrong or is the sbt build broken?

thanks for you help,
--Jakob




Re: Building with SBT and Scala 2.11

2015-10-13 Thread Adrian Tanase
Do you mean hadoop-2.4 or 2.6? not sure if this is the issue but I'm also 
compiling the 1.5.1 version with scala 2.11 and hadoop 2.6 and it works.

-adrian

Sent from my iPhone

On 14 Oct 2015, at 03:53, Jakob Odersky 
> wrote:

I'm having trouble compiling Spark with SBT for Scala 2.11. The command I use 
is:

dev/change-version-to-2.11.sh
build/sbt -Pyarn -Phadoop-2.11 -Dscala-2.11

followed by

compile

in the sbt shell.

The error I get specifically is:

spark/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala:308: no 
valid targets for annotation on value conf - it is discarded unused. You may 
specify targets with meta-annotations, e.g. @(transient @param)
[error] private[netty] class NettyRpcEndpointRef(@transient conf: SparkConf)
[error]

However I am also getting a large amount of deprecation warnings, making me 
wonder if I am supplying some incompatible/unsupported options to sbt. I am 
using Java 1.8 and the latest Spark master sources.
Does someone know if I am doing anything wrong or is the sbt build broken?

thanks for you help,
--Jakob



Re: SQLContext within foreachRDD

2015-10-12 Thread Adrian Tanase
Not really, unless you’re doing something wrong (e.g. Call collect or similar).

In the foreach loop you’re typically registering a temp table, by converting an 
RDD to data frame. All the subsequent queries are executed in parallel on the 
workers.
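
A minimal sketch of that pattern (Spark 1.5-style APIs; the case class and the query are illustrative):

import org.apache.spark.sql.SQLContext

case class Event(userId: String, value: Long)

eventStream.foreachRDD { rdd =>            // assuming a DStream[Event]
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.toDF()
  df.registerTempTable("events")

  // planned on the driver, but the work runs in parallel on the executors
  sqlContext.sql("SELECT userId, count(*) FROM events GROUP BY userId").show()
}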

I haven’t built production apps with this pattern but I have successfully built 
a prototype where I execute dynamic SQL on top of a 15 minute window (obtained 
with .window on the Dstream) - and it works as expected.

Check this out for code example: 
https://github.com/databricks/reference-apps/blob/master/logs_analyzer/chapter1/scala/src/main/scala/com/databricks/apps/logs/chapter1/LogAnalyzerStreamingSQL.scala

-adrian

From: Daniel Haviv
Date: Monday, October 12, 2015 at 12:52 PM
To: user
Subject: SQLContext within foreachRDD

Hi,
As things that run inside foreachRDD run at the driver, does that mean that if 
we use SQLContext inside foreachRDD the data is sent back to the driver and 
only then the query is executed or is it executed at the executors?


Thank you.
Daniel




Re: "dynamically" sort a large collection?

2015-10-12 Thread Adrian Tanase
I think you’re looking for the flatMap (or flatMapValues) operator – you can do 
something like

sortedRdd.flatMapValues { v =>
  if (v % 2 == 0) {
    Some(v / 2)
  } else {
    None
  }
}

Then you need to sort again.

-adrian

From: Yifan LI
Date: Monday, October 12, 2015 at 1:03 PM
To: spark users
Subject: "dynamically" sort a large collection?

Hey,

I need to scan a large "key-value" collection as below:

1) sort it on an attribute of “value”
2) scan it one by one, from element with largest value
2.1) if the current element matches a pre-defined condition, its value will be 
reduced and the element will be inserted back to collection.
if not, this current element should be removed from collection.


In my previous program, step 1) can be easily done in Spark (an RDD 
operation), but I am not sure how to do step 2.1), esp. the “put/inserted back” 
operation on a sorted RDD.
I have tried to make a new RDD every time an element needs to be inserted, 
but it is very costly due to the re-sorting…


Is there anyone having some ideas?

Thanks so much!

**
an example:

the sorted result of initial collection C(on bold value), sortedC:
(1, (71, “aaa"))
(2, (60, “bbb"))
(3, (53.5, “ccc”))
(4, (48, “ddd”))
(5, (29, “eee"))
…

pre-condition: its_value%2 == 0
if pre-condition is matched, its value will be reduce on half.

Thus:

#1:
71 is not matched, so this element is removed.
(1, (71, “aaa”)) —> removed!
(2, (60, “bbb"))
(3, (53.5, “ccc”))
(4, (48, “ddd”))
(5, (29, “eee"))
…

#2:
60 is matched! 60/2 = 30, the collection right now should be as:
(3, (53.5, “ccc”))
(4, (48, “ddd”))
(2, (30, “bbb”)) <— inserted back here
(5, (29, “eee"))
…






Best,
Yifan LI







Re: Spark retrying task indefinietly

2015-10-12 Thread Adrian Tanase
To answer your question specifically - you can bump the value on 
spark.streaming.kafka.maxRetries (see configuration guide: 
http://spark.apache.org/docs/latest/configuration.html).

That being said, you should avoid this by adding some validation in your 
deserializaiton / parse code.

A quick and dirty way to do it is:

val lines = messages.flatMapValues(v => Try(v.toInt).toOption)


This way, only the lines that are successfully parsed are kept around.
Read a bit on scala.util.{Try, Success, Failure} and Options to understand 
what’s going on.

-adrian



On 10/12/15, 9:05 AM, "Amit Singh Hora"  wrote:

>I am running spark locally to understand how countByValueAndWindow works
>
>
>  
>  val Array(brokers, topics) = Array("192.XX.X.XX:9092", "test1")
>  
>  // Create context with 2 second batch interval
>  val sparkConf = new
>SparkConf().setAppName("ReduceByWindowExample").setMaster("local[1,1]")
>  sparkConf.set("spark.task.maxFailures","1")
>  
>  val ssc = new StreamingContext(sparkConf, Seconds(1)) // batch size 1
>  ssc.checkpoint("D:\\SparkCheckPointDirectory")
>  
>  // Create direct kafka stream with brokers and topics
>  val topicsSet = topics.split(",").toSet
>  val kafkaParams = Map[String, String]("metadata.broker.list" ->
>brokers)
>  
>  val messages = KafkaUtils.createDirectStream[String, String,
>StringDecoder, StringDecoder](
>ssc, kafkaParams, topicsSet)
>  
>  // Get the lines, split them into words, count the words and print
>  val lines = messages.map(_._2.toInt)
>  val keyValuelines = lines.map { x => (x, 1) }
>  
>  val windowedlines=lines.countByValueAndWindow(Seconds(1),Seconds(1))
>  //window,interval
>  //val windowedlines = lines.reduceByWindow((x, y) => { x + y },
>Seconds(4) , Seconds(2))
>  windowedlines.print()
>  
>  ssc.start()
>  ssc.awaitTermination()
>
>
>everything works fine till numeric data is supplied on the kafka topic as I
>am using toInt. When a blank string "" is written on the kafka topic it fails,
>complaining NumberFormatException. That is OK, but the problem is it is
>retrying this indefinitely again and again and complaining about the same
>NumberFormatException. Is there any way to control the number of times spark will
>try to convert the string to Int, like Spark should try it only [times] and then
>move to the next batch of data
>
>Note - I am using Spark 1.4
>
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Spark-retrying-task-indefinietly-tp25022.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Streaming Performance w/ UpdateStateByKey

2015-10-10 Thread Adrian Tanase
How are you determining how much time is serialization taking?

I made this change in a streaming app that relies heavily on updateStateByKey. 
The memory consumption went up 3x on the executors but I can't see any perf 
improvement. Task execution time is the same and the serialization state metric 
in the spark UI is 0-1ms in both scenarios.

Any idea where else to look or why am I not seeing any performance uplift?

Thanks!
Adrian

Sent from my iPhone

On 06 Oct 2015, at 00:47, Tathagata Das 
> wrote:

You could call DStream.persist(StorageLevel.MEMORY_ONLY) on the stateDStream 
returned by updateStateByKey to achieve the same. As you have seen, the 
downside is greater memory usage, and also higher GC overheads (that;s the main 
one usually). So I suggest you run your benchmarks for a long enough time to 
see what is the GC overheads. If it turns out that some batches are randomly 
taking longer because of some task in some executor being stuck in GC, then its 
going to be bad.

Alternatively, you could also starting playing with CMS GC, etc.

BTW, it would be amazing, if you can share the number in your benchmarks. 
Number of states, how complex are the objects in state, whats the processing 
time and whats the improvements.

TD


On Mon, Oct 5, 2015 at 2:28 PM, Jeff Nadler 
> wrote:

While investigating performance challenges in a Streaming application using 
UpdateStateByKey, I found that serialization of state was a meaningful (not 
dominant) portion of our execution time.

In StateDStream.scala, serialized persistence is required:

 super.persist(StorageLevel.MEMORY_ONLY_SER)

 I can see why that might be a good choice for a default. For our workload, I 
made a clone that uses StorageLevel.MEMORY_ONLY. I've just completed some 
tests and it is indeed faster, with the expected cost of greater memory usage.  
 For us that would be a good tradeoff.

I'm not taking any particular extra risks by doing this, am I?

Should this be configurable?  Perhaps yet another signature for 
PairDStreamFunctions.updateStateByKey?

Thanks for sharing any thoughts-

Jef






Re: Notification on Spark Streaming job failure

2015-10-07 Thread Adrian Tanase
We’re deploying using YARN in cluster mode, to take advantage of automatic 
restart of long running streaming app. We’ve also done a POC on top of 
Mesos+Marathon, that’s always an option.

For monitoring / alerting, we’re using a combination of:

  *   Spark REST API queried from OpsView via nagios style checks
 *   Here, we have thresholds on things like number of successful jobs / 
tasks, total execution time, etc
  *   Custom business/operational metrics logged manually from the streaming 
app to OpenTSDB
 *   we’re using a combination of spark accumulators and custom RDDs – 
after summarizing some counters we’re pushing them to OpenTSDB via the REST API
 *   we’re using dashboards built with Grafana that poll OpenTSDB – nicer 
looking, same functionality
 *   We have a custom opsview check that queries OpenTSDB and looks for 
some successful number of events processed by the job over a period of time
 *   This is coupled with  a stable stream of data from a canary instance

Hope this helps – feel free to google around for all the above buzzwords :). I 
can get into more details on demand.

-adrian

From: Chen Song
Date: Monday, September 28, 2015 at 5:00 PM
To: Krzysztof Zarzycki
Cc: user
Subject: Re: Notification on Spark Streaming job failure

I am also interested specifically in monitoring and alerting on Spark streaming 
jobs. It will be helpful to get some general guidelines or advice on this, from 
people who implemented anything on this.

On Fri, Sep 18, 2015 at 2:35 AM, Krzysztof Zarzycki wrote:
Hi there Spark Community,
I would like to ask you for an advice: I'm running Spark Streaming jobs in 
production. Sometimes these jobs fail and I would like to get email 
notification about it. Do you know how I can set up Spark to notify me by email 
if my job fails? Or do I have to use external monitoring tool?
I'm thinking of the following options:
1. As I'm running those jobs on YARN, monitor somehow YARN jobs. Looked for it 
as well but couldn't find any YARN feature to do it.
2. Run Spark Streaming job in some scheduler, like Oozie, Azkaban, Luigi. Those 
are created rather for batch jobs, not streaming, but could work. Has anyone 
tried that?
3. Run the job driver under the "monit" tool, catch the failure and send an email 
about it. Currently I'm deploying with yarn-cluster mode and I would need to 
give that up to run under monit.
4. Implement monitoring tool (like Graphite, Ganglia, Prometheus) and use Spark 
metrics. And then implement alerting in those. Can I get information of failed 
jobs in Spark metrics?
5. As 4. but implement my own custom job metrics and monitor them.

What's your opinion about my options? How do you people solve this problem? 
Anything Spark specific?
I'll be grateful for any advice in this subject.
Thanks!
Krzysiek




--
Chen Song



Re: Secondary Sorting in Spark

2015-10-05 Thread Adrian Tanase
Great article, especially the use of a custom partitioner.

Also, sorting by multiple fields by creating a tuple out of them is an awesome, 
easy to miss, Scala feature.

Sent from my iPhone

On 04 Oct 2015, at 21:41, Bill Bejeck wrote:

I've written blog post on secondary sorting in Spark and I'd thought I'd share 
it with the group

http://codingjunkie.net/spark-secondary-sort/

Thanks,
Bill


Re: Broadcast var is null

2015-10-05 Thread Adrian Tanase
FYI the same happens with accumulators when recovering from checkpoint. I'd 
love to see this fixed somehow as the workaround (using a singleton factory in 
foreachRdd to make sure the accumulators are initialized instead of null) is 
really intrusive...

Sent from my iPhone

On 05 Oct 2015, at 22:52, Tathagata Das wrote:

Make sure the broadcast variable works independent of the streaming 
application. Then make sure it works without having 
StreamingContext.getOrCreate(). That will disambiguate whether that error is 
thrown when starting a new context, or when recovering a context from 
checkpoint (as getOrCreate is supposed to do).

On Mon, Oct 5, 2015 at 9:23 AM, dpristin wrote:
Hi,

Can anyone point out what I'm doing wrong? I've implemented a very
basic spark streaming app that uses a single broadcast variable. When it
runs locally it produces a proper output (the array I broadcast). But when
deployed on the cluster I get "broadcastVar is null". We use v 1.4.1. Here
is the code:

--- imports go here

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  // This is our custom context setup code; nothing fancy goes on here
  val config = Configuration(args)
  val ssc: StreamingContext =
    StreamingContext.getOrCreate(config.checkpointDirectory, () => {
      SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))
    })

  val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
  val messages = kafkaStreamFactory.Create

  // Grab the value data of the above kafka input dstream as a string
  val events = messages.map(s => s._2)

  // Create a broadcast variable - straight from the dev guide
  val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))

  // Try to print out the value of the broadcast var here
  val transformed = events.transform(rdd => {
    rdd.map(x => {
      if (broadcastVar == null) {
        println("broadcastVar is null")
      } else {
        println("broadcastVar value: " + broadcastVar.value.mkString("|"))
      }
      x
    })
  })

  transformed.foreachRDD(x => logger.info("Data: " + x.collect.mkString("|")))

  ssc.start()
  ssc.awaitTermination()
}

Any input is very much appreciated!

Regards,
Dmitry.







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927.html
Sent from the Apache Spark User List mailing list archive at 
Nabble.com.




Re: Usage of transform for code reuse between Streaming and Batch job affects the performance ?

2015-10-05 Thread Adrian Tanase
It shouldn't, as lots of the streaming operations delegate to transform under 
the hood. Easiest way to make sure is to look at the source code - with a 
decent IDE navigating around should be a breeze.

As a matter of fact, for more advanced operations where you may want to control 
the partitioning (e.g. unioning 2 DStreams or a simple flatMap) you will be 
forced to use transform as the DStreams hide away some of the control.
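
A small sketch of that second case (the DStreams 'clicks' and 'views' are hypothetical,
both assumed to be DStream[(String, String)]):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// union the two streams and take control of the partitioning of the combined RDD
val combined = clicks.transformWith(views, (c: RDD[(String, String)], v: RDD[(String, String)]) =>
  c.union(v).partitionBy(new HashPartitioner(48))
)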

-adrian

Sent from my iPhone

> On 05 Oct 2015, at 03:59, swetha  wrote:
> 
> Hi,
> 
> I have the following code for code reuse between the batch and the streaming
> job
> 
> *  val groupedAndSortedSessions =
> sessions.transform(rdd=>JobCommon.getGroupedAndSortedSessions(rdd))*
> 
> The same code without code reuse between the batch and the streaming has the
> following. 
> 
> * val groupedSessions = sessions.groupByKey();
> 
>val sortedSessions  = groupedSessions.mapValues[(List[(Long,
> String)])](iter => iter.toList.sortBy(_._1))
> *
> 
> Does use of transform for code reuse affect groupByKey performance?
> 
> 
> Thanks,
> Swetha
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Usage-of-transform-for-code-reuse-between-Streaming-and-Batch-job-affects-the-performance-tp24920.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Re: RDD of ImmutableList

2015-10-05 Thread Adrian Tanase
If you don't need to write data back using that library I'd say go for #2. 
Convert to a scala class and standard lists, should be easier down the line. 
That being said, you may end up writing custom code if you stick with kryo 
anyway...
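
A minimal sketch of option #2, assuming the field you need really is just a list of ints
(the Record wrapper and the sc variable are hypothetical):

import scala.collection.JavaConverters._
import com.google.common.collect.ImmutableList

// keep a plain Scala List inside the RDD so kryo never has to rebuild a guava ImmutableList
case class Record(values: List[Int])

val arr = Array(ImmutableList.of(1, 2), ImmutableList.of(2, 4), ImmutableList.of(3, 6))
val rdd = sc.parallelize(arr.map(l => Record(l.asScala.toList.map(_.intValue))))
rdd.count()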

Sent from my iPhone

On 05 Oct 2015, at 21:42, Jakub Dubovsky wrote:

Thank you for quick reaction.

I have to say this is very surprising to me. I never received an advice to stop 
using an immutable approach. Whole RDD is designed to be immutable (which is 
sort of sabotaged by not being able to (de)serialize immutable classes 
properly). I will ask on dev list if this is to be changed or not.

Ok, I have let go initial feelings and now let's be pragmatic. And this is 
still for everyone not just Igor:

I use a class from a library which is immutable. Now I want to use this class 
to represent my data in RDD because this saves me a huge amount of work. The 
class uses ImmutableList as one of its fields. That's why it fails. But isn't 
there a way to workaround this? I ask this because I have exactly zero 
knowledge about kryo and the way how it works. So for example would some of 
these two work?

1) Change the external class so that it implements writeObject, readObject 
methods (it's java). Will these methods be used by kryo? (I can ask the 
maintainers of a library to change the class if the change is reasonable. 
Adding these methods would be while dropping immutability certainly wouldn't)

2) Wrap the class to scala class which would translate the data during 
(de)serialization?

  Thanks!
  Jakub Dubovsky


-- Original message --
From: Igor Berman
To: Jakub Dubovsky
Date: 5. 10. 2015 20:11:35
Subject: Re: RDD of ImmutableList

kryo doesn't support guava's collections by default
I remember encountered project in github that fixes this(not sure though). I've 
ended to stop using guava collections as soon as spark rdds are concerned.

On 5 October 2015 at 21:04, Jakub Dubovsky wrote:
Hi all,

  I would like to have an advice on how to use ImmutableList with RDD. Small 
presentation of an essence of my problem in spark-shell with guava jar added:

scala> import com.google.common.collect.ImmutableList
import com.google.common.collect.ImmutableList

scala> val arr = Array(ImmutableList.of(1,2), ImmutableList.of(2,4), 
ImmutableList.of(3,6))
arr: Array[com.google.common.collect.ImmutableList[Int]] = Array([1, 2], [2, 
4], [3, 6])

scala> val rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[com.google.common.collect.ImmutableList[Int]] = 
ParallelCollectionRDD[0] at parallelize at :24

scala> rdd.count

 This results in a kryo exception saying that it cannot add a new element to a list 
instance during deserialization:

java.io.IOException: java.lang.UnsupportedOperationException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at 
org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
...
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
at 
com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:91)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
...

  It somehow makes sense. But I cannot think of a workaround and I do not 
believe that using ImmutableList with RDD is not possible. How is this solved?

  Thank you in advance!

   Jakub Dubovsky




Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Adrian Tanase
You are absolutely correct, I apologize.

My understanding was that you are sharing the machine across many jobs. That 
was the context in which I was making that comment.

-adrian

Sent from my iPhone

On 03 Oct 2015, at 07:03, Philip Weaver <philip.wea...@gmail.com> wrote:

You can't really say 8 cores is not much horsepower when you have no idea what 
my use case is. That's silly.

On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase <atan...@adobe.com> wrote:
Forgot to mention that you could also restrict the parallelism to 4, 
essentially using only 4 cores at any given time, however if your job is 
complex, a stage might be broken into more than 1 task...

Sent from my iPhone

On 19 Sep 2015, at 08:30, Adrian Tanase <atan...@adobe.com> wrote:

Reading through the docs it seems that with a combination of FAIR scheduler and 
maybe pools you can get pretty far.

However the smallest unit of scheduled work is the task so probably you need to 
think about the parallelism of each transformation.

I'm guessing that by increasing the level of parallelism you get many smaller 
tasks that the scheduler can then run across the many jobs you might have - as 
opposed to fewer, longer tasks...
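
A minimal sketch of the FAIR scheduler / pools idea, assuming the context was created with
spark.scheduler.mode=FAIR (handleRequest and the pool name are hypothetical):

import org.apache.spark.SparkContext

// each request-handling thread tags its jobs with a pool so concurrent jobs share the executors
def handleRequest(sc: SparkContext, path: String): Long = {
  sc.setLocalProperty("spark.scheduler.pool", "requests")
  try sc.textFile(path).count()
  finally sc.setLocalProperty("spark.scheduler.pool", null)
}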

Lastly, 8 cores is not that much horsepower :)
You may consider running with beefier machines or a larger cluster, to get at 
least tens of cores.

Hope this helps,
-adrian

Sent from my iPhone

On 18 Sep 2015, at 18:37, Philip Weaver <philip.wea...@gmail.com> wrote:

Here's a specific example of what I want to do. My Spark application is running 
with total-executor-cores=8. A request comes in, it spawns a thread to handle 
that request, and starts a job. That job should use only 4 cores, not all 8 of 
the cores available to the cluster. When the first job is scheduled, it should 
take only 4 cores, not all 8 of the cores that are available to the driver.

Is there any way to accomplish this? This is on mesos.

In order to support the use cases described in 
https://spark.apache.org/docs/latest/job-scheduling.html, where a spark 
application runs for a long time and handles requests from multiple users, I 
believe what I'm asking about is a very important feature. One of the goals is 
to get lower latency for each request, but if the first request takes all 
resources and we can't guarantee any free resources for the second request, 
then that defeats the purpose. Does that make sense?

Thanks in advance for any advice you can provide!

- Philip

On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver <philip.wea...@gmail.com> wrote:
I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR 
scheduler, so I can define a long-running application capable of executing 
multiple simultaneous spark jobs.

The kind of jobs that I'm running do not benefit from more than 4 cores, but I 
want my application to be able to take several times that in order to run 
multiple jobs at the same time.

I suppose my question is more basic: How can I limit the number of cores used 
to load an RDD or DataFrame? I can immediately repartition or coalesce my RDD 
or DataFrame to 4 partitions after I load it, but that doesn't stop Spark from 
using more cores to load it.

Does it make sense what I am trying to accomplish, and is there any way to do 
it?

- Philip





Re: how to broadcast huge lookup table?

2015-10-04 Thread Adrian Tanase
have a look at .transformWith, you can specify another RDD.
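
.transformWith pairs two DStreams; for a static lookup table a closely related approach is a
plain .transform closing over a keyed, pre-partitioned RDD. A rough sketch (names such as
'lookupRdd', 'events' and the HDFS path are hypothetical):

import org.apache.spark.HashPartitioner

val lookupRdd = sc
  .textFile("hdfs:///lookup/table")                              // assumed "key,value" lines
  .map { line => val Array(k, v) = line.split(","); (k, v) }
  .partitionBy(new HashPartitioner(48))
  .cache()

val enriched = events.transform { rdd =>                         // events: DStream[(String, String)]
  rdd.join(lookupRdd)                                            // no broadcast, no RDD actions inside map/foreach
}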

Sent from my iPhone

On 02 Oct 2015, at 21:50, "saif.a.ell...@wellsfargo.com" wrote:

I tried broadcasting a key-value rdd, but then I cannot perform any rdd-actions 
inside a map/foreach function of another rdd.

any tips? If going into scala collections I end up with huge memory bottlenecks.

Saif



Re: Shuffle Write v/s Shuffle Read

2015-10-02 Thread Adrian Tanase
I’m not sure this is related to memory management – the shuffle is the central 
act of moving data around nodes when the computations need the data on another 
node (e.g. group by, sort, etc.)

Shuffle read and shuffle write should be mirrored on the left/right side of a 
shuffle between 2 stages.

-adrian

From: Kartik Mathur
Date: Thursday, October 1, 2015 at 10:36 PM
To: user
Subject: Shuffle Write v/s Shuffle Read

Hi

I am trying to better understand shuffle in spark .

Based on my understanding thus far ,

Shuffle Write : writes stage output for intermediate stage on local disk if 
memory is not sufficient.,
Example , if each worker has 200 MB memory for intermediate results and the 
results are 300MB then , each executer will keep 200 MB in memory and will 
write remaining 100 MB on local disk .

Shuffle Read : Each executer will read from other executer's memory + disk , so 
total read in above case will be 300(200 from memory and 100 from disk)*num of 
executers ?

Is my understanding correct ?

Thanks,
Kartik



Re: Accumulator of rows?

2015-10-02 Thread Adrian Tanase
Have you seen window functions?
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

From: "saif.a.ell...@wellsfargo.com"
Date: Thursday, October 1, 2015 at 9:44 PM
To: "user@spark.apache.org"
Subject: Accumulator of rows?

Hi all,

I need to repeat a couple of rows from a dataframe n times each. To do so, I 
plan to create a new DataFrame, but I am unable to find a way to 
accumulate “Rows” somewhere; as this might get huge, I can’t accumulate into a 
mutable Array, I think.

Thanks,
Saif



Re: Kafka Direct Stream

2015-10-01 Thread Adrian Tanase
On top of that you could make the topic part of the key (e.g. keyBy in 
.transform or manually emitting a tuple) and use one of the .xxxByKey operators 
for the processing.

If you have a stable, domain specific list of topics (e.g. 3-5 named topics) 
and the processing is really different, I would also look at filtering by topic 
and saving as different Dstreams in your code.

Either way you need to start with Cody’s tip in order to extract the topic name.
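
A rough sketch of combining the two ('stream' is assumed to be a direct Kafka stream of
(key, value) pairs):

import org.apache.spark.streaming.kafka.HasOffsetRanges

val keyedByTopic = stream.transform { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    // for a direct KafkaRDD the partition index lines up with the offsetRanges array
    val topic = offsetRanges(i).topic
    iter.map { case (_, value) => (topic, value) }
  }
}

// e.g. count messages per topic in each batch
val countsPerTopic = keyedByTopic.mapValues(_ => 1L).reduceByKey(_ + _)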

-adrian

From: Cody Koeninger
Date: Thursday, October 1, 2015 at 5:06 PM
To: Udit Mehta
Cc: user
Subject: Re: Kafka Direct Stream

You can get the topic for a given partition from the offset range.  You can 
either filter using that; or just have a single rdd and match on topic when 
doing mapPartitions or foreachPartition (which I think is a better idea)

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta wrote:
Hi,

I am using spark direct stream to consume from multiple topics in Kafka. I am 
able to consume fine but I am stuck at how to separate the data for each topic 
since I need to process data differently depending on the topic.
I basically want to split the RDD consisting on N topics into N RDD's each 
having 1 topic.

Any help would be appreciated.

Thanks in advance,
Udit



Re: Lost leader exception in Kafka Direct for Streaming

2015-10-01 Thread Adrian Tanase
This also happened to me in extreme recovery scenarios – e.g. killing 4 out of 
a 7-machine cluster.

I’d put my money on recovering from an out of sync replica, although I haven’t 
done extensive testing around it.

-adrian

From: Cody Koeninger
Date: Thursday, October 1, 2015 at 5:18 PM
To: swetha
Cc: "user@spark.apache.org"
Subject: Re: Lost leader exception in Kafka Direct for Streaming

Did you check your kafka broker logs to see what was going on during that time?

The direct stream will handle normal leader loss / rebalance by retrying tasks.

But the exception you got indicates that something with kafka was wrong, such 
that offsets were being re-used.

ie. your job already processed up through beginning offset 15027734702

but when asking kafka for the highest available offsets, it returns ending 
offset 15027725493

which is lower, in other words kafka lost messages.  This might happen because 
you lost a leader and recovered from a replica that wasn't in sync, or someone 
manually screwed up a topic, or ... ?

If you really want to just blindly "recover" from this situation (even though 
something is probably wrong with your data), the most straightforward thing to 
do is monitor and restart your job.




On Wed, Sep 30, 2015 at 4:31 PM, swetha wrote:

Hi,

I see this sometimes in our Kafka Direct approach in our Streaming job. How
do we make sure that the job recovers from such errors and works normally
thereafter?

15/09/30 05:14:18 ERROR KafkaRDD: Lost leader for topic x_stream partition
19,  sleeping for 200ms
15/09/30 05:14:18 ERROR KafkaRDD: Lost leader for topic x_stream partition
5,  sleeping for 200ms

Followed by every task failing with something like this:

15/09/30 05:26:20 ERROR Executor: Exception in task 4.0 in stage 84281.0
(TID 818804)
kafka.common.NotLeaderForPartitionException

And:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 15
in stage 84958.0 failed 4 times, most recent failure: Lost task 15.3 in
stage 84958.0 (TID 819461, 10.227.68.102): java.lang.AssertionError:
assertion failed: Beginning offset 15027734702 is after the ending offset
15027725493 for topic hubble_stream partition 12. You either provided an
invalid fromOffset, or the Kafka topic has been damaged


Thanks,
Swetha




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Lost-leader-exception-in-Kafka-Direct-for-Streaming-tp24891.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: automatic start of streaming job on failure on YARN

2015-10-01 Thread Adrian Tanase
This happens automatically as long as you submit with cluster mode instead of 
client mode. (e.g. ./spark-submit --master yarn-cluster …)

The property you mention would help right after that, although you will need to 
set it to a large value (e.g. 1000?) - as there is no “infinite” support.

-adrian

From: Jeetendra Gangele
Date: Thursday, October 1, 2015 at 4:30 PM
To: user
Subject: automatic start of streaming job on failure on YARN


We've a streaming application running on yarn and we would like to ensure that 
it is up and running 24/7.

Is there a way to tell yarn to automatically restart a specific application on 
failure?

There is a property yarn.resourcemanager.am.max-attempts which is set to 2 by 
default. Is setting it to a bigger value the solution? Also, I observed this does not 
seem to work: my application is failing and not starting automatically.

Mesos has this built-in support; wondering why yarn is lacking here?



Regards

jeetendra


Re: Monitoring tools for spark streaming

2015-09-29 Thread Adrian Tanase
You can also use the REST api introduced in 1.4 – although it’s harder to parse:

  *   jobs from the same batch are not grouped together
  *   You only get total delay, not scheduling delay
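
A rough sketch of polling it (host, port and the application id below are placeholders for
your own deployment):

import scala.io.Source

val base = "http://driver-host:4040/api/v1"                       // placeholder host/port
val appsJson = Source.fromURL(s"$base/applications").mkString     // list of applications
val appId = "application_1440000000000_0001"                      // placeholder id parsed from appsJson
val jobsJson = Source.fromURL(s"$base/applications/$appId/jobs").mkString
// each job entry carries status and timing that a nagios-style check can threshold on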

From: Hari Shreedharan
Date: Tuesday, September 29, 2015 at 5:27 AM
To: Shixiong Zhu
Cc: Siva, "user@spark.apache.org"
Subject: Re: Monitoring tools for spark streaming

+1. The Streaming UI should give you more than enough information.

Thanks,
Hari



On Mon, Sep 28, 2015 at 9:55 PM, Shixiong Zhu wrote:

Which version are you using? Could you take a look at the new Streaming UI in 
1.4.0?


Best Regards,

Shixiong Zhu

2015-09-29 7:52 GMT+08:00 Siva:
Hi,

Could someone recommend the monitoring tools for spark streaming?

By extending StreamingListener we can dump the delay in processing of batches 
and some alert messages.

But are there any Web UI tools where we can monitor failures, see delays in 
processing, error messages and setup alerts etc.

Thanks





Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Adrian Tanase
The error message is very explicit (partition is under replicated), I don’t 
think it’s related to networking issues.

Try to run /home/kafka/bin/kafka-topics.sh --zookeeper localhost/kafka --describe 
--topic topic_name and see which brokers are missing from the replica assignment.
(replace home, zk-quorum etc with your own set-up)

Lastly, has this ever worked? Maybe you’ve accidentally created the topic with 
more partitions and replicas than available brokers… try to recreate with fewer 
partitions/replicas, see if it works.

-adrian

From: Dmitry Goldenberg
Date: Tuesday, September 29, 2015 at 3:37 PM
To: Adrian Tanase
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Kafka error "partitions don't have a leader" / 
LeaderNotAvailableException

Adrian,

Thanks for your response. I just looked at both machines we're testing on and 
on both the Kafka server process looks OK. Anything specific I can check 
otherwise?

From googling around, I see some posts where folks suggest to check the DNS 
settings (those appear fine) and to set the 
advertised.host.name<http://advertised.host.name> in Kafka's server.properties. 
Yay/nay?

Thanks again.

On Tue, Sep 29, 2015 at 8:31 AM, Adrian Tanase <atan...@adobe.com> wrote:
I believe some of the brokers in your cluster died and there are a number of 
partitions that nobody is currently managing.

-adrian

From: Dmitry Goldenberg
Date: Tuesday, September 29, 2015 at 3:26 PM
To: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Kafka error "partitions don't have a leader" / 
LeaderNotAvailableException

I apologize for posting this Kafka related issue into the Spark list. Have 
gotten no responses on the Kafka list and was hoping someone on this list could 
shed some light on the below.

---

We're running into this issue in a clustered environment where we're trying to 
send messages to Kafka and are getting the below error.

Can someone explain what might be causing it and what the error message means 
(Failed to send data since partitions [,8] don't have a leader) ?

---

WARN kafka.producer.BrokerPartitionInfo: Error while fetching metadata 
partition 10 leader: none replicas: isr: isUnderReplicated: false for topic 
partition [,10]: [class kafka.common.LeaderNotAvailableException]

ERROR kafka.producer.async.DefaultEventHandler: Failed to send requests for 
topics  with correlation ids in [2398792,2398801]

ERROR com.acme.core.messaging.kafka.KafkaMessageProducer: Error while sending a 
message to the message store. kafka.common.FailedToSendMessageException: Failed 
to send messages after 3 tries.
at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) 
~[kafka_2.10-0.8.2.0.jar:?]
at kafka.producer.Producer.send(Producer.scala:77) ~[kafka_2.10-0.8.2.0.jar:?]
at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
~[kafka_2.10-0.8.2.0.jar:?]

WARN kafka.producer.async.DefaultEventHandler: Failed to send data since 
partitions [,8] don't have a leader

What do these errors and warnings mean and how do we get around them?

---

The code for sending messages is basically as follows:

public class KafkaMessageProducer {
private Producer<String, String> producer;

.

public void sendMessage(String topic, String key, String message) throws 
IOException, MessagingException {
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, 
key, message);
try {
  producer.send(data);
} catch (Exception ex) {
  throw new MessagingException("Error while sending a message to the 
message store.", ex);
}
}

Is it possible that the producer gets "stale" and needs to be re-initialized?  
Do we want to re-create the producer on every message (??) or is it OK to hold 
on to one indefinitely?

---

The following are the producer properties that are being set into the producer

batch.num.messages => 200
client.id => Acme
compression.codec => none
key.serializer.class => kafka.serializer.StringEncoder
message.send.max.retries => 3
metadata.broker.list => data2.acme.com:9092,data3.acme.com:9092
partitioner.class => kafka.producer.DefaultPartitioner
producer.type => sync
queue.buffering.max.messages => 1
queue.buffering.max.ms => 5000
queue.enqueue.timeout.ms => -1
request.required.acks => 1
request.tim

Re: Merging two avro RDD/DataFrames

2015-09-29 Thread Adrian Tanase
Seems to me that the obvious candidate is loading both master and delta, using 
join or cogroup, then writing the new master.

Through some clever sharding and key management you might achieve some 
efficiency gains, but I’d say start here if your numbers are in the hundreds of 
thousands… should run under a minute with the correct resources…

-adrian

From: TEST ONE
Date: Tuesday, September 29, 2015 at 3:00 AM
To: "user@spark.apache.org"
Subject: Merging two avro RDD/DataFrames


I have a daily update of modified users (~100s) output as avro from ETL. I’d 
need to find and merge with existing corresponding members in a master avro 
file (~100,000s). The merge operation involves merging a ‘profiles’ 
Map between the matching records.


What would be the recommended pattern to handle record merging with Spark?


Thanks,

kc


Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Adrian Tanase
I believe some of the brokers in your cluster died and there are a number of 
partitions that nobody is currently managing.

-adrian

From: Dmitry Goldenberg
Date: Tuesday, September 29, 2015 at 3:26 PM
To: "user@spark.apache.org"
Subject: Kafka error "partitions don't have a leader" / 
LeaderNotAvailableException

I apologize for posting this Kafka related issue into the Spark list. Have 
gotten no responses on the Kafka list and was hoping someone on this list could 
shed some light on the below.

---

We're running into this issue in a clustered environment where we're trying to 
send messages to Kafka and are getting the below error.

Can someone explain what might be causing it and what the error message means 
(Failed to send data since partitions [,8] don't have a leader) ?

---

WARN kafka.producer.BrokerPartitionInfo: Error while fetching metadata 
partition 10 leader: none replicas: isr: isUnderReplicated: false for topic 
partition [,10]: [class kafka.common.LeaderNotAvailableException]

ERROR kafka.producer.async.DefaultEventHandler: Failed to send requests for 
topics  with correlation ids in [2398792,2398801]

ERROR com.acme.core.messaging.kafka.KafkaMessageProducer: Error while sending a 
message to the message store. kafka.common.FailedToSendMessageException: Failed 
to send messages after 3 tries.
at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) 
~[kafka_2.10-0.8.2.0.jar:?]
at kafka.producer.Producer.send(Producer.scala:77) ~[kafka_2.10-0.8.2.0.jar:?]
at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
~[kafka_2.10-0.8.2.0.jar:?]

WARN kafka.producer.async.DefaultEventHandler: Failed to send data since 
partitions [,8] don't have a leader

What do these errors and warnings mean and how do we get around them?

---

The code for sending messages is basically as follows:

public class KafkaMessageProducer {
private Producer<String, String> producer;

.

public void sendMessage(String topic, String key, String message) throws 
IOException, MessagingException {
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, 
key, message);
try {
  producer.send(data);
} catch (Exception ex) {
  throw new MessagingException("Error while sending a message to the 
message store.", ex);
}
}

Is it possible that the producer gets "stale" and needs to be re-initialized?  
Do we want to re-create the producer on every message (??) or is it OK to hold 
on to one indefinitely?

---

The following are the producer properties that are being set into the producer

batch.num.messages => 200
client.id => Acme
compression.codec => none
key.serializer.class => kafka.serializer.StringEncoder
message.send.max.retries => 3
metadata.broker.list => 
data2.acme.com:9092,data3.acme.com:9092
partitioner.class => kafka.producer.DefaultPartitioner
producer.type => sync
queue.buffering.max.messages => 1
queue.buffering.max.ms => 5000
queue.enqueue.timeout.ms => -1
request.required.acks => 1
request.timeout.ms => 1
retry.backoff.ms => 1000
send.buffer.bytes => 102400
serializer.class => kafka.serializer.StringEncoder
topic.metadata.refresh.interval.ms 
=> 60


Thanks.


Re: Converting a DStream to schemaRDD

2015-09-29 Thread Adrian Tanase
Also check this out
https://github.com/databricks/reference-apps/blob/master/logs_analyzer/chapter1/scala/src/main/scala/com/databricks/apps/logs/chapter1/LogAnalyzerStreamingSQL.scala

From the data bricks reference app: https://github.com/databricks/reference-apps

From: Ewan Leith
Date: Tuesday, September 29, 2015 at 5:09 PM
To: Daniel Haviv, user
Subject: RE: Converting a DStream to schemaRDD

Something like:

dstream.foreachRDD { rdd =>
  val df =  sqlContext.read.json(rdd)
  df.select(…)
}

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams


Might be the place to start, it’ll convert each batch of dstream into an RDD 
then let you work it as if it were a standard RDD dataset.

Ewan


From: Daniel Haviv [mailto:daniel.ha...@veracity-group.com]
Sent: 29 September 2015 15:03
To: user
Subject: Converting a DStream to schemaRDD

Hi,
I have a DStream which is a stream of RDD[String].

How can I pass a DStream to sqlContext.jsonRDD and work with it as a DF ?

Thank you.
Daniel



Re: Spark Streaming Log4j Inside Eclipse

2015-09-29 Thread Adrian Tanase
You should set extra java options for your app via Eclipse project and specify 
something like

 -Dlog4j.configuration=file:/tmp/log4j.properties

Sent from my iPhone

On 28 Sep 2015, at 18:52, Shixiong Zhu <zsxw...@gmail.com> wrote:

You can use JavaSparkContext.setLogLevel to set the log level in your codes.


Best Regards,

Shixiong Zhu

2015-09-28 22:55 GMT+08:00 Ashish Soni <asoni.le...@gmail.com>:
I am not running it using spark-submit, I am running locally inside Eclipse 
IDE. How do I set this using Java code?

Ashish

On Mon, Sep 28, 2015 at 10:42 AM, Adrian Tanase <atan...@adobe.com> wrote:
You also need to provide it as parameter to spark submit
http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver

From: Ashish Soni
Date: Monday, September 28, 2015 at 5:18 PM
To: user
Subject: Spark Streaming Log4j Inside Eclipse

I need to turn off the verbose logging of the Spark Streaming code when I am 
running inside Eclipse. I tried creating a log4j.properties file and placed it 
inside /src/main/resources but I do not see it taking any effect. Please help, 
as I am not sure what else needs to be done to change the log level to DEBUG or WARN.




Re: Adding / Removing worker nodes for Spark Streaming

2015-09-29 Thread Adrian Tanase
Just wanted to make sure one thing is really clear – the kafka offsets are part 
of the actual RDD – in every batch spark is saving the offset ranges for each 
partition – this in theory will make the data in each batch stable across 
recovery.

The other important thing is that with correct checkpointing on the DStreams 
(mandatory on stateful ones) you will rarely (if ever!) need to go back from 
zero. That’s the point of checkpointing data.
If you checkpoint every 10 batches, then you will have to re-process AT MOST 10 
batches back, and the new data will be merged into the state that’s loaded from 
the hdfs checkpoint.
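
A minimal sketch of that setting, assuming a 10 second batch, a hypothetical stateDStream
from updateStateByKey and a placeholder HDFS path:

import org.apache.spark.streaming.Seconds

ssc.checkpoint("hdfs:///checkpoints/my-streaming-app")   // metadata + data checkpoints
stateDStream.checkpoint(Seconds(100))                    // 10x the batch: replay at most ~10 batches on recovery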

Lastly, there are still issues with adding/removing nodes from a running 
cluster. Most of the time it works, sometimes the job crashes or doesn’t 
re-deploy the executors. That being said, restarting the driver (with no 
dataloss thanks to checkpointing) has always been a workaround that worked for 
me.
In this spirit, you could test (I have it on my list) stopping a driver by 
killing the process or with yarn application -kill and resubmitting with a 
larger number of executors (--executor-cores). In theory it should work as 
expected, I don’t think this is part of the checkpointed metadata in the spark 
context.

-adrian


From: Cody Koeninger
Date: Tuesday, September 29, 2015 at 12:49 AM
To: Sourabh Chandak
Cc: Augustus Hong, "user@spark.apache.org"
Subject: Re: Adding / Removing worker nodes for Spark Streaming

If a node fails, the partition / offset range that it was working on will be 
scheduled to run on another node.  This is generally true of spark, regardless 
of checkpointing.

The offset ranges for a given batch are stored in the checkpoint for that 
batch.  That's relevant if your entire job fails (driver failure, all workers 
fail, etc).

If you really can't afford to run from the smallest offset and can't afford to 
lose data, don't rely on spark checkpoints (because of the conditions under 
which they can't be recovered).  Store the offset ranges yourself.
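
A small sketch of that suggestion (directStream is assumed to be the Kafka direct stream;
saveOffsets is a hypothetical stand-in for writing to ZK or a database of your choice):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

def saveOffsets(topic: String, partition: Int, untilOffset: Long): Unit =
  println(s"$topic/$partition -> $untilOffset")   // stand-in for a real offset store

directStream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ...process the batch first, then record how far you got...
  ranges.foreach(r => saveOffsets(r.topic, r.partition, r.untilOffset))
}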


On Mon, Sep 28, 2015 at 4:34 PM, Sourabh Chandak wrote:
I also have the same use case as Augustus, and have some basic questions about 
recovery from checkpoint. I have a 10 node Kafka cluster and a 30 node Spark 
cluster running streaming job, how is the (topic, partition) data handled in 
checkpointing. The scenario I want to understand is, in case of node failure 
how will a new node know the checkpoint of the failed node?
The amount of data we have is huge and we can't run from the smallest offset.

Thanks,
Sourabh

On Mon, Sep 28, 2015 at 11:43 AM, Augustus Hong wrote:
Got it, thank you!


On Mon, Sep 28, 2015 at 11:37 AM, Cody Koeninger wrote:
Losing worker nodes without stopping is definitely possible.  I haven't had 
much success adding workers to a running job, but I also haven't spent much 
time on it.

If you're restarting with the same jar, you should be able to recover from 
checkpoint without losing data (usual caveats apply, e.g. you need enough kafka 
retention).  Make sure to test it though, as the code paths taken during 
recovery from checkpoint are not the same as on initial startup, and you can 
run into unexpected issues (e.g. authentication).

On Mon, Sep 28, 2015 at 1:27 PM, Augustus Hong wrote:
Hey all,

I'm evaluating using Spark Streaming with Kafka direct streaming, and I have a 
couple of questions:

1.  Would it be possible to add / remove worker nodes without stopping and 
restarting the spark streaming driver?

2.  I understand that we can enable checkpointing to recover from node 
failures, and that it doesn't work across code changes.  What about in the 
event that worker nodes failed due to load -> we added more worker nodes -> 
restart Spark Streaming?  Would this incur data loss as well?


Best,
Augustus

--
Augustus Hong
Data Analytics | Branch Metrics
m 650-391-3369 | e augus...@branch.io




--
Augustus Hong
Data Analytics | Branch Metrics
m 650-391-3369 | e augus...@branch.io




Re: Does YARN start new executor in place of the failed one?

2015-09-29 Thread Adrian Tanase
In theory, yes  - however in practice it seems that it depends on how they die.

I’ve recently logged an issue for the case when the machine is restarted. If 
the executor process dies it generally comes back gracefully.
https://issues.apache.org/jira/browse/SPARK-10792

Maybe you can vote up the issue if it’s the same use case :)

Also – make sure that you have resources available in YARN, if the cluster is 
shared.

-adrian

From: Alexander Pivovarov
Date: Tuesday, September 29, 2015 at 1:38 AM
To: "user@spark.apache.org"
Subject: Does YARN start new executor in place of the failed one?

Hello Everyone

I use Spark on YARN on EMR-4

The spark program which I run has several jobs/stages and runs for about 10 hours.
During the execution some executors might fail for some reason.
BUT I do not see that new executors are started in place of the failed ones.

So, what I see in spark UI is that at the beginning of my program I have 100 
executors but in 10 hours I see only 67 executors.

I remember that in Standalone mode the Spark Worker starts a new executor in place of 
a failed one automatically.

How to activate the same behavior on YARN?

The only non-default YARN setting I use are the following:
yarn.nodemanager.pmem-check-enabled=false
yarn.nodemanager.vmem-check-enabled=false

Thank you
Alex


Re: Spark Streaming Log4j Inside Eclipse

2015-09-28 Thread Adrian Tanase
You also need to provide it as parameter to spark submit
http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver

From: Ashish Soni
Date: Monday, September 28, 2015 at 5:18 PM
To: user
Subject: Spark Streaming Log4j Inside Eclipse

I need to turn off the verbose logging of the Spark Streaming code when I am 
running inside Eclipse. I tried creating a log4j.properties file and placed it 
inside /src/main/resources but I do not see it taking any effect. Please help, 
as I am not sure what else needs to be done to change the log level to DEBUG or WARN.


Re: Receiver and Parallelization

2015-09-25 Thread Adrian Tanase
Good catch, I was not aware of this setting.

I’m wondering though if it also generates a shuffle or if the data is still 
processed by the node on which it’s ingested - so that you’re not gated by the 
number of cores on one machine.

-adrian



On 9/25/15, 5:27 PM, "Silvio Fiorito"  wrote:

>One thing you should look at is your batch duration and 
>spark.streaming.blockInterval
>
>Those 2 things control how many partitions are generated for each RDD (batch) 
>of the DStream when using a receiver (vs direct approach).
>
>So if you have a 2 second batch duration and the default blockInterval of 
>200ms this will create 10 partitions. This means you can have a max of 10 
>parallel tasks (as long as you have the cores available) running at a time for 
>a map-like operation.
>
>
>
>
>On 9/25/15, 9:08 AM, "nib...@free.fr"  wrote:
>
>>Hello,
>>I used a custom receiver in order to receive JMS messages from MQ Servers.
>>I want to benefit of Yarn cluster, my questions are :
>>
>>- Is it possible to have only one node receiving JMS messages and parallelize 
>>the RDD over all the cluster nodes ?
>>- Is it possible to parallelize also the message receiver over cluster nodes ?
>>
>>If you have any code example for the both items it would be fine, because the 
>>parallelization mechanism in the code is not crystal clear for me ...
>>
>>Tks
>>Nicolas
>>


Re: Reasonable performance numbers?

2015-09-25 Thread Adrian Tanase
It’s really hard to answer this, as the comparison is not really fair – Storm 
is much lower level than Spark and has less overhead when dealing with 
stateless operations. I’d be curious how your colleague is implementing the 
average on a “batch” and what is the storm equivalent of a batch.

That aside, the storm implementation seems in the ballpark (we were clocking 
~80K events /sec/node in a well tuned storm app).
For a spark app, it really depends on the lineage, checkpointing, level of 
parallelism, partitioning, number of tasks, scheduling overhead, etc – unless 
we confirm that the job operates at full capacity, I can’t say for sure that 
it’s good or bad.

Some questions to isolate the issue:

  *   Can you try with a manual average operation? (e.g. reduce the events to a 
tuple with count and sum, then divide them - see the sketch after this list)
 *   Looking at the DoubleRDD.mean implementation, the stats module 
computes lots of stuff on the side, not just the mean
 *   Not sure if it matters, but I’m assuming on the storm side you’re not 
doing that
  *   Can you confirm that the operation in question is indeed computed in 
parallel (it should have 48 tasks).
 *   If yes, how many records per second do you get on average, only for 
this operation? - you can find this out in the SparkUI, dividing the number of 
records allocated to one of the executors by the total task time for that 
executor
  *   Are the executors well balanced? Is each of them processing 16 partitions 
in approx equal time? If not, you can have multiple issues here:
 *   Kafka cluster imbalance – happens from time to time, you can monitor 
this from the command line with kafka-topics.sh --describe
 *   Kafka key partitioning scheme – assuming round-robin distribution, 
just checking – you should see this in the 1st stage, again all the executors 
should have allocated an equal number of tasks/partitions to process with an 
equal number of messages
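
A sketch of the manual average mentioned in the first bullet ('values' stands for the
hypothetical DStream of parsed integers from step 3 of the original message):

val averages = values
  .map(v => (v.toDouble, 1L))
  .reduce { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }   // one (sum, count) pair per batch
  .map { case (sum, count) => sum / count }

averages.foreachRDD(rdd => rdd.collect().foreach(avg => println(s"batch average: $avg")))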

Given that you have 7 kafka brokers and 3 spark executors, you should try a 
.repartition(48) on the kafka Dstream – at the one time cost of a shuffle you 
redistribute the data evenly across all nodes/cores and avoid most of the 
issues above – at least you have a guarantee that the load is spread evenly 
across the cluster.

Hope this helps,

-adrian

From: "Young, Matthew T"
Date: Thursday, September 24, 2015 at 11:47 PM
To: "user@spark.apache.org"
Cc: "Krumm, Greg"
Subject: Reasonable performance numbers?

Hello,

I am doing performance testing with Spark Streaming. I want to know if the 
throughput numbers I am encountering are reasonable for the power of my cluster 
and Spark’s performance characteristics.

My job has the following processing steps:

1.  Read 600 Byte JSON strings from a 7 broker / 48 partition Kafka cluster 
via the Kafka Direct API

2.  Parse the JSON with play-json or lift-json (no significant performance 
difference)

3.  Read one integer value out of the JSON

4.  Compute the average of this integer value across all records in the 
batch with DoubleRDD.mean

5.  Write the average for the batch back to a different Kafka topic

I have tried 2, 4, and 10 second batch intervals. The best throughput I can 
sustain is about 75,000 records/second for the whole cluster.

The Spark cluster is in a VM environment with 3 VMs. Each VM has 32 GB of RAM 
and 16 cores. The systems are networked with 10 GB NICs. I started testing with 
Spark 1.3.1 and switched to Spark 1.5 to see if there was improvement (none 
significant). When I look at the event timeline in the WebUI I see that the 
majority of the processing time for each batch is “Executor Computing Time” in 
the foreachRDD that computes the average, not the transform that does the JSON 
parsing.

CPU util hovers around 40% across the cluster, and RAM has plenty of free space 
remaining as well. Network comes nowhere close to being saturated.

My colleague implementing similar functionality in Storm is able to exceed a 
quarter million records per second with the same hardware.

Is 75K records/seconds reasonable for a cluster of this size? What kind of 
performance would you expect for this job?


Thanks,

-- Matthew


Re: kafka direct streaming with checkpointing

2015-09-25 Thread Adrian Tanase
Hi Radu,

The problem itself is not checkpointing the data – if your operations are 
stateless then you are only checkpointing the kafka offsets, you are right.
The problem is that you are also checkpointing metadata – including the actual 
Code and serialized java classes – that’s why you’ll see ser/deser exceptions 
on restart with upgrade.

If you’re not using stateful operations, you might get away by using the old 
Kafka receiver w/o WAL – but you accept “at least once semantics”. As soon as 
you add in the WAL you are forced to checkpoint and you’re better off with the 
DirectReceiver approach.

I believe the simplest way to get around this is to support running 2 versions in 
parallel – with some app level control of a barrier (e.g. v1 reads events up to 
3:00am, v2 after that). Manual state management is also supported by the 
framework but it’s harder to control because:

  *   you’re not guaranteed to shut down gracefully
  *   You may have a bug that prevents the state to be saved and you can’t 
restart the app w/o upgrade

Less than ideal, yes :)

-adrian

From: Radu Brumariu
Date: Friday, September 25, 2015 at 1:31 AM
To: Cody Koeninger
Cc: "user@spark.apache.org"
Subject: Re: kafka direct streaming with checkpointing

Would changing the direct stream api to support committing the offsets to 
kafka's ZK (like a regular consumer) as a fallback mechanism, in case 
recovering from checkpoint fails, be an accepted solution?

On Thursday, September 24, 2015, Cody Koeninger wrote:
This has been discussed numerous times, TD's response has consistently been 
that it's unlikely to be possible

On Thu, Sep 24, 2015 at 12:26 PM, Radu Brumariu wrote:
It seems to me that this scenario that I'm facing, is quite common for spark 
jobs using Kafka.
Is there a ticket to add this sort of semantics to checkpointing ? Does it even 
make sense to add it there ?

Thanks,
Radu


On Thursday, September 24, 2015, Cody Koeninger wrote:
No, you cant use checkpointing across code changes.  Either store offsets 
yourself, or start up your new app code and let it catch up before killing the 
old one.

On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu  wrote:
Hi,
in my application I use Kafka direct streaming and I have also enabled 
checkpointing.
This seems to work fine if the application is restarted. However if I change 
the code and resubmit the application, it cannot start because of the 
checkpointed data being of different class versions.
Is there any way I can use checkpointing that can survive across application 
version changes?

Thanks,
Radu





Re: Using Spark for portfolio manager app

2015-09-25 Thread Adrian Tanase
Just use the official connector from DataStax 
https://github.com/datastax/spark-cassandra-connector

Your solution is very similar. Let’s assume the state is

case class UserState(amount: Int, updates: Seq[Int])

And your user has 100. If the user does not see an update, you can emit

Some(UserState(100, Seq.empty))

Otherwise maybe you can emit

Some(UserState(130, List(50, -20)))

You can then process the updates like this

usersState.filter(_.updates.length > 0).foreachRDD { ... }

Regarding optimizations, I would not worry too much about it. Going through 
users with no updates is most likely a no-op. Spark HAS to iterate through all 
the state objects since it does not operate with deltas from one batch to the 
next – the StateDStream is really the whole app state packed as a RDD.
You could look at one of the other updateStateByKey methods – maybe you can 
write more efficient code there:

def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean
  ): DStream[(K, S)] = …

What you can do though (and here you’ll be glad that spark also executes the 
code for state objects w/o updates) is cleanup users if they haven’t received 
updates for a long time, then load the state from DB the next time you see 
them. I would consider this a must-have optimization to keep some bounds on the 
memory needs.
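
Pulling the snippets above together, a fuller sketch under the same assumptions ('events'
is a hypothetical DStream[(String, Int)] of amount updates per user):

case class UserState(amount: Int, updates: Seq[Int])

val usersState = events.updateStateByKey[UserState] { (newAmounts: Seq[Int], state: Option[UserState]) =>
  val prev = state.getOrElse(UserState(0, Seq.empty))
  Some(UserState(prev.amount + newAmounts.sum, newAmounts))   // 'updates' holds only this batch's deltas
}

usersState
  .filter { case (_, s) => s.updates.nonEmpty }               // only users touched in this batch
  .foreachRDD { rdd =>
    rdd.foreach { case (userId, s) => println(s"$userId -> ${s.amount}") }  // stand-in for the Cassandra writer
  }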

-adrian

From: Thúy Hằng Lê
Date: Friday, September 25, 2015 at 2:05 PM
To: Adrian Tanase
Subject: Re: Using Spark for portfolio manager app


Hi Adrian,

Thanks Cassandra seems to be good candidate too. I will give it a try.
Do you know any stable connector that help Spark work with Cassandra? Or I 
should write it myself.

Regarding my second question, I think I am figuring out another solution: I will 
append another flag (like isNew) to the tuple in the updateStateByKey function, 
then use filter to know which records I should update in the database.
But it would be great if you could share your solution too (I don't quite get 
the idea of emitting a new tuple).

In addition to this, by Spark's design it seems it has to iterate over all keys 
(including ones that did not change) to do the aggregation for each batch. For my use 
case I have 3M keys, but only 2-3K change in each batch (every 1 second). Is 
there any way to optimize this process?

On Sep 25, 2015 4:12 PM, "Adrian Tanase" 
<atan...@adobe.com<mailto:atan...@adobe.com>> wrote:
Re: DB I strongly encourage you to look at Cassandra – it’s almost as powerful 
as Hbase, a lot easier to setup and manage. Well suited for this type of 
usecase, with a combination of K/V store and time series data.

For the second question, I’ve used this pattern all the time for “flash 
messages” - passing info as a 1 time message downstream:

  *   In your updateStateByKey function, emit a tuple of (actualNewState, 
changedData)
  *   Then filter this on !changedData.isEmpty or something
  *   And only do foreachRdd on the filtered stream.

Makes sense?

-adrian

From: Thúy Hằng Lê
Date: Friday, September 25, 2015 at 10:31 AM
To: ALEX K
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Using Spark for portfolio manager app


Thanks all for the feedback so far.
I haven't decided which external storage will be used yet.
HBase is cool but it requires Hadoop in production. I only have 3-4 servers for 
the whole thing (I am thinking of a relational database for this, can be 
MariaDB, MemSQL or MySQL) but they are hard to scale.
I will try various approaches before making any decision.

In addition, using Spark Streaming, is there any way to update only new data to 
external storage after using updateStateByKey?
The foreachRDD function seems to loop over all RDDs (including ones that haven't 
changed). I believe Spark Streaming must have a way to do it, but I still 
couldn't find an example doing a similar job.


Re: Using Spark for portfolio manager app

2015-09-25 Thread Adrian Tanase
Re: DB I strongly encourage you to look at Cassandra – it’s almost as powerful 
as Hbase, a lot easier to setup and manage. Well suited for this type of 
usecase, with a combination of K/V store and time series data.

For the second question, I’ve used this pattern all the time for “flash 
messages” - passing info as a 1 time message downstream:

  *   In your updateStateByKey function, emit a tuple of (actualNewState, 
changedData)
  *   Then filter this on !changedData.isEmpty or something
  *   And only do foreachRdd on the filtered stream.

Makes sense?

-adrian

From: Thúy Hằng Lê
Date: Friday, September 25, 2015 at 10:31 AM
To: ALEX K
Cc: "user@spark.apache.org"
Subject: Re: Using Spark for portfolio manager app


Thanks all for the feedback so far.
I haven't decided which external storage will be used yet.
HBase is cool but it requires Hadoop in production. I only have 3-4 servers for 
the whole thing (I am thinking of a relational database for this, can be 
MariaDB, MemSQL or MySQL) but they are hard to scale.
I will try various approaches before making any decision.

In addition, using Spark Streaming, is there any way to update only new data to 
external storage after using updateStateByKey?
The foreachRDD function seems to loop over all RDDs (including ones that haven't 
changed). I believe Spark Streaming must have a way to do it, but I still 
couldn't find an example doing a similar job.


Re: Receiver and Parallelization

2015-09-25 Thread Adrian Tanase
1) yes, just use .repartition on the inbound stream, this will shuffle data 
across your whole cluster and process in parallel as specified.
2) yes, although I’m not sure how to do it for a totally custom receiver. Does 
this help as a starting point? 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
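
A minimal sketch of points 1) and 2): with a receiver, partitions per batch is roughly
batchInterval / spark.streaming.blockInterval, and .repartition spreads those blocks across
the whole cluster. The socket receiver below is only a stand-in for your custom JMS receiver,
and the host/ports are placeholders (older versions take the block interval in plain
milliseconds, e.g. "200"):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("jms-ingest")
  .set("spark.streaming.blockInterval", "200ms")   // 2000ms batch / 200ms blocks = ~10 partitions per RDD
val ssc = new StreamingContext(conf, Seconds(2))

val received = ssc.socketTextStream("jms-bridge-host", 9999)   // swap in your own Receiver here
val distributed = received.repartition(48)                     // one shuffle, then downstream work runs on all nodes
distributed.count().print()

ssc.start()
ssc.awaitTermination()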





On 9/25/15, 4:08 PM, "nib...@free.fr"  wrote:

>Hello,
>I used a custom receiver in order to receive JMS messages from MQ Servers.
>I want to benefit of Yarn cluster, my questions are :
>
>- Is it possible to have only one node receiving JMS messages and parallelize 
>the RDD over all the cluster nodes ?
>- Is it possible to parallelize also the message receiver over cluster nodes ?
>
>If you have any code example for the both items it would be fine, because the 
>parallelization mechanism in the code is not crystal clear for me ...
>
>Tks
>Nicolas
>



Re: Custom Hadoop InputSplit, Spark partitions, spark executors/task and Yarn containers

2015-09-24 Thread Adrian Tanase
RE: # because I already have a bunch of InputSplits, do I still need to specify 
the number of executors to get processing parallelized?

I would say it’s best practice to have as many executors as data nodes and as 
many cores as you can get from the cluster – if YARN has enough  resources it 
will deploy the executors distributed across the cluster, then each of them 
will try to process the data locally (check the spark ui for NODE_LOCAL), with 
as many splits in parallel as you defined in spark.executor.cores
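
As a rough illustration using the standard YARN-mode configs (all values below are
placeholders for a cluster with roughly six data nodes):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("input-split-processing")
  .set("spark.executor.instances", "6")   // roughly one executor per data node
  .set("spark.executor.cores", "4")       // splits processed in parallel per executor
  .set("spark.executor.memory", "8g")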

-adrian

From: Sandy Ryza
Date: Thursday, September 24, 2015 at 2:43 AM
To: Anfernee Xu
Cc: "user@spark.apache.org"
Subject: Re: Custom Hadoop InputSplit, Spark partitions, spark executors/task 
and Yarn containers

Hi Anfernee,

That's correct that each InputSplit will map to exactly a Spark partition.

On YARN, each Spark executor maps to a single YARN container.  Each executor 
can run multiple tasks over its lifetime, both parallel and sequentially.

If you enable dynamic allocation, after the stage including the InputSplits 
gets submitted, Spark will try to request an appropriate number of executors.

The memory in the YARN resource requests is --executor-memory + what's set for 
spark.yarn.executor.memoryOverhead, which defaults to 10% of --executor-memory.

-Sandy

On Wed, Sep 23, 2015 at 2:38 PM, Anfernee Xu wrote:
Hi Spark experts,

I'm coming across these terminologies and having some confusions, could you 
please help me understand them better?

For instance I have implemented a Hadoop InputFormat to load my external data 
in Spark, in turn my custom InputFormat will create a bunch of InputSplit's, my 
questions is about

# Each InputSplit will exactly map to a Spark partition, is that correct?

# If I run on Yarn, how does Spark executor/task map to Yarn container?

# because I already have a bunch of InputSplits, do I still need to specify the 
number of executors to get processing parallelized?

# How does -executor-memory map to the memory requirement in Yarn's resource 
request?

--
--Anfernee



Re: Spark on YARN / aws - executor lost on node restart

2015-09-24 Thread Adrian Tanase
Closing the loop, I’ve submitted this issue – TD, cc-ing you since it’s spark 
streaming, not sure who oversees the Yarn module.
https://issues.apache.org/jira/browse/SPARK-10792

-adrian

From: Adrian Tanase
Date: Friday, September 18, 2015 at 6:18 PM
To: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Spark on YARN / aws - executor lost on node restart

Hi guys,

Digging up this question after spending some more time trying to replicate it.
It seems to be an issue with the YARN – spark integration, wondering if there 
is a bug already tracking this?

If I just kill the process on the machine, YARN detects the container is dead 
and the spark framework requests a new container to be deployed.
If the machine goes away completely, spark sees that the executor is lost but 
YarnAllocator never tries to request the container again. Wondering if there’s 
an implicit assumption that it would be notified by YARN, which might not 
happen if the node dies completely?

If there are no ideas on the list, I’ll prepare some logs and follow up with an 
issue.

Thanks,
-adrian

From: Adrian Tanase
Date: Wednesday, September 16, 2015 at 6:01 PM
To: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Spark on YARN / aws - executor lost on node restart

Hi all,

We’re using spark streaming (1.4.0), deployed on AWS through yarn. It’s a 
stateful app that reads from kafka (with the new direct API) and we’re 
checkpointing to HDFS.

During some resilience testing, we restarted one of the machines and brought it 
back online. During the offline period, the Yarn cluster would not have 
resources to re-create the missing executor.
After starting all the services on the machine, it correctly joined the Yarn 
cluster, however the spark streaming app does not seem to notice that the 
resources are back and has not re-created the missing executor.

The app is correctly running with 6 out of 7 executors, however it’s running 
under capacity.
If we manually kill the driver and re-submit the app to yarn, all the state is 
correctly recreated from checkpoint and all 7 executors are now online – 
however this seems like a brutal workaround.

So, here are some questions:

  *   Isn't the driver supposed to auto-heal after a machine is completely lost 
and then comes back after some time?
  *   Are there any configuration settings that influence how the spark driver should 
poll yarn to check back on resources being available again?
  *   Is there a tool one can run to “force” the driver to re-create missing 
workers/executors?

Lastly, another issue was that the driver also crashed and yarn successfully 
restarted it – I’m not sure yet if it’s because of some retry setting or 
another exception, will post the logs after I recreate the problem.

Thanks in advance for any ideas,
-adrian


Re: How to make Group By/reduceByKey more efficient?

2015-09-24 Thread Adrian Tanase
All the *ByKey aggregations perform an efficient shuffle and preserve 
partitioning on the output. If all you need is to call reduceByKey, then don’t 
bother with groupBy. You should use groupBy if you really need all the 
datapoints from a key for a very custom operation.


From the docs:

Note: If you are grouping in order to perform an aggregation (such as a sum or 
average) over each key, using reduceByKey or aggregateByKey will yield much 
better performance. 


What you should worry about in more complex pipelines is that you’re actually 
preserving the partitioner between stages. For example, if you use a custom 
partitioner between a partitionBy and an updateStateByKey. Or if you use .map 
or .flatMap instead of .mapValues and .flatMapValues.
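
To make that concrete, a minimal sketch on toy data (not your pipeline):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("bykey-sketch").setMaster("local[2]"))
val pairs = sc.parallelize(Seq(("alice", 1), ("bob", 1), ("alice", 1)))

// reduceByKey shuffles once and its output keeps a HashPartitioner
val counts = pairs.reduceByKey(_ + _)

// mapValues preserves that partitioner, so a later *ByKey on `scaled` avoids another shuffle
val scaled = counts.mapValues(_ * 2)

// map drops the partitioner even though the key is untouched
val dropped = counts.map { case (k, v) => (k, v * 2) }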

By the way, learn to use the Spark UI to understand the DAG / Execution plan 
and try to navigate the source code - I found the comments and the various 
preservesPartitioning options very educational.

-adrian





On 9/23/15, 8:43 AM, "swetha"  wrote:

>Hi,
>
>How to make Group By more efficient? Is it recommended to use a custom
>partitioner and then do a Group By? And can we use a custom partitioner and
>then use a  reduceByKey for optimization?
>
>
>Thanks,
>Swetha
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-Group-By-reduceByKey-more-efficient-tp24780.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-24 Thread Adrian Tanase
+1 on grouping the case classes and creating a hierarchy – as long as you use 
the data programmatically. For DataFrames / SQL the other ideas probably scale 
better…

From: Ted Yu
Date: Wednesday, September 23, 2015 at 7:07 AM
To: satish chandra j
Cc: user
Subject: Re: Scala Limitation - Case Class definition with more than 22 
arguments

Can you switch to 2.11 ?

The following has been fixed in 2.11:
https://issues.scala-lang.org/browse/SI-7296

Otherwise consider packaging related values into a case class of their own.

On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j 
> wrote:
HI All,
Do we have any alternative solutions in Scala to avoid limitation in defining a 
Case Class having more than 22 arguments

We are using Scala version 2.10.2, currently I need to define a case class with 
37 arguments but getting an error: "error: Implementation 
restriction: case classes cannot have more than 22 parameters."

It would be a great help if any inputs on the same

Regards,
Satish Chandra





Re: reduceByKeyAndWindow confusion

2015-09-24 Thread Adrian Tanase
Let me take a stab at your questions – can you clarify some of the points 
below? I’m wondering if you’re using the streaming concepts as they were 
intended…

1. Windowed operations

First, I just want to confirm that it is your intention to split the original 
kafka stream into multiple Dstreams – and that grouping by key or 
repartitioning using your Map[Int, List[String]] is not enough.
I have never come across a situation where I needed to do this, so I’m 
wondering if what you need is a simple groupBy / reduceByKey or similar.

Putting that aside, your code below suggests that you’re “windowing” (is that a 
word? :)) the stream twice. You call window on the kafka stream and then you 
“reduce by key and window” the resulting stream.
Again, wondering if your intention is not simply to “reduce by key and window”.

Another thing – you’re computing your RDDs based on the slide duration of the 
window, but that is not correct. The number of RDDs is determined by the “batch 
interval”, which is constant across the streaming context (e.g. 10 seconds, 1 
minute, whatever). Both window duration and slide interval need to be multiples 
of this.

2. Checkpointing

First of all, you should call checkpoint on the streaming context 
ssc.checkpoint(checkpointDir) - where checkpoint dir needs to be a folder in 
local mode and HDFS in cluster mode.
This is probably where the error comes from.

Second – kafka is backed by durable storage so you don’t need to checkpoint 
its contents as it can always replay events in case of failure. You could do it 
if you go through the same data multiple times, as a performance enhancement, 
but you don’t have to.

Third – the windowed operation NEEDS to checkpoint data, as it’s stateful – all 
the stateful operations call persist internally as you’ve seen, to avoid 
recreating the full state from original events in case of failure. Doing this 
for a window of 1 hour could take way too long.

Last but not least – when you call checkpoint(interval) you can choose to 
checkpoint more often or less often than the default value. See the 
checkpointing 
docs
 for more info:


For stateful transformations that require RDD checkpointing, the default 
interval is a multiple of the batch interval that is at least 10 seconds. It 
can be set by using dstream.checkpoint(checkpointInterval). Typically, a 
checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting 
to try.
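
Putting the above together, a rough sketch - assuming a SparkConf `conf`, a pair DStream 
`events: DStream[(String, Long)]` and your 16 min window / 1 min slide; the paths and 
intervals are illustrative:

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(10))         // batch interval: 10 seconds
ssc.checkpoint("hdfs:///user/spark/checkpoints/my-app")   // hypothetical path; a plain folder in local mode

// window and slide are both multiples of the 10s batch interval
val counts = events.reduceByKeyAndWindow(_ + _, Minutes(16), Minutes(1))

// checkpoint the stateful stream every ~5 slide intervals instead of the default
counts.checkpoint(Minutes(5))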

Hope this helps,
-adrian

From: srungarapu vamsi
Date: Wednesday, September 23, 2015 at 10:51 PM
To: user
Subject: reduceByKeyAndWindow confusion

I create a stream from kafka as below:

val kafkaDStream = KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](ssc, kafkaConf, Set(topics))
  .window(Minutes(WINDOW_DURATION), Minutes(SLIDER_DURATION))

I have a map ("intToStringList") which is a Map[Int, List[String]].
Using this map I am filtering the stream and finally converting it into
Map[Int, DStream[KafkaGenericEvent]]

1.
Now on this map, for each and every value (which is a 
DStream[KafkaGenericEvent])
i am applying reduceByKeyAndWindow operation.
But since we have to give window duration and slider duration even in 
reduceByKeyAndWindow, does that imply that on every window of the given 
DStream, reduceByKeyAndWindow can be applied with a different window duration 
and slider duration ?
i.e. let's say the window DStream is created with window duration -> 16 minutes,
slide duration -> 1 minute, so I have one RDD for every window.
For reduceByKeyAndWindow, if we have window duration as 4 minutes and slide 
duration as 1 minute, then will I get 4 RDDs since 
windowDStream_batchDuration / reduceByKeyAndwindow_batchDuration is 4?

2.
As suggested in spark doc, i am trying to give checkpointing interval on the 
kafkaDStream created in the block shown above in the following way:
kafkaDStream.checkpoint(Minutes(4))

But when i execute this, i get the error:
"WindowedDStream has been marked for checkpointing but the storage level has 
not been set to enable persisting. Please use DStream.persist() to set the 
storage level to use memory for better checkpointing performance"
But when I went through the implementation of the checkpoint function in 
DStream.scala, I see a call to the persist() function.
Then do I really have to call persist on the WindowedDStream?
Just to give it a shot I made a call to the persist method on the windowedDStream and 
then made a call to checkpoint(interval). Even then I am facing the above 
mentioned error.
How do i solve this ?
--
/Vamsi



Re: reduceByKey inside updateStateByKey in Spark Streaming???

2015-09-24 Thread Adrian Tanase
The 2 operations can't be used inside one another.

If you need something like an all time average then you need to keep a tuple 
(sum, count) to which you add all the new values that come in every batch. The 
average is then just a map on the state DStream.
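
A minimal sketch of that idea - assuming a pair DStream `values: DStream[(String, Double)]` 
and a checkpoint dir already set on the context (updateStateByKey is stateful):

// state per key = (sum, count), updated with whatever arrived in the current batch
val state = values.updateStateByKey[(Double, Long)] {
  (newValues: Seq[Double], prev: Option[(Double, Long)]) =>
    val (sum, count) = prev.getOrElse((0.0, 0L))
    Some((sum + newValues.sum, count + newValues.size))
}

// the all-time average is then just a map over the state stream
val averages = state.mapValues { case (sum, count) => if (count == 0) 0.0 else sum / count }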

Makes sense? have I guessed your use case?

Sent from my iPhone

> On 24 Sep 2015, at 19:47, swetha  wrote:
> 
> Hi,
> 
> How to use reduceByKey inside updateStateByKey? Suppose I have a bunch of
> keys for which I need to do sum and average inside the  updateStateByKey by
> joining with old state. How do I accomplish that?
> 
> 
> Thanks,
> Swetha
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKey-inside-updateStateByKey-in-Spark-Streaming-tp24808.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Deploying spark-streaming application on production

2015-09-22 Thread Adrian Tanase
btw I re-read the docs and I want to clarify that reliable receiver + WAL gives 
you at least once, not exactly once semantics.

Sent from my iPhone

On 21 Sep 2015, at 21:50, Adrian Tanase 
<atan...@adobe.com<mailto:atan...@adobe.com>> wrote:

I'm wondering, isn't this the canonical use case for WAL + reliable receiver?

As far as I know you can tune the MQTT server to wait for an ack on messages (qos 
level 2?).
With some support from the client library you could achieve exactly-once 
semantics on the read side, if you ack a message only after writing it to WAL, 
correct?

-adrian

Sent from my iPhone

On 21 Sep 2015, at 12:35, Petr Novak 
<oss.mli...@gmail.com<mailto:oss.mli...@gmail.com>> wrote:

In short there is no direct support for it in Spark AFAIK. You will either 
manage it in MQTT or have to add another layer of indirection - either 
in-memory based (observable streams, in-mem db) or disk based (Kafka, hdfs 
files, db) which will keep your unprocessed events.

Now realizing, there is support for backpressure in v1.5.0 but I don't know if 
it could be exploited aka I don't know if it is possible to decouple event 
reading into memory and actual processing code in Spark which could be swapped 
on the fly. Probably not without some custom built facility for it.

Petr

On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak 
<oss.mli...@gmail.com<mailto:oss.mli...@gmail.com>> wrote:
I should read my posts at least once to avoid so many typos. Hopefully you are 
brave enough to read through.

Petr

On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak 
<oss.mli...@gmail.com<mailto:oss.mli...@gmail.com>> wrote:
I think you would have to persist events somehow if you don't want to miss 
them. I don't see any other option there. Either in MQTT if it is supported 
there or routing them through Kafka.

There is WriteAheadLog in Spark but you would have to decouple stream MQTT reading 
and processing into 2 separate jobs so that you could upgrade the processing one 
assuming the reading one would be stable (without changes) across versions. But 
it is problematic because there is no easy way to share DStreams between 
jobs - you would have to develop your own facility for it.

Alternatively the reading job could save MQTT events in their most raw 
form into files - to limit the need to change code - and then the processing job 
would work on top of it using Spark streaming based on files. I think this is 
inefficient and can get quite complex if you would like to make it reliable.

Basically either MQTT supports persistence (which I don't know) or there is 
Kafka for these use cases.

Another option would be, I think, to place observable streams in between MQTT and 
Spark streaming with backpressure, as far as you could perform the upgrade till 
buffers fill up.

I'm sorry that it is not thought out well from my side, it is just a brainstorm 
but it might lead you somewhere.

Regards,
Petr

On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele 
<gangele...@gmail.com<mailto:gangele...@gmail.com>> wrote:
Hi All,

I have a spark streaming application with a batch interval (10 ms) which is reading the 
MQTT channel and dumping the data from MQTT to HDFS.

So suppose I have to deploy a new application jar (with changes in the spark 
streaming application), what is the best way to deploy? Currently I am doing as 
below:

1. killing the running streaming app using yarn application -kill ID
2. and then starting the application again

Problem with the above approach is that since we are not persisting the events in MQTT 
we will miss the events for the period of the deploy.

how to handle this case?

regards
jeeetndra





Re: Spark Streaming distributed job

2015-09-22 Thread Adrian Tanase
I think you need to dig into the custom receiver implementation. As long as the 
source is distributed and partitioned, the downstream .map, .foreachXX are all 
distributed as you would expect.

You could look at how the “classic” Kafka receiver is instantiated in the 
streaming guide and try to start from there:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
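
The gist of that section, sketched in Scala for your case - `streamConfig.getJmsReceiver()` 
is the receiver from your code, assumed to be safe to instantiate several times:

// run N copies of the receiver, one per core you want dedicated to receiving
val numReceivers = 3
val streams = (1 to numReceivers).map(_ => ssc.receiverStream(streamConfig.getJmsReceiver()))

// union them so the rest of the pipeline sees a single DStream
val unified = ssc.union(streams)

// optionally repartition before the heavy work so all executors share the processing
val distributed = unified.repartition(12)   // hypothetical: total cores in the cluster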




-adrian

On 9/22/15, 1:51 AM, "nib...@free.fr"  wrote:

>Hello, 
>Please could you explain to me what exactly is distributed when I launch a spark 
>streaming job over a YARN cluster ?
>My code is something like :
>
>JavaDStream<JMSEvent> customReceiverStream =
>    ssc.receiverStream(streamConfig.getJmsReceiver());
>
>JavaDStream<String> incoming_msg = customReceiverStream.map(
>    new Function<JMSEvent, String>() {
>        public String call(JMSEvent jmsEvent) {
>            return jmsEvent.getText();
>        }
>    });
>
>incoming_msg.foreachRDD(new Function<JavaRDD<String>, Void>() {
>    public Void call(JavaRDD<String> rdd) throws Exception {
>        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
>            @Override
>            public void call(Iterator<String> msg) throws Exception {
>                while (msg.hasNext()) {
>                    // insert message in MongoDB
>                }
>            }
>        });
>        return null;
>    }
>});
>
>
>So, in this code, at what step is the distribution over YARN done:
>- Is my receiver distributed (and so all the rest also) ?
>- Is the foreachRDD distributed (and so all the rest also)?
>- Is foreachPartition distributed ?
>
>Tks
>Nicolas
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Invalid checkpoint url

2015-09-22 Thread Adrian Tanase
Have you tried simply ssc.checkpoint("checkpoint")? This should create it in 
the local folder; it has always worked for me when developing in local mode.

For the others (/tmp/..) make sure you have rights to write there.

-adrian

From: srungarapu vamsi
Date: Tuesday, September 22, 2015 at 7:59 AM
To: user
Subject: Invalid checkpoint url

I am using reduceByKeyAndWindow (with inverse reduce function) in my code.
In order to use this, it seems the checkpointDirectory which I have to use 
should be a hadoop-compatible file system.
Does that mean that I should set up hadoop on my system?
I googled about this and I found in an S.O. answer that I need not set up hdfs but 
the checkpoint directory should be HDFS compatible.

I am a beginner in this area. I am running my spark streaming application on 
ubuntu 14.04, spark 1.3.1.
If I need not set up hdfs and ext4 is hdfs compatible, then what does my 
checkpoint directory look like?

i tried all these:
ssc.checkpoint("/tmp/checkpoint")
ssc.checkpoint("hdfs:///tmp/checkpoint")
ssc.checkpoint("file:///tmp/checkpoint")

But none of them worked for me.

--
/Vamsi



Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Adrian Tanase
just give zipWithIndex a shot, use it early in the pipeline. I think it 
provides exactly the info you need, as the index is the original line number in 
the file, not the index in the partition.

Sent from my iPhone

On 22 Sep 2015, at 17:50, Philip Weaver 
<philip.wea...@gmail.com<mailto:philip.wea...@gmail.com>> wrote:

Thanks. If textFile can be used in a way that preserves order, then both the 
partition index and the index within each partition should be consistent, right?

I overcomplicated the question by asking about removing duplicates. 
Fundamentally I think my question is, how does one sort lines in a file by line 
number.

On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase 
<atan...@adobe.com<mailto:atan...@adobe.com>> wrote:
By looking through the docs and source code, I think you can get away with 
rdd.zipWithIndex to get the index of each line in the file, as long as you 
define the parallelism upfront:
sc.textFile("README.md", 4)

You can then just do .groupBy(...).mapValues(_.sortBy(...).head) - I'm skimming 
through some tuples, hopefully this is clear enough.
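
A rough sketch of the whole thing, along the lines of the reduceByKey-with-min you 
proposed (`keyOf` is a placeholder for however you extract the dedup key from a line):

val lines = sc.textFile("README.md", 4)

val firstPerKey = lines
  .zipWithIndex()                                     // (line, global line number)
  .map { case (line, idx) => (keyOf(line), (idx, line)) }
  .reduceByKey((a, b) => if (a._1 < b._1) a else b)   // keep the occurrence with the smallest line number
  .values                                             // drop the key
  .map(_._2)                                          // drop the index, keep the line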

-adrian

From: Philip Weaver
Date: Tuesday, September 22, 2015 at 3:26 AM
To: user
Subject: Remove duplicate keys by always choosing first in file.

I am processing a single file and want to remove duplicate rows by some key by 
always choosing the first row in the file for that key.

The best solution I could come up with is to zip each row with the partition 
index and local index, like this:

rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
  rows.zipWithIndex.map { case (row, localIndex) => (row.key, ((partitionIndex, 
localIndex), row)) }
}

And then using reduceByKey with a min ordering on the (partitionIndex, 
localIndex) pair.

First, can I count on SparkContext.textFile to read the lines in such a way that the 
partition indexes are always increasing so that the above works?

And, is there a better way to accomplish the same effect?

Thanks!

- Philip




Re: Using Spark for portfolio manager app

2015-09-21 Thread Adrian Tanase
  1.  reading from kafka has exactly once guarantees - we are using it in 
production today (with the direct receiver)
     *   you will probably have 2 topics, loading both into spark and joining 
/ unioning as needed is not an issue
     *   tons of optimizations you can do there, assuming everything else works
  2.  for ad-hoc query I would say you absolutely need to look at external 
storage
     *   querying the DStream or spark's RDDs directly should be done mostly 
for aggregates/metrics, not by users
     *   if you look at HBase or Cassandra for storage then 50k writes/sec are 
not a problem at all, especially combined with a smart client that does batch 
puts (like async hbase)
     *   you could also consider writing the updates to another kafka topic and 
have a different component that updates the DB, if you think of other 
optimisations there
  3.  by stats I assume you mean metrics (operational or business)
     *   there are multiple ways to do this, however I would not encourage you 
to query spark directly, especially if you need an archive/history of your 
datapoints
     *   we are using OpenTSDB (we already have an HBase cluster) + Grafana for 
dashboarding
     *   collecting the metrics is a bit hairy in a streaming app - we have 
experimented with both accumulators and RDDs specific for metrics - chose the 
RDDs that write to OpenTSDB using foreachRDD

-adrian


From: Thúy Hằng Lê 
Sent: Sunday, September 20, 2015 7:26 AM
To: Jörn Franke
Cc: user@spark.apache.org
Subject: Re: Using Spark for portfolio manager app

Thanks Adrian and Jorn for the answers.

Yes, you're right there are lot of things I need to consider if I want to use 
Spark for my app.

I still have few concerns/questions from your information:

1/ I need to combine trading stream with tick stream, I am planning to use 
Kafka for that
If I am using approach #2 (Direct Approach) in this tutorial 
https://spark.apache.org/docs/latest/streaming-kafka-integration.html


Will I receive exactly-once semantics? Or do I have to add some logic in my code to 
achieve that?
As per your suggestion of using delta updates, exactly-once semantics are required for 
this application.

2/ For ad-hoc query, I must output from Spark to external storage and query on 
that, right?
Is there any way to do ad-hoc query on Spark? My application could have 50k 
updates per second at peak time.
Persisting to external storage leads to high latency in my app.

3/ How to get real-time statistics from Spark?
In most of the Spark streaming examples, the statistics are echoed to stdout.
However, I want to display those statistics on a GUI; is there any way to retrieve 
data from Spark directly without using external storage?


2015-09-19 16:23 GMT+07:00 Jörn Franke 
>:

If you want to be able to let your users query their portfolio then you may 
want to think about storing the current state of the portfolios in 
hbase/phoenix; alternatively a cluster of relational databases can make sense. 
For the rest you may use Spark.

Le sam. 19 sept. 2015 à 4:43, Thúy Hằng Lê 
> a écrit :
Hi all,

I am going to build a financial application for Portfolio Manager, where each 
portfolio contains a list of stocks, the number of shares purchased, and the 
purchase price.
Another source of information is stock prices from market data. The application 
needs to calculate real-time gain or loss of each stock in each portfolio ( 
compared to the purchase price).

I am new to Spark. I know using Spark Streaming I can aggregate portfolio 
positions in real-time, for example:
user A contains:
  - 100 IBM stock with transactionValue=$15000
  - 500 AAPL stock with transactionValue=$11400

Now given the stock prices change in real-time too, e.g. if IBM is priced at 151, I 
want to update its gain or loss: gainOrLost(IBM) = 151*100 - 15000 = $100

My questions are:

 * What is the best method to combine 2 real-time streams( transaction 
made by user and market pricing data) in Spark.
 * How can I use real-time ad-hoc SQL against portfolio positions? Is 
there any way I can do SQL on the output of Spark Streaming?
 For example,
  select sum(gainOrLost) from portfolio where user='A';
 * What are preferred external storages for Spark in this use case?
 * Is Spark the right choice for my use case?

Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

2015-09-21 Thread Adrian Tanase
We do - using Spark streaming, Kafka, HDFS all collocated on the same nodes. 
Works great so far.


Spark picks up the location information and reads data from the partitions 
hosted by the local broker, showing up as NODE_LOCAL in the UI.

You also need to look at the locality options in the config 
(spark.locality.wait and friends) - just to make sure you're not wasting time if 
the kafka cluster becomes unbalanced and there are fewer cores than partitions 
on a particular node - you want to get to RACK_LOCAL as quickly as possible; 
we've set this to 500 millis instead of the default of 3 seconds.
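
For reference, the settings we tweaked look roughly like this (500ms is what worked for 
us, not a universal recommendation):

val conf = new org.apache.spark.SparkConf()
  .set("spark.locality.wait", "500ms")        // delay before falling back to a less local level (default 3s)
  .set("spark.locality.wait.node", "500ms")   // the wait can also be tuned per locality level
  .set("spark.locality.wait.rack", "500ms")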

-adrian


From: Cody Koeninger 
Sent: Monday, September 21, 2015 10:19 PM
To: Ashish Soni
Cc: user
Subject: Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

The direct stream already uses the kafka leader for a given partition as the 
preferred location.

I don't run kafka on the same nodes as spark, and I don't know anyone who does, 
so that situation isn't particularly well tested.

On Mon, Sep 21, 2015 at 1:15 PM, Ashish Soni 
> wrote:
Hi All ,

Just wanted to find out if there are any benefits to installing kafka brokers 
and spark nodes on the same machine?

is it possible that spark can pull data from kafka if it is local to the node 
i.e. the broker or partition is on the same machine.

Thanks,
Ashish



Re: What's the best practice to parse JSON using spark

2015-09-21 Thread Adrian Tanase
I've been using spray-json for general 
JSON ser/deser in scala (spark app), mostly for config files and data exchange. 
Haven't used it in conjunction with jobs that process large JSON data sources, 
so can't speak for those use cases.
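
For what it's worth, a minimal sketch of how we use it (the Config case class is a 
hypothetical example):

import spray.json._
import DefaultJsonProtocol._

case class Config(appName: String, batchSeconds: Int)
implicit val configFormat = jsonFormat2(Config)   // derives the (de)serializer

val parsed = """{"appName":"my-app","batchSeconds":10}""".parseJson.convertTo[Config]
val rendered = parsed.toJson.compactPrint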


-adrian



From: Petr Novak 
Sent: Monday, September 21, 2015 12:11 PM
To: Cui Lin; user
Subject: Re: What's the best practice to parse JSON using spark

Surprisingly I had the same issue when including json4s dependency at the same 
version v3.2.10. I had to remove json4s deps from my code. I'm using Scala 
2.11, there might be some issue with mixing 2.10/2.11 and it could be just my 
environment. I haven't investigated much as depending on Spark provided version 
is fine for us for now.

Regards,
Petr

On Mon, Sep 21, 2015 at 11:06 AM, Petr Novak 
> wrote:
Internally Spark is using json4s and jackson parser v3.2.10, AFAIK. So if you 
are using Scala they should be available without adding dependencies. There is 
v3.2.11 already available but adding to my app was causing NoSuchMethod 
exception so I would have to shade it. I'm simply staying on v3.2.10 for now.

Regards,
Petr

On Sat, Sep 19, 2015 at 2:45 AM, Ted Yu 
> wrote:
For #1, see this thread: http://search-hadoop.com/m/q3RTti0Thneenne2

For #2, also see:
examples//src/main/python/hbase_inputformat.py
examples//src/main/python/hbase_outputformat.py

Cheers

On Fri, Sep 18, 2015 at 5:12 PM, Ted Yu 
> wrote:
For #2, please see:

examples/src/main/scala//org/apache/spark/examples/HBaseTest.scala
examples/src/main/scala//org/apache/spark/examples/pythonconverters/HBaseConverters.scala

In hbase, there is hbase-spark module which is being polished. Should be 
available in hbase 1.3.0 release.

Cheers

On Fri, Sep 18, 2015 at 5:09 PM, Cui Lin 
> wrote:
Hello,All,

Parsing JSON's nested structure is easy if using the Java or Python API. Where 
can I find a similar way to parse a JSON file using spark?

Another question: by using SparkSQL, how can I easily save the results into a 
NoSQL DB? Any examples? Thanks a lot!



--
Best regards!

Lin,Cui






Re: Deploying spark-streaming application on production

2015-09-21 Thread Adrian Tanase
I'm wondering, isn't this the canonical use case for WAL + reliable receiver?

As far as I know you can tune the MQTT server to wait for an ack on messages (qos 
level 2?).
With some support from the client library you could achieve exactly-once 
semantics on the read side, if you ack a message only after writing it to WAL, 
correct?
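
Very rough sketch of that idea - the MQTT client bits are hypothetical placeholders, and 
it assumes spark.streaming.receiver.writeAheadLog.enable=true plus a checkpoint dir, so 
that store() only returns once the batch is in the WAL:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class AckingMqttReceiver(brokerUrl: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    // hypothetical: connect to brokerUrl, subscribe with qos 2, receive messages in small batches.
    // For every batch:
    //   store(batch.iterator)   // reliable store - blocks until written to the WAL
    //   ack(batch)              // only now tell the broker the batch was consumed
  }

  def onStop(): Unit = {
    // hypothetical: disconnect the client
  }
}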

-adrian

Sent from my iPhone

On 21 Sep 2015, at 12:35, Petr Novak 
> wrote:

In short there is no direct support for it in Spark AFAIK. You will either 
manage it in MQTT or have to add another layer of indirection - either 
in-memory based (observable streams, in-mem db) or disk based (Kafka, hdfs 
files, db) which will keep your unprocessed events.

Now realizing, there is support for backpressure in v1.5.0 but I don't know if 
it could be exploited aka I don't know if it is possible to decouple event 
reading into memory and actual processing code in Spark which could be swapped 
on the fly. Probably not without some custom built facility for it.

Petr

On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak 
> wrote:
I should read my posts at least once to avoid so many typos. Hopefully you are 
brave enough to read through.

Petr

On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak 
> wrote:
I think you would have to persist events somehow if you don't want to miss 
them. I don't see any other option there. Either in MQTT if it is supported 
there or routing them through Kafka.

There is WriteAheadLog in Spark but you would have to decouple stream MQTT reading 
and processing into 2 separate jobs so that you could upgrade the processing one 
assuming the reading one would be stable (without changes) across versions. But 
it is problematic because there is no easy way to share DStreams between 
jobs - you would have to develop your own facility for it.

Alternatively the reading job could save MQTT events in their most raw 
form into files - to limit the need to change code - and then the processing job 
would work on top of it using Spark streaming based on files. I think this is 
inefficient and can get quite complex if you would like to make it reliable.

Basically either MQTT supports persistence (which I don't know) or there is 
Kafka for these use cases.

Another option would be, I think, to place observable streams in between MQTT and 
Spark streaming with backpressure, as far as you could perform the upgrade till 
buffers fill up.

I'm sorry that it is not thought out well from my side, it is just a brainstorm 
but it might lead you somewhere.

Regards,
Petr

On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele 
> wrote:
Hi All,

I have a spark streaming application with a batch interval (10 ms) which is reading the 
MQTT channel and dumping the data from MQTT to HDFS.

So suppose I have to deploy a new application jar (with changes in the spark 
streaming application), what is the best way to deploy? Currently I am doing as 
below:

1. killing the running streaming app using yarn application -kill ID
2. and then starting the application again

Problem with the above approach is that since we are not persisting the events in MQTT 
we will miss the events for the period of the deploy.

how to handle this case?

regards
jeeetndra





Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Adrian Tanase
Reading through the docs it seems that with a combination of FAIR scheduler and 
maybe pools you can get pretty far.
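
Roughly, the knobs involved (pool name and file path below are made up):

val conf = new org.apache.spark.SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")   // optional pool definitions

// in the thread serving one request: run that request's jobs in their own pool
sc.setLocalProperty("spark.scheduler.pool", "request-pool-1")
// ... trigger the jobs for this request ...
sc.setLocalProperty("spark.scheduler.pool", null)   // reset when done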

However the smallest unit of scheduled work is the task so probably you need to 
think about the parallelism of each transformation.

I'm guessing that by increasing the level of parallelism you get many smaller 
tasks that the scheduler can then run across the many jobs you might have - as 
opposed to fewer, longer tasks...

Lastly, 8 cores is not that much horsepower :)
You may consider running with beefier machines or a larger cluster, to get at 
least tens of cores.

Hope this helps,
-adrian

Sent from my iPhone

On 18 Sep 2015, at 18:37, Philip Weaver 
> wrote:

Here's a specific example of what I want to do. My Spark application is running 
with total-executor-cores=8. A request comes in, it spawns a thread to handle 
that request, and starts a job. That job should use only 4 cores, not all 8 of 
the cores available to the cluster. When the first job is scheduled, it should 
take only 4 cores, not all 8 of the cores that are available to the driver.

Is there any way to accomplish this? This is on mesos.

In order to support the use cases described in 
https://spark.apache.org/docs/latest/job-scheduling.html, where a spark 
application runs for a long time and handles requests from multiple users, I 
believe what I'm asking about is a very important feature. One of the goals is 
to get lower latency for each request, but if the first request takes all 
resources and we can't guarantee any free resources for the second request, 
then that defeats the purpose. Does that make sense?

Thanks in advance for any advice you can provide!

- Philip

On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver 
> wrote:
I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR 
scheduler, so I can define a long-running application capable of executing 
multiple simultaneous spark jobs.

The kind of jobs that I'm running do not benefit from more than 4 cores, but I 
want my application to be able to take several times that in order to run 
multiple jobs at the same time.

I suppose my question is more basic: How can I limit the number of cores used 
to load an RDD or DataFrame? I can immediately repartition or coalesce my RDD 
or DataFrame to 4 partitions after I load it, but that doesn't stop Spark from 
using more cores to load it.

Does it make sense what I am trying to accomplish, and is there any way to do 
it?

- Philip




Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Adrian Tanase
Forgot to mention that you could also restrict the parallelism to 4, 
essentially using only 4 cores at any given time, however if your job is 
complex, a stage might be broken into more than 1 task...

Sent from my iPhone

On 19 Sep 2015, at 08:30, Adrian Tanase 
<atan...@adobe.com<mailto:atan...@adobe.com>> wrote:

Reading through the docs it seems that with a combination of FAIR scheduler and 
maybe pools you can get pretty far.

However the smallest unit of scheduled work is the task so probably you need to 
think about the parallelism of each transformation.

I'm guessing that by increasing the level of parallelism you get many smaller 
tasks that the scheduler can then run across the many jobs you might have - as 
opposed to fewer, longer tasks...

Lastly, 8 cores is not that much horsepower :)
You may consider running with beefier machines or a larger cluster, to get at 
least tens of cores.

Hope this helps,
-adrian

Sent from my iPhone

On 18 Sep 2015, at 18:37, Philip Weaver 
<philip.wea...@gmail.com<mailto:philip.wea...@gmail.com>> wrote:

Here's a specific example of what I want to do. My Spark application is running 
with total-executor-cores=8. A request comes in, it spawns a thread to handle 
that request, and starts a job. That job should use only 4 cores, not all 8 of 
the cores available to the cluster. When the first job is scheduled, it should 
take only 4 cores, not all 8 of the cores that are available to the driver.

Is there any way to accomplish this? This is on mesos.

In order to support the use cases described in 
https://spark.apache.org/docs/latest/job-scheduling.html, where a spark 
application runs for a long time and handles requests from multiple users, I 
believe what I'm asking about is a very important feature. One of the goals is 
to get lower latency for each request, but if the first request takes all 
resources and we can't guarantee any free resources for the second request, 
then that defeats the purpose. Does that make sense?

Thanks in advance for any advice you can provide!

- Philip

On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver 
<philip.wea...@gmail.com<mailto:philip.wea...@gmail.com>> wrote:
I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR 
scheduler, so I can define a long-running application capable of executing 
multiple simultaneous spark jobs.

The kind of jobs that I'm running do not benefit from more than 4 cores, but I 
want my application to be able to take several times that in order to run 
multiple jobs at the same time.

I suppose my question is more basic: How can I limit the number of cores used 
to load an RDD or DataFrame? I can immediately repartition or coalesce my RDD 
or DataFrame to 4 partitions after I load it, but that doesn't stop Spark from 
using more cores to load it.

Does it make sense what I am trying to accomplish, and is there any way to do 
it?

- Philip




Re: Using Spark for portfolio manager app

2015-09-18 Thread Adrian Tanase
Cool use case! You should definitely be able to model it with Spark.

For the first question it's pretty easy - you probably need to keep the user 
portfolios as state using updateStateByKey.
You need to consume 2 event sources - user trades and stock changes. You 
probably want to Cogroup the stock changes with users that have that stock in 
their portfolio, then union the 2 message streams.

As messages come in, you consume the union of these 2 streams and you update 
the state. Messages modeled as case classes and a pattern match should do the 
trick (assuming scala). After the update, you need to emit a tuple with 
(newPortfolio, gainOrLost) so you can also push the deltas somewhere.

For the SQL part, you need to create a DataFrame out of the user portfolio 
DStream, using foreachRDD. Look around for examples of SQL + Spark streaming, I 
think Databricks had a sample app / tutorial. 
You can then query the resulting DataFrame using SQL.
If instead of one update you want to provide a graph then you need to use a 
window over the gainOrLost.

That being said, there are a lot of interesting questions you'll need to answer 
about state keeping, event sourcing, persistence, durability - especially 
around outputting data out of spark, where you need to do more work to achieve 
exactly once semantics. I only focused on the main dataflow.
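
To give a feel for the main dataflow, a very rough sketch - the domain types are invented, 
`trades: DStream[Trade]` is assumed, and `ticksPerUser: DStream[(String, Tick)]` stands for 
the result of the cogroup step (a tick fanned out to every user holding that symbol):

sealed trait Event
case class Trade(user: String, symbol: String, shares: Long, price: Double) extends Event
case class Tick(symbol: String, price: Double) extends Event

case class Position(shares: Long, cost: Double, lastPrice: Double) {
  def gainOrLoss: Double = shares * lastPrice - cost
}

val keyedTrades = trades.map(t => ((t.user, t.symbol), t: Event))
val keyedTicks  = ticksPerUser.map { case (user, tick) => ((user, tick.symbol), tick: Event) }

// updateStateByKey needs ssc.checkpoint(...) to be configured
val positions = keyedTrades.union(keyedTicks).updateStateByKey[Position] { (events, prev) =>
  var p = prev.getOrElse(Position(0L, 0.0, 0.0))
  events.foreach {
    case Trade(_, _, shares, price) => p = p.copy(shares = p.shares + shares, cost = p.cost + shares * price)
    case Tick(_, price)             => p = p.copy(lastPrice = price)
  }
  Some(p)
}

// emit (newPortfolio, gainOrLoss) deltas downstream
val deltas = positions.mapValues(p => (p, p.gainOrLoss))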

Hope this helps, that's how I'd model it, anyway :)

-adrian

Sent from my iPhone

> On 19 Sep 2015, at 05:43, Thúy Hằng Lê  wrote:
> 
> Hi all,
> 
> I am going to build a financial application for Portfolio Manager, where each 
> portfolio contains a list of stocks, the number of shares purchased, and the 
> purchase price. 
> Another source of information is stock prices from market data. The 
> application needs to calculate real-time gain or loss of each stock in each 
> portfolio ( compared to the purchase price).
> 
> I am new to Spark. I know using Spark Streaming I can aggregate portfolio 
> positions in real-time, for example:
> user A contains: 
>   - 100 IBM stock with transactionValue=$15000
>   - 500 AAPL stock with transactionValue=$11400
> 
> Now given the stock prices change in real-time too, e.g. if IBM is priced at 151, 
> I want to update its gain or loss: gainOrLost(IBM) = 151*100 - 15000 = 
> $100
> 
> My questions are:
> 
>  * What is the best method to combine 2 real-time streams( 
> transaction made by user and market pricing data) in Spark.
>  * How can I use real-time ad-hoc SQL against portfolio positions? Is 
> there any way I can do SQL on the output of Spark Streaming?
>  For example,
>   select sum(gainOrLost) from portfolio where user='A';
>  * What are preferred external storages for Spark in this use case?
>  * Is Spark the right choice for my use case?
>  


Re: Spark on YARN / aws - executor lost on node restart

2015-09-18 Thread Adrian Tanase
Hi guys,

Digging up this question after spending some more time trying to replicate it.
It seems to be an issue with the YARN – spark integration, wondering if there 
is a bug already tracking this?

If I just kill the process on the machine, YARN detects the container is dead 
and the spark framework requests a new container to be deployed.
If the machine goes away completely, spark sees that the executor is lost but 
YarnAllocator never tries to request the container again. Wondering if there’s 
an implicit assumption that it would be notified by YARN, which might not 
happen if the node dies completely?

If there are no ideas on the list, I’ll prepare some logs and follow up with an 
issue.

Thanks,
-adrian

From: Adrian Tanase
Date: Wednesday, September 16, 2015 at 6:01 PM
To: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Spark on YARN / aws - executor lost on node restart

Hi all,

We’re using spark streaming (1.4.0), deployed on AWS through yarn. It’s a 
stateful app that reads from kafka (with the new direct API) and we’re 
checkpointing to HDFS.

During some resilience testing, we restarted one of the machines and brought it 
back online. During the offline period, the Yarn cluster would not have 
resources to re-create the missing executor.
After starting all the services on the machine, it correctly joined the Yarn 
cluster, however the spark streaming app does not seem to notice that the 
resources are back and has not re-created the missing executor.

The app is correctly running with 6 out of 7 executors, however it’s running 
under capacity.
If we manually kill the driver and re-submit the app to yarn, all the state is 
correctly recreated from checkpoint and all 7 executors are now online – 
however this seems like a brutal workaround.

So, here are some questions:

  *   Isn't the driver supposed to auto-heal after a machine is completely lost 
and then comes back after some time?
  *   Are there any configuration settings that influence how the spark driver should 
poll yarn to check back on resources being available again?
  *   Is there a tool one can run to “force” the driver to re-create missing 
workers/executors?

Lastly, another issue was that the driver also crashed and yarn successfully 
restarted it – I’m not sure yet if it’s because of some retry setting or 
another exception, will post the logs after I recreate the problem.

Thanks in advance for any ideas,
-adrian


Re: Spark Streaming application code change and stateful transformations

2015-09-17 Thread Adrian Tanase
This section in the streaming guide also outlines a new option – use 2 versions 
in parallel for a period of time, controlling the draining / transition at the 
application level.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code

Also – I would not dismiss the graceful shutdown approach, since you’re 
controlling the shutdown.
At a minimum, you can monitor if it was successful and if it failed, you simply 
restart the app, relying on checkpoint recovery before trying again…

I’m copy-pasting more details from an answer I posted earlier to a similar 
question:

  1.  Use 2 versions in parallel, drain the queue up to a point and start fresh 
in the new version, only processing events from that point forward
 *   Note that “up to a point” is specific to your state management logic, 
it might mean “user sessions started after 4 am” NOT “events received after 4 am”
  2.  Graceful shutdown and saving data to DB, followed by checkpoint cleanup / 
new checkpoint dir
 *   On restart, you need to use the updateStateByKey that takes an 
initialRDD with the values preloaded from DB (see the sketch after this list)
 *   By cleaning the checkpoint in between upgrades, data is loaded only 
once
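
For option 2, a minimal sketch of the initial-state variant - the state type, the 
`events: DStream[(String, Long)]` stream and the DB loader are all hypothetical:

import org.apache.spark.HashPartitioner

// hypothetical loader that rebuilds RDD[(key, state)] from the snapshot saved at shutdown
val initialState = loadStateFromDb(ssc.sparkContext)   // RDD[(String, Long)]

val state = events.updateStateByKey[Long](
  (newValues: Seq[Long], prev: Option[Long]) => Some(prev.getOrElse(0L) + newValues.sum),
  new HashPartitioner(ssc.sparkContext.defaultParallelism),
  initialState)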

Hope this helps,
-adrian

From: Ofir Kerker
Date: Wednesday, September 16, 2015 at 6:12 PM
To: Cody Koeninger
Cc: "user@spark.apache.org"
Subject: Re: Spark Streaming application code change and stateful 
transformations

Thanks Cody!
The 2nd solution is safer but seems wasteful :/
I'll try to optimize it by keeping in addition to the 'last-complete-hour' the 
corresponding offsets that bound the incomplete data to try and fast-forward 
only the last couple of hours in the worst case.

On Mon, Sep 14, 2015 at 22:14 Cody Koeninger 
> wrote:
Solution 2 sounds better to me.  You aren't always going to have graceful 
shutdowns.

On Mon, Sep 14, 2015 at 1:49 PM, Ofir Kerker 
> wrote:
Hi,
My Spark Streaming application consumes messages (events) from Kafka every
10 seconds using the direct stream approach and aggregates these messages
into hourly aggregations (to answer analytics questions like: "How many
users from Paris visited page X between 8PM to 9PM") and save the data to
Cassandra.

I was wondering if there's a good practice for handling a code change in a
Spark Streaming applications that uses stateful transformations
(updateStateByKey for example) because the new application code will not be
able to use the data that was checkpointed by the former application.
I have thought of a few solutions for this issue and was hoping some of you
have some experience with such case and can suggest other solutions or
feedback my suggested solutions:
*Solution #1*: On a graceful shutdown, in addition to the current Kafka
offsets, persist the current aggregated data into Cassandra tables
(different than the regular aggregation tables) that would allow reading
them easily when the new application starts in order to build the initial
state.
*Solution #2*: When an hour is "complete" (i.e not expecting more events
with the timestamp of this hour), update somewhere persistent (DB / shared
file) the last-complete-hour. This will allow me, when the new application
starts, to read all the events from Kafka from the beginning of retention
period (last X hours) and ignore events from timestamp smaller or equal than
the last-complete-hour.

I'll be happy to get your feedback!

Thanks,
Ofir




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-application-code-change-and-stateful-transformations-tp24692.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Re: Saprk.frame.Akkasize

2015-09-17 Thread Adrian Tanase
Have you reviewed this section of the guide?
http://spark.apache.org/docs/latest/programming-guide.html#shared-variables

If the dataset is static and you need a copy on all the nodes, you should look 
at broadcast variables.

SQL specific, have you tried loading the dataset using the DataFrame API 
directly? It seems to me like you’re using the 2 files as “metadata” instead of 
data…
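
A minimal sketch of the broadcast route - the loader and key extraction below are 
placeholders for however you read the two metadata files on the driver:

// load the small metadata once on the driver, then ship it once per executor
val labels: Map[Long, Int] = loadLabels(labelsPath)   // hypothetical driver-side loader
val labelsBc = sc.broadcast(labels)

val labeled = imagesRdd.map { img =>
  // always read through .value on the executors instead of capturing `labels` directly
  (img, labelsBc.value.getOrElse(idOf(img), -1))
}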

-adrian

From: Angel Angel
Date: Thursday, September 17, 2015 at 12:28 PM
To: "user@spark.apache.org"
Subject: Saprk.frame.Akkasize

Hi,

I am running some deep learning algorithm on spark.

Example:
https://github.com/deeplearning4j/dl4j-spark-ml-examples


I am trying to run this example in local mode and it's working fine,
but when I try to run this example in cluster mode I get the following error.

Loaded Mnist dataframe:
15/09/17 18:20:33 WARN TaskSetManager: Stage 0 contains a task of very large 
size (46279 KB). The maximum recommended task size is 100 KB.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Serialized task 0:0 was 47622358 bytes, which exceeds max 
allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). 
Consider increasing spark.akka.frameSize or using broadcast variables for large 
values.
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)

Also I have attached a snapshot of the error.

And my java driver program is


  public static void main(String[] args) {

    SparkConf conf = new SparkConf()
        .setMaster("spark://hadoopm0:7077")
        .setAppName("Mnist Classification Pipeline (Java)");
    SparkContext jsc = new SparkContext(conf);
    SQLContext jsql = new SQLContext(jsc);

    // String imagesPath = "hdfs://hadoopm0:8020/tmp/input1/images-idx1-ubyte";
    // String labelsPath = "hdfs://hadoopm0:8020/tmp/input1/labels-idx1-ubyte";
    String imagesPath = "file:///root/Downloads/Database/images-idx1-ubyte";
    String labelsPath = "file:///root/Downloads/Database/labels-idx1-ubyte";
    Map<String, String> params = new HashMap<String, String>();
    params.put("imagesPath", imagesPath);
    params.put("labelsPath", labelsPath);
    DataFrame data = jsql.read().format(DefaultSource.class.getName())
        .options(params).load();

    System.out.println("\nLoaded Mnist dataframe:");
    data.show(100);






Please give me some reference or suggestions to solve this problem.

Thanks in advance


